消息队列-MQ

同步通讯和异步通讯

image-20221007163138062

同步调用的问题

微服务间基于Feign的调用属于同步方式,存在一些问题。

image-20221007163406290

image-20221007163727790

image-20221007171324820

总结

同步调用的优点

  • 时效性强、立即获得结果

同步调用的问题

  • 耦合性高
  • 性能和吞吐能力下降
  • 有额外的资源消耗
  • 有级联失败的问题

异步调用方案

异步调用常见的实现是事件驱动模式

image-20221007181054098

异步通信的优点:

  • 优势一:通过事件发布完成服务解耦,耦合度低
  • 优势二:性能提升,吞吐量提高
  • 优势三:服务没有强依赖,不担心级联失败问题,故障隔离
  • 优势四:流量削峰

异步通信的缺点:

  • 依赖于Broken的可靠性,安全性,吞吐能力
  • 架构复杂,业务没有明显的流程线,不易于追踪管理

MQ

MQ(Message Queue)是消息队列,存放消息的队列,就是事件驱动架构的Broker。

image-20221010211020957

RabbitMQ

RabbitMQ是基于Erlang语言开发的开源通信中间件。

RabbitMQ部署

下载docker镜像
1
docker pull rabbitmq:3.9-management
启动RabbitMQ服务
1
2
3
4
5
docker run \
-p 5672:5672 -p 15672:15672 \
--name rabbitmq \
--hostname rabbitmq1 \
-d rabbitmq:3.9-management
访问地址:

http://{ip}:15672/

输入初始的账号密码:guest guset

image-20221018182241708

RabbitMQ的结构和概念

image-20221018182812550

RabbitMQ的概念
channel:操作队列的工具
exchange:路由消息到队列中
queue:缓存消息
virtual:虚拟主机是对queue和exchange等资源的逻辑分组

RabbitMQ常见消息模型

  • 基本消息队列(BasicQueue)
  • 工作消息队列(WorkQueue)
  • 发布订阅(Publish\Subscribe)
    • Fanout Exchange:广播
    • Direct Exchange:路由
    • Topic Exchange:主题

image-20221018220302269

基本消息队列

最基本的消息队列模型只包括三个角色:

  • publisher:消息发布者,将消息发送到队列queue
  • queue:消息队列,负责接受并缓存消息
  • consumer:订阅队列,处理队列中的消息

image-20221018220806394

基本案例

image-20221019101819705

导入依赖

1
2
3
4
5
<!--Spring AMQP依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

生产者测试类

image-20221019102020346

消费者测试类

image-20221019102048266

基本消息队列的消息发送流程

  • 建立connect
  • 创建channel
  • 利用channel声明队列
  • 利用channel向队列发送消息

基本消息队列的消息接收流程

  • 建立connect
  • 创建channel
  • 利用channel声明队列
  • 定义consumer的消息行为handleDelivery()
  • 利用channel将消费者和与队列绑定

SpringAMQP

image-20221019103336534

AMQP(Advance Message Queuing Protocol),用于在应用程序或之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。

Spring AMQP是基于AMQP协议定义的一套 API规范,提供了模板来发送和接收消息,包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

image-20221019141304343

简单模式

简单模式是最简单的消费模式,它包含一个生产者,一个消费者和一个队列。生产者向队列里发送消息。消费者从队列中获取消息并消费。

img

image-20221019142246296

代码实现

在父工程中引入spring-amqp依赖
1
2
3
4
5
<!--Spring AMQP依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
编写application.yml配置文件
1
2
3
4
5
6
7
spring:
rabbitmq:
host: 121.37.84.213
port: 5672
username: guest
password: guest
virtual-host: /
注入RabbitTemplate完成消息的发送
1
2
3
4
5
6
7
8
9
10
11
12
13
@Slf4j
public class SimpleSender {
@Autowired
private RabbitTemplate rabbitTemplate;

public static final String QUEUE_NAME = "simple.queue";

public void send() {
String message = "hello rabbitmq";
rabbitTemplate.convertAndSend(QUEUE_NAME, message);
log.info("send message: {}", message);
}
}
通过@RabbitListener注解接受方法
1
2
3
4
5
6
7
8
9
10
11
@Slf4j
@RabbitListener(queues = "simple.queue")
public class SimpleReceiver {
@Autowired
private RabbitTemplate rabbitTemplate;

@RabbitHandler
public void receive(String msg) throws InterruptedException {
log.info("receive : {}", msg);
}
}
通过controller测试发送和接受方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@RestController
@RequestMapping("/rabbit")
@Api(tags = "RabbitController", description = "RabbitMQ Amqp功能测试控制器")
public class SimpleController {
@Resource
private SimpleSender simpleSender;

@ApiOperation(value = "简单模式消息队列发送消息")
@GetMapping(value = "/simpleSend")
public CommonResult simpleSend() {
for (int i = 0; i < 10; i++) {
simpleSender.send();
ThreadUtil.sleep(1000);
}
return CommonResult.success(null);
}
}

总结

image-20221025152509407

image-20221025154616188

工作模式

工作模式是指向多个相互竞争的消费者发送消息的模式,它包含一个生产者、两个消费者和一个队列。两个消费者同时绑定到一个队列上,当消费者获取消息处理耗时任务时,空闲的消费者从队列中获取并消费消息。

代码实现

img

添加相关配置,注入队列、生产者和消费者
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Configuration
public class WorkRabbitConfig {
@Bean
public Queue workQueue() {
return new Queue("work.queue");
}

@Bean
public WorkReceiver workReceiver1() {
return new WorkReceiver(1);
}

@Bean
public WorkReceiver workReceiver2() {
return new WorkReceiver(2);
}

@Bean
public WorkSender workSender() {
return new WorkSender();
}
}
生产者通过send方法向队列中发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Slf4j
public class WorkSender {
@Autowired
private RabbitTemplate rabbitTemplate;

private static final String QUEUE_NAME = "work.queue";

public void send(int index) {
StringBuilder sb = new StringBuilder("hello work");
int limitIndex = index % 3 + 1;
for (int i = 0; i < limitIndex; i++) {
sb.append(".");
}
sb.append(index + 1);
String message = sb.toString();
rabbitTemplate.convertAndSend(QUEUE_NAME, message);
log.info("send message: {}", message);
}
}
消费者从队列中获取消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Slf4j
@RabbitListener(queues = "work.queue")
public class WorkReceiver {
private final int instance;

public WorkReceiver(int instance) {
this.instance = instance;
}

@RabbitHandler
public void receive(String message) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
log.info("instance {} receive {}", this.instance, message);
doWork(message);
stopWatch.stop();
log.info("instance {} Done in {}", this.instance, stopWatch.getTotalTimeMillis());
}

private void doWork(String message) {
for (char ch : message.toCharArray()) {
if (ch == '.') {
ThreadUtil.sleep(1000);
}
}
}
}

总结

image-20221025162106610

发布订阅模式(Publish/Subscribe)

发布订阅模式与之前模式的区别是允许将同一消息发送给多个消费者,实现方式是加入exchange(交换机)。

发布/订阅模式是指同时向多个消费者发送消息的模式(类似广播的形式),它包含一个生产者、两个消费者、两个队列和一个交换机。两个消费者同时绑定到不同的队列中去,两个队列绑定到交换机上去,生产者通过发送消息到交换机,所有消息者接收并消费消息。

Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue。

img

代码实现

image-20221030221012471

添加发布/订阅模式相关Java配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* 发布/订阅消息队列配置类
* Created by YuanJW on 2022/10/27.
*/
@Configuration
public class FanoutRabbitConfig {
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout.exchange");
}

@Bean
public Queue fanoutQueue1() {
return new Queue("fanout.queue1");
}

@Bean
public Queue fanoutQueue2() {
return new Queue("fanout.queue2");
}

@Bean
public Binding fanoutBinding1(FanoutExchange fanoutExchange, Queue fanoutQueue1) {
return BindingBuilder.bind(fanoutQueue1)
.to(fanoutExchange);
}

@Bean
public Binding fanoutBinding2(FanoutExchange fanoutExchange, Queue fanoutQueue2) {
return BindingBuilder.bind(fanoutQueue2)
.to(fanoutExchange);
}

@Bean
public FanoutSender fanoutSender() {
return new FanoutSender();
}

@Bean
public FanoutReceiver fanoutReceiver() {
return new FanoutReceiver();
}
}
生产者向交换机发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Slf4j
public class FanoutSender {
@Autowired
private RabbitTemplate rabbitTemplate;

private static final String EXCHANGE_NAME = "fanout.exchange";

public void send(int index) {
StringBuilder sb = new StringBuilder();
int limitIndex = index % 3 + 1;
for (int i = 0; i < limitIndex; i++) {
sb.append('.');
}
sb.append(index);
String message = sb.toString();
rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", message);
log.info("send:{}", message);
}
}
消费者从绑定的匿名队列中获取消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Slf4j
public class FanoutReceiver {
@RabbitListener(queues = "fanout.queue1")
public void receive1(String message) {
receive(message, 1);
}

@RabbitListener(queues = "fanout.queue2")
public void receive2(String message) {
receive(message, 2);
}

private void receive(String message, int receiver) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
log.info("instance {} receive {}", receiver, message);
doWork(message);
stopWatch.stop();
log.info("instance {} done in {}s", receiver, stopWatch.getTotalTimeMillis());
}

private void doWork(String message) {
for (char ch : message.toCharArray()) {
if (ch == '.') {
ThreadUtil.sleep(1000);
}
}
}
}
controller中添加测试接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@RestController
@RequestMapping("/rabbit")
@Api(tags = "RabbitController", description = "RabbitMQ Amqp功能测试控制器")
public class RabbitController {
@Resource
private FanoutSender fanoutSender;

@ApiOperation(value = "发布/订阅模式消息队列发送消息")
@GetMapping(value = "/fanout")
public CommonResult fanout() {
for (int i = 0; i < 10; i++) {
fanoutSender.send(i);
ThreadUtil.sleep(1000);
}
return CommonResult.success(null);
}
}

总结

交换机的作用:
  • 接收publisher发送的消息
  • 将消息按照规则路由到与之绑定的队列
  • 不能缓存消息,路由失败,消息丢失
  • FanoutExchange的会将消息路由到每个绑定的队列

image-20221030222616265

路由模式

路由模式是可以根据路由键选择性给多个消费者发送消息的模式,它包含一个生产者、两个消费者、两个队列和一个交换机。两个消费者同时绑定到不同的队列上去,两个队列通过路由键绑定到交换机上去,生产者发送消息到交换机,交换机通过路由键转发到不不同的队列中,队列绑定的消费者接收并消费消息。

img

DirectExchange

image-20221030224244880

代码实现

添加路由模式相关Java配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Configuration
public class DirectRabbitConfig {
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct.exchange");
}

@Bean
public Queue directQueue1() {
return new Queue("direct.queue1");
}

@Bean
public Queue directQueue2() {
return new Queue("direct.queue2");
}

@Bean
public Binding directBindingRed1(DirectExchange directExchange, Queue directQueue1) {
return BindingBuilder.bind(directQueue1).to(directExchange).with("red");
}

@Bean
public Binding directBindingBlack1(DirectExchange directExchange, Queue directQueue1) {
return BindingBuilder.bind(directQueue1).to(directExchange).with("black");
}

@Bean
public Binding directBindingYellow2(DirectExchange directExchange, Queue directQueue2) {
return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");
}

@Bean
public Binding directBindingBlack2(DirectExchange directExchange, Queue directQueue2) {
return BindingBuilder.bind(directQueue2).to(directExchange).with("black");
}

@Bean
public DirectSender directSender() {
return new DirectSender();
}

@Bean
public DirectReceiver directReceiver() {
return new DirectReceiver();
}
}
生产者使用不同的路由键发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Slf4j
public class DirectSender {
@Autowired
private RabbitTemplate rabbitTemplate;

private static final String directExchange = "direct.exchange";

private final String[] keys = {"red", "yellow", "green", "black"};

public void send(int index) {
StringBuilder sb = new StringBuilder();
int limitIndex = index % 4;
String key = keys[limitIndex];
sb.append(key).append(' ').append(index + 1);
String message = sb.toString();
rabbitTemplate.convertAndSend(directExchange, key, message);
log.info("send:{}", message);
}
}
消费者从绑定的队列中获取消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Slf4j
public class DirectReceiver {
@RabbitListener(queues = "direct.queue1")
public void receive1(String message) {
receive(message, 1);
}

@RabbitListener(queues = "direct.queue2")
public void receive2(String message) {
receive(message, 2);
}

public void receive(String message, int receive) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
log.info("instance {} receive : {}", receive, message);
doWork(message);
stopWatch.stop();
log.info("instance {} done in {}s", receive, stopWatch.getTotalTimeMillis());
}

public void doWork(String message) {
for (char ch : message.toCharArray()) {
if (ch == '.') {
ThreadUtil.sleep(1000);
}
}
}
}
使用@RabbitListener注入和绑定
1
2
3
4
5
6
7
8
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue3"),
exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),
key = {"green", "black"}
))
public void receive3(String message) {
receive(message, 3);
}
controller中添加测试接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RestController
@RequestMapping("/rabbit")
@Api(tags = "RabbitController", description = "RabbitMQ Amqp功能测试控制器")
public class RabbitController {
@Resource
private DirectSender directSender;

@ApiOperation(value = "路由模式消息队列发送消息")
@GetMapping(value = "/direct")
public CommonResult direct() {
for (int i = 0; i < 10; i++) {
directSender.send(i);
ThreadUtil.sleep(1000);
}
return CommonResult.success(null);
}
}

总结

image-20221031180227318

通配符模式

通配符模式是可以根据路由键匹配规则选择性给多个消费者发送消息的模式,它包含一个生产者、两个消费者、两个队列和一个交换机。

两个消费者同时绑定到不同的队列上去,两个队列通过路由键匹配规则绑定到交换机上去,生产者发送消息到交换机,交换机通过路由键匹配规则转发到不同的队列,队列绑定的消费者接收并消费消息。

TopicExchange

TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以.分割

image-20221101162655587

特殊匹配符号

  • *:只能匹配一个单词
  • #:可以匹配零个或多个单词

img

代码实现

添加路由模式相关配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@Configuration
public class TopicRabbitConfig {
@Bean
public TopicExchange TopicExchange() {
return new TopicExchange("topic.exchange");
}

@Bean
public Queue topicQueue1() {
return new Queue("topic.queue1");
}

@Bean
public Queue topicQueue2() {
return new Queue("topic.queue2");
}

@Bean
public Queue topicQueue3() {
return new Queue("topic.queue3");
}

@Bean
public Binding TopicBinding1a(TopicExchange topicExchange, Queue topicQueue1) {
return BindingBuilder.bind(topicQueue1).to(topicExchange).with("a.#");
}

@Bean
public Binding TopicBinding1b(TopicExchange topicExchange, Queue topicQueue1) {
return BindingBuilder.bind(topicQueue1).to(topicExchange).with("*.b.*");
}

@Bean
public Binding TopicBinding2b(TopicExchange topicExchange, Queue topicQueue2) {
return BindingBuilder.bind(topicQueue2).to(topicExchange).with("*.*.c");
}

@Bean
public TopicSender topicSender() {
return new TopicSender();
}

@Bean
public TopicReceiver topicReceiver() {
return new TopicReceiver();
}
}
路由模式生产者发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Slf4j
public class TopicSender {
@Autowired
private RabbitTemplate rabbitTemplate;

public static final String TOPIC_EXCHANGE = "topic.exchange";

public final String[] keys ={"a.z.1", "1.b.2", "1.3.c", "2.d.1"};

public void send(int index) {
StringBuilder sb = new StringBuilder();
int limitIndex = index % keys.length;
String key = keys[limitIndex];
sb.append(key).append(' ').append(index + 1);
String message = sb.toString();
rabbitTemplate.convertAndSend(TOPIC_EXCHANGE, key, message);
log.info("send:{}", message);
}
}
路由模式生产者接收消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Slf4j
public class TopicReceiver {
@RabbitListener(queues = "topic.queue1")
public void receive1(String message) {
receive(message, 1);
}
@RabbitListener(queues = "topic.queue2")
public void receive2(String message) {
receive(message, 2);
}
@RabbitListener(bindings = @QueueBinding(value = @Queue("topic.queue3"),
exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
key = "#.d.#"
))
public void receive3(String message) {
receive(message, 3);
}

public void receive(String message, int receive) {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
log.info("instance {} receive : {}", receive, message);
doWork(message);
stopWatch.stop();
log.info("instance {} done in {}s", receive, stopWatch.getTotalTimeMillis());
}

private void doWork(String in){
for (char ch : in.toCharArray()) {
if (ch == '.') {
ThreadUtil.sleep(1000);
}
}
}
}

image-20221101170653303

controller中添加测试接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RestController
@RequestMapping("/rabbit")
@Api(tags = "RabbitController", description = "RabbitMQ Amqp功能测试控制器")
public class RabbitController {
@Resource
private DirectSender directSender;

@ApiOperation(value = "适配器模式消息队列发送消息")
@GetMapping(value = "/topic")
public CommonResult topic() {
for (int i = 0; i < 10; i++) {
topicSender.send(i);
ThreadUtil.sleep(1000);
}
return CommonResult.success(null);
}
}

image-20221101170301498

MessageConverter消息转换器

默认情况下,消息体被转换为二进制的数据方式进行传输,SpringAMQP将对象通过JDK的方式进行序列化,这种序列化方式存在一些问题:性能比较差,安全性存在隐患。

基于JDK的ObjectOutputStream进行序列化

新建队列

image-20221101175545469

对指定队列发送消息
1
2
3
4
5
6
7
public void sendObject() {
Map<String, Object> map = new HashMap<>();
map.put("name", "test");
map.put("age", 20);
rabbitTemplate.convertAndSend(OBJECT_QUEUE, map);
log.info("send message: {}", map);
}

image-20221101173403725

使用JSON方式进行序列化

image-20221101175642565

引入依赖
1
2
3
4
5
6
<!-- Jackson -->
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.11.4</version>
</dependency>
注入MessageConvert
1
2
3
4
5
6
7
@Configuration
public class JacksonConfig {
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
}

image-20221101174602598