RocketMQ

RocketMQ入门

消息队列

消息队列中间件 - Message Queue 是分布式系统中重要的组建,主要解决 应用解耦、异步消息、流量削峰 等问题。

消息队列实现了高性能、高可用、可伸缩和最终一致性架构,是大型分布式系统不可缺少的中间件。

目前主流的消息队列有 RocketMQ、Kafka、RabbitMQ、ActiveMQ等。本文将主要介绍 RocketMQ

消息队列的作用

  • 异构系统之间的调用解耦
  • 基于”发布订阅”机制的数据分发
  • 异步消息处理
  • 削峰限流

消息队列选型

Kafka

Apache Kafka 是一个开源流处理平台,是一个高吞吐的分布式发布订阅消息系统

使用场景:大流量应用、对消息延迟不敏感的场景

RabbitMQ

RabbitMQ 开源的消息队列系统,实现了高级消息队列协议(AMQP)

使用场景:企业级别内部应用,数据可靠性高,对于并发和延迟不敏感的场景

RocketMQ

Apache RocketMQ 由阿里巴巴集团贡献的开源分布式消息中间件

适用场景:低延迟应用,瞬间大流量处理效率不如kafka

RocketMQ概述

Apache RocketMQ 是一款分布式消息中间件系统,它最初由阿里巴巴集团开发并开源。RocketMQ 的设计目标是提供低延迟、高可用性、高吞吐量、高可靠性以及水平可扩展的消息传递服务。

Apache RocketMQ 官方文档:https://rocketmq.apache.org/zh/docs/

RocketMQ特性

  • 高可用:支持集群和水平扩容,实现负载均衡和高可用
  • 高性能:支持过亿级别的消息处理
  • 高可靠:支持消息持久化、失败重试、消息回溯等机制,确保消息的可靠性
  • 功能丰富:支持异步消息、同步消息、顺序消息、事务消息等功能

RocketMQ基本概念

生产者

生产者和生产者组

生产者(Producer)是 RocketMQ 系统中用来构建并传输消息到 RocketMQ服务端 的运行实体。

生产者组(ProducerGroup) 是 区分不同业务类型生产者的逻辑概念。

主题

主题(Topic)是消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息,主题通过TopicName来做唯一标识和区分。

消息

消息(Message)是 RocketMQ 中的最小数据传输单元。每个Message都必须指定Topic,RocketMQ 服务端将消息投递到消费端进行消费。

消息标签

消息标签(Message Tag)是 RocketMQ 提供的细粒度消息分类属性,可以在主题层级之下做消息类型的细分。消费者通过订阅特定的标签来实现细粒度过滤。

消息位点

消息按照到达 RocketMQ服务端的先后顺序进行存储在指定topic的队列中,每个消息在所在队列中都有一个唯一的long类型的坐标,用于快速查找消息的位置。

消息队列

消息队列(Message Queue)是 RocketMQ 中消息存储和传输的实际容器,也是消息的最小存储单元。

RocketMQ所有主题由多个队列组成(默认4个),以此实现队列数量的水平拆分和队列内部的流式存储。队列通过QueueId来做唯一标识和区分。队列采用 FIFO - 先进先出 的模式传输,负责向消费者 Push 推送消息,或者由消费者直接 Pull 拉取消息。

消费者和消费者组

消费者(Consumer)是 RocketMQ 用来接收并处理消息的运行实体。消费者通常被集成在业务系统中,从服务端获取消息并进行业务逻辑处理。

消费者组(ConsumerGroup)是 RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组的一个逻辑概念。在消费者组中初始化多个消费者实现消费性能的水平扩展以及高可用容灾。一个消费者组可以订阅多个 topic。如果一个主题的消息被多个消费者组订阅,每个消费者组都会独立地消费该消息,即每个消费都会被不同的消费者消费一遍。

消费点位

消费点位(ComsumeOffset)RocketMQ 会记录每个消费者组在一个特定主题(Topic)和队列(Queue)上的消费进度,是消费者组在某个队列上已经成功消费的消息的偏移量,用于快速定位该消费者组下次消费的起点地址。

订阅关系

订阅关系(Subscription)是 RocketMQ 定义了消费者获取消息、处理消息的规则和状态配置。

订阅关系由消费者组动态注册到 RocketMQ服务端,并在后续的消息投递过程中,RocketMQ broker按照注册上来的订阅关系进行消息过滤匹配和消费进度的维护。

RocketMQ基本工作流程

image-20240121182823616
  • Name Server启动,作为路由控制中心和topic的配置信息,负责维护Broker的路由信息,提供轻量级的服务注册发现和路由功能。Producer和Consumer都需要和Name Server交互来获取路由信息。
  • Broker和Namesrv建立连接注册信息(包括ip、端口、所存储topic信息)到namesrv,并定时(默认30秒)发送心跳包,用于健康监控(120秒没有收到用于心跳包,则broker会被namesrv剔除)。
  • Topic用于消息的分类,Topic是消息发布的地址,Producer将消息发送到特定的Topic,而Consumer从Topic接收消息。Topic可以手动添加,也可以在发送消息时由RocketMQ服务端自动创建(自动创建会根据配置文件中指定的默认队列数量分配相应数量的队列,并负载均衡或者其他算法分配到不同的broker节点中)。
  • Producer首先会从Name Server获取Broker的路由信息,然后根据路由信息将消息发送到指定的Broker中。
  • Consumer首先会从Name Server获取路由信息来找到对应的Topic所在的Broker中,然后从Broker订阅消息,拉取消息进行消费。

RocketMQ部署

image-20240117222521555

RocketMQ拓扑结构:

image-20240114190733792

在部署中,有三个重要的组建,分别是 namesrv、broker 和 console。它们承担着不同的角色和功能

  • namesrv:存储和管理整个RocketMQ集群中topic的路由信息,维护了一个topic到broker的映射关系。
    • 在producer发送消息时或者consumer消费消息时,需要向namesrv查询路由信息,以确定消息的发送或消费的broker。
  • broker:RocketMQ的消息存储和消息传递的核心组件,负责存储producer发送的消息,并且将这些消息推送给consumer。
  • console:RocketMQ的控制台即可视化管理界面,用于监控和管理RocketMQ集群。
    • 通过控制台,管理者可以查看集群的运行状态,监控息的发送和消费情况,管理topic和consumer等。

单节点 RocketMQ 部署

运行前 需要设置挂载目录读写权限

mkdir -p /mydata/rocketmq/namesrv

chmod 777 -R /mydata/rocketmq/namesrv/*

mkdir -p /mydata/rocketmq/broker

chmod 777 -R /mydata/rocketmq/broker/*

docker-compose.yml文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
version: '3'
services:
rocketmq_namesrv:
image: apache/rocketmq
container_name: rocketmq_namesrv
restart: always
privileged: true
volumes:
- /mydata/rocketmq/namesrv/logs:/home/rocketmq/logs
command: [ "sh","mqnamesrv" ]
environment:
JAVA_OPT_EXT: "-Duser.home=/home/rocketmq -Xms256m -Xmx256m -Xmn128m"
ports:
- 9876:9876
rocketmq_broker:
image: apache/rocketmq
container_name: rocketmq_broker
restart: always
privileged: true
depends_on:
- rocketmq_namesrv
command: [ "sh","mqbroker","-c","/home/rocketmq/broker.conf","autoCreateTopicEnable=true" ]
environment:
NAMESRV_ADDR: "rocketmq_namesrv:9876"
JAVA_OPT_EXT: "-Duser.home=/home/rocketmq -server -Xms128m -Xmx128m -Xmn128m"
volumes:
- /mydata/rocketmq/broker/logs:/home/rocketmq/logs
- /mydata/rocketmq/broker/store:/home/rocketmq/store
- /mydata/rocketmq/broker/conf/broker.conf:/home/rocketmq/broker.conf # broker.conf 配置参考: https://github.com/apache/rocketmq/blob/master/distribution/conf/broker.conf
ports:
- 10909:10909
- 10911:10911
rocketmq_dashboard:
image: apacherocketmq/rocketmq-dashboard:latest
container_name: rocketmq-dashboard
restart: always
privileged: true
depends_on:
- rocketmq_namesrv
environment:
JAVA_OPTS: "-Xmx256m -Xms256m -Xmn128m -Drocketmq.namesrv.addr=rocketmq_namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
ports:
- 11880:8080

其中 broker.conf 配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 集群名称
brokerClusterName = DefaultCluster
# broker 名称
brokerName = broker-a
# broker IP地址
brokerIP1=xx.xxx.xxx.xxx
# brokerId 0为master 大于0为slave
brokerId = 0
# 每日凌晨4点删除过期日志
deleteWhen = 04
# 日志文件保留时间 默认48小时
fileReservedTime = 48
# broker主从复制策略(默认:ASYNC_MASTER)ASYNC_MASTER-异步复制Master SYNC_MASTER-同步双写Master
brokerRole = ASYNC_MASTER
# broker刷盘策略(默认:ASYNC_FLUSH):ASYNC_MASTER-异步刷盘(性能好,但宕机可能会导致数据丢失) ASYNC_FLUSH-同步刷盘(性能差,但不会导致数据丢失)
flushDiskType = ASYNC_FLUSH
# 是否允许 broker 在当前topic不存在时自动创建 topic (线上环境设置为 false)
autoCreateTopicEnable=true
# 是否允许 broker 自动创建订阅组(线上环境设置为 false)
# 自动创建时读写队列数量
defaultTopicQueueNums=4
# 开启SQL属性过滤
enablePropertyFilter=true

同步复制和异步复制

1
2
# broker主从辅助策略(默认:ASYNC_MASTER)ASYNC_MASTER-异步复制Master SYNC_MASTER-同步双写Master
brokerRole = ASYNC_MASTER
同步复制(默认)
  • 特点:生产者发送消息,在主节点接收到消息后,必须等待所有从节点都成功写入该消息即等待从从节点完成数据同步,主节点才会响应结果给生产者

  • 优点:数据一致性好,从节点数据与主节点完全一致,不会产生丢失数据的风险

  • 缺点:同步阻塞,性能相对较差

  • 应用场景:同步复制适用于强一致性要求较高,不能容忍数据丢失的场景,例如关键业务数据变更,数据安全性要求高的。

    image-20240121202223163
异步复制
  • 特点:生产者发送消息,在主节点接收到消息后,不需要等待从节点写入成功,直接响应结果给生产者

  • 优点:性能较好,主节点不需要等待从节点确认,可以更快地响应结果

  • 缺点:存在一定的数据不一致性,主从节点数据可能在一段时间内存在差异

  • 应用场景:异步复制适用于对一致性要求较低,追求吞吐量,可以容忍一定的时间内不一致和数据丢失的风险的场景

    image-20240121152510526

RocketMQ dashboard使用

image-20240121135503401

RabbitMQ高可用方案

高可用架构

生产使用高可用的多主多从的拓扑结构

image-20240121202426604

多个namesrv节点可以提高系统的可用性和容错性。多主多从的broker节点,主broker节点可以处理读写请求,从broker节点只能处理读请求,从节点用于备份和提高可用性。在主从broker之间,数据通过日志 commitlog 进行同步,

高可用实现

  • 当一个主broker-a节点挂了则停止服务,不会接受新的消息,rocketmq默认不会自动主从切换,此时生产者无法从rocketmq中消费消息,利用重试机制切换到其从节点消费消息,已经被同步到其从节点broker-slave-a的消息可以正常被消息者消费。

    在同步复制的模式下,消息数据不会丢失,而在异步复制的模式下,主节点可能还有消息没有同步,可能会导致消息数据丢失的问题。

    当生产者向从broker-a节点发送消息时,broker-a无法响应,生产者触发重试机制,向另一个从broker-b发送消息,从而保证高可用。

    主broker-a超过120秒没有上报信息到namesrv时,namesrv将剔除主broker-a的信息。

  • 当主从集群 broker-a 和 broker-slave-a 同时挂掉,服务无法响应读写请求,无法接受也无法消费消息,但是数据化持久化,消息不会丢失。新消息会被发到其他节点上。

  • 当 namesrv-a节点挂掉,生产者和消费者无法获取broker的路由信息,会轮询尝试到另一个namesrv节点获取。

  • 当 broker-a主从集群和 namesrv-a之间的网络阻塞的请求,导致 namesrv节点之间数据不一致,此时一个消费者从namesrv-a中获取路由信息,会导致broker-a主从集群产生的消息无法消费,持续挤压,解决方案只有调试网络或者动态不停机的修改消费者配置。

Java应用接入RocketMQ

引入 RocketMQ 客户端

1
2
3
4
5
6
7
<!-- rocketmq -->
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.1.4</version>
</dependency>

新建测试类发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class SimpleTest {


private static final Logger logger = LoggerFactory.getLogger(RocketMQApp.class);

@Test
public void testProduce() {
// 创建 生产者为 producerGroup1 的 DefaultMQProducer 消息生产者对象
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("producerGroup1");
// 设置 消息生产者对象 的 namesrv 地址 (多个节点间用分号分割)
defaultMQProducer.setNamesrvAddr("xx.xxx.xxx.xxx:9876");
try {
// 生产者启动,与 namesrv 建立长连接查询路由,与 topic 所处的 broker 建立长连接用于发送消息
defaultMQProducer.start();
// 创建 message
Message message = new Message ("simple", "hello wolrd!".getBytes());
SendResult sendResult = defaultMQProducer.send(message);
logger.info("send message:{}", sendResult);
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
throw new RuntimeException(e);
} finally {
try {
// 关闭长连接
defaultMQProducer.shutdown();
} catch (Exception e) {
// 防止 start 建立连接失败
logger.error(e.getMessage());
}
}
}

@Test
public void testComsume() {

DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("comsumerGroup1");
defaultMQPushConsumer.setNamesrvAddr("xx.xxx.xxx.xxx:9876");
try {
// 消息订阅 subExpression * 表示 所有 tags
defaultMQPushConsumer.subscribe("simple", "*");
// 注册消息的监听器 - 监听 broker 推送的消息并处理
defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
/**
*
* @param msgs msgs.size() >= 1<br> DefaultMQPushConsumer.consumeMessageBatchMaxSize=1,you can modify here
* 消息批量推送 提高消息消费的吞吐量
* @param context
* @return
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String content = new String(msg.getBody(), StandardCharsets.UTF_8);
logger.info("consume message:{}", content);
}
// 返回接收成功标识
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 消费者启动,与namesrv建立长连接查询路由,与 topic 所处的 broker 建立长连接,开启监听消息的推送
defaultMQPushConsumer.start();
} catch (MQClientException e) {
throw new RuntimeException(e);
}

try {
new CountDownLatch(1).await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

}
}

RocketMQ功能特性

普通消息

普通消息为 Apache RocketMQ 中最基础的消息,本章将介绍普通消息的应用场景、功能原理、使用方法和使用建议。

应用场景

普通消息一般应用于微服务解耦、事件驱动、数据集成等场景。这些场景要求消息的可靠性,但是对消息的处理时机、处理顺序没有特别要求。

  • 异步解耦

    在线消息处理

    上游系统将一个一个业务事件封装成独立的普通消息并发送至 RocketMQ服务端,下游按需订阅 RocketMQ服务端 消息,拉取到消息后按照本地消费逻辑处理下游任务。

  • 数据集成

    数据传输

    以日志收集场景为例,日志信息转发到 RocketMQ,每条消息即是一段日志数据,RocketMQ 不用不做任何处理,将日志数据可靠的投递到下游的存储系统和分析系统等,后续由不同系统处理即可。

功能原理

普通消息是 rocketmq 的基本功能,支持生产者和消费者的异步解耦通信

普通消息生命周期

生命周期

  • 初始化 - Initialized:消息被生产者构建并完成初始化,待发送到RocketMQ服务端的状态
  • 待消费 - Ready:消息被发送到RocketMQ服务端,等待消费者消费的状态
  • 消费中 - Inflight:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程
    • 此过程RocketMQ服务端会等待消费者完成消费并提交消费结果至RocketMQ服务端
    • 如果一定时间后没有收到消费者响应,RocketMQ会对消息进行重试处理
  • 消息确认:消费者完成消费处理,并向RocketMQ服务端提交消费结果,服务端标记当前消息的处理状态(包括消费成功和失败)
    • RocketMQ 默认持久化所有消息,被消费的消息不会立即删除,而是进行逻辑标记消费状态
    • 当消息在保存时间到期或者存储空间不足被删除前,消费者支持回溯消息来重新消费
  • 消息删除:RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public class NormalTest {

private static final Logger logger = LoggerFactory.getLogger(RocketMQApp.class);

@Test
public void testProduce() {
// DefaultMQProducer 用于发送非事务消息
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("producer-group-normal");
// 设置 namesrv 地址
defaultMQProducer.setNamesrvAddr("xx.xxx.xxx.xxx:9876");
// 异步发送失败后重试2次 消息高可靠性的处理方式
defaultMQProducer.setRetryTimesWhenSendAsyncFailed(2);
try {
// 生产者启动
defaultMQProducer.start();
// 创建 message
Message message = new Message ("simple", "default-tag", "1", "hello wolrd!".getBytes());
SendResult sendResult = defaultMQProducer.send(message);
logger.info("send message:{}", sendResult);
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
throw new RuntimeException(e);
} finally {
try {
// 关闭长连接
defaultMQProducer.shutdown();
} catch (Exception e) {
// 防止 start 建立连接失败
logger.error(e.getMessage());
}
}
}

@Test
public void testComsume() {

DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("producer-group-normal");
defaultMQPushConsumer.setNamesrvAddr("xx.xxx.xxx.xxx:9876");
try {
// 消息订阅 subExpression * 表示 所有 tags
defaultMQPushConsumer.subscribe("simple", "*");
// 注册消息的监听器 - 监听 broker 推送的消息并处理
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
String content = new String(msg.getBody(), StandardCharsets.UTF_8);
logger.info("consume message:{}", content);
}
// 返回接收成功标识
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 消费者启动,与namesrv建立长连接查询路由,与 topic 所处的 broker 建立长连接,开启监听消息的推送
defaultMQPushConsumer.start();
} catch (MQClientException e) {
throw new RuntimeException(e);
}

try {
new CountDownLatch(1).await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

}
}

使用建议

设置全局唯一业务key,方便问题追踪和定位

image-20240125200801026

定时/延迟消息

定时/延时消息是 Apache RocketMQ 中的高级特性消息,是生产者将消息发送给broker后,broker不会立即将消息投递给具体的消费者,而是在一定的时间后或者指定的某个时间点再进行消息的投递

定时和延时的本质是一样的,服务端都是根据消息设置的定时时间在某一个固定的时刻将消息投递给消费者消费。

应用场景

定时/延迟消息可以应用于分布式定时调度、任务超时处理等场景。

  • rocketmq5.x 支持任意时间的消息 怎么没有

功能原理

使用限制

使用示例

使用建议

顺序消息

应用场景

在有序事件处理,例如订单消息处理,订单需要严格按照 “订单创建->财务结算->物流消息” 等环节的顺序,以避免混乱或错误的处理。

当使用普通消息时,消息发送到不同的队列中,不同队列的消息积压或者其他因素会导致消息被投递到消费者消费的时机不同,这时就会导致消费没有按照顺序执行。

image-20240124132737720

功能原理

顺序消息是 RocketMQ 提供的一种高级消息类型,支持消费者按照生产者发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。

  • 分区有序消息和全局有序消息 消息组

事务消息

RocketMQ的事务消息支持在分布式场景下保障消息生产本地事务的最终一致性。

应用场景

分布式事务需要保证核心业务和多个下游业务的执行结果的完全一致。

事务消息诉求

基于普通消息方案,本地事务和消息的生产无法保证一致性

  • 先执行本地事务,再生产消息
    • 过程:执行本地事务,再生产消息,通过消息生产回执来判断消息生产的状态,成功则提交事务,失败则回滚事务
    • 问题 - 阻塞:当出现网络问题时,消息生产回执响应太慢,保证本地事务无法提交,方法响应时间长。在大并发的场景下,数据库连接池被耗尽,大量请求积压连接池等待,请求向前积压到tomcat连接池,网关等,导致雪崩,整个系统不可用
  • 先生产消息:再执行本地事务:
    • 过程:先发送消息,消息发送成功后,再进行本地事务的执行和提交。
    • 问题 - 回滚:当本地事务的执行失败并回滚时,则已经发送的消息需要被撤销,但普通消息无法撤销消息。

RocketMQ 提供的分布式事务消息可以保证应用本地事务消息生产最终一致性

上述普通消息无法保证一致性的本质原因是 普通消息无法具备提交、回滚和统一协调的能力。

而基于 RocketMQ 的分布式事务消息,将 本地消息和本地事务 绑定,实现全局提交结果的一致性。

image-20240126091647924

功能原理

事务消息流程

事务消息

  1. 生产者将消息发送至RocketMQ服务器
  2. RocketMQ服务将消息持久化成功后,向生产者返回ACK确认消息已经发送成功,但是此时消息被标记为半事务消息 - half message,在这种状态下消息”暂时不能投递”
  3. 生产者收到ACK确认消息后,开始执行本地事务
  4. 生产者向RocketMQ服务器提交本地事务的执行结果(commit|rollback),RocketMQ服务器根据执行结果做出不同的处理
    • 当提交的本地事务执行结果为 commit,RocketMQ 服务器将消息从半事务状态标记为可投递状态,并投递给消费者
    • 当提交的本地事务执行结果为 rollback,RocketMQ 服务器将不会对应事务的半事务消息投递给消费者
  5. 在断网或者生产者应用重启等特殊情况下,若 RocketMQ服务器没有收到生产者的本地事务的执行结果 或者 收到的结果为 UNKNOWN -未知状态。经过固定时间后,RocketMQ服务器对消息生产者集群的任一实例发起消息进行回查(回查的时间间隔和最大回查次数可以配置)。
  6. 任一生产者实例收到回查后,检查对应消息的本地事务的执行的最终结果。并将检查到的本地事务的最终状态再次提交,rocketMQ服务器按照步骤4对于半事务消息进行处理。
事务消息生命周期
image-20240126101352640

相比于,事务消息的生命周期多了事务待提交和消息回滚的周期,

  • 半事务消息在发送到RocketMQ服务器中时,不会直接被持久化到服务端,而是会被单独存储在事务存储系统中,此时消息对消费者不可见,等待二阶段本地事务执行后再返回结果。
  • 消息回滚,在本地事务执行结果为rollback时,RocketMQ会将半事务消息回滚,该事务消息流程终止。

使用示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
public class TransactionTest extends CommonTest {

@Test
public void produce() {
// 构建事务消息生产者 TransactionMQProducer
TransactionMQProducer transactionMQProducer = new TransactionMQProducer("produce_group_transaction");
transactionMQProducer.setNamesrvAddr("xx.xxx.xxx.xx:9876");

// cachedThreadPool 用于本地事务回查
ExecutorService cachedThreadPool = Executors.newCachedThreadPool(r -> {
// 重写线程工厂的构造方法 定义线程名称
Thread thread = new Thread(r);
thread.setName("check-transaction-thread");
return thread;
});
// 设置 本地事务连接池 用于回查
transactionMQProducer.setExecutorService(cachedThreadPool);

// 设置 本地事务监听器 同于执行代码
transactionMQProducer.setTransactionListener(new DemoTransactionListener());

try {
transactionMQProducer.start();
Message message = new Message("transaction", "transaction-1001",
"1001", "transaction json message 1001".getBytes());
logger.info("发送事务消息...");
// 使用 sendMessageInTransaction 发送事务消息
transactionMQProducer.sendMessageInTransaction(message, null);

} catch (MQClientException e) {
throw new RuntimeException(e);
} finally {
try {
// 关闭长连接
transactionMQProducer.shutdown();
} catch (Exception e) {
// 防止 start 建立连接失败
logger.error(e.getMessage());
}

}
}

@Test
public void consume() {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("producer-group-transaction");
defaultMQPushConsumer.setNamesrvAddr("xx.xxx.xxx.xxx:9876");
try {
// 消息订阅 subExpression * 表示 所有 tags
defaultMQPushConsumer.subscribe("transaction", "*");
// 注册消息的监听器 - 监听 broker 推送的消息并处理
defaultMQPushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
String content = new String(msg.getBody(), StandardCharsets.UTF_8);
logger.info("consume message:{}", content);
}
// 返回接收成功标识
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 消费者启动,与namesrv建立长连接查询路由,与 topic 所处的 broker 建立长连接,开启监听消息的推送
defaultMQPushConsumer.start();
} catch (MQClientException e) {
throw new RuntimeException(e);
}

try {
new CountDownLatch(1).await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

}

}

class DemoTransactionListener implements TransactionListener {

private static final Logger logger = LoggerFactory.getLogger(DemoTransactionListener.class);

private static boolean isSucc = false;

/**
* 执行本地事务
*
* @param msg Half(prepare) message
* @param arg Custom business parameter
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
logger.info("正在执行本地事务...");

// 模拟本地事务执行状态
// try {
// // ...
// 本地事务执行成功
// connection.commit();
// return LocalTransactionState.COMMIT_MESSAGE;
// } catch (Exception e) {
// // 发生异常本地事务回滚
// connection.rollback();
// return LocalTransactionState.ROLLBACK_MESSAGE;
// }

// 模拟随机结果
isSucc = new Random().nextBoolean();
logger.info("正在执行本地事务结果:{}", isSucc);
return isSucc ?
LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
}

/**
* 回查本地事务
*
* @param msg Check message
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
logger.info("正在执行本地事务回查");

// String msgId = msg.getMsgId();
// logger.info("回查检查本地事务key:{}状态", msgId);
// XX xx = xxDao.selectById(msgId);
// return xx != null ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;

return isSucc ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;
}

}

RocketMQ高级

消费者

概念

  • 消费者(Consumer):RocketMQ 用来接收并处理消息的运行实体。消费者通常被集成在业务系统中,从服务端获取消息并进行业务逻辑处理。
  • 消费者组(ConsumerGroup):RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组的一个逻辑概念。在消费者组中初始化多个消费者实现消费性能的水平扩展以及高可用容灾。一个消费者组可以订阅多个 topic。
  • 订阅关系(Subscription):订阅关系以消费组粒度进行管理,消费组通过定义订阅关系控制指定消费组下的消费者如何实现消息过滤、消费重试及消费进度恢复等。

消费模式

消费方式

集群消费模式

在集群消费模式中,一个消费者组中的每个消费者实例共同消费相同topic的消息。

每个消息只会被消费者组中的一个消费者实例消费,具体消费的策略可以配置,默认是负载均衡。

集群消费模式是默认的消费模式,它的消费进度保存在broker服务端。

使用方式
1
2
// 设置为消费模式为集群消费模式 默认为集群消费模式
defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public class ClusterTest extends CommonTest {

@Test
public void produce() {
DefaultMQProducer defaultMQProducer = new DefaultMQProducer("group-consume-cluster");
defaultMQProducer.setNamesrvAddr("110.42.239.193:9876");
try {
defaultMQProducer.start();
logger.info("producer start");
for (int i = 0; i < 10; i++) {
String content = "这是第" + i + "条消息";
Message message = new Message("topic-consume-cluster", "test", String.valueOf(i),content.getBytes());
SendResult sendResult = defaultMQProducer.send(message);
logger.info("sendResult:{}", sendResult);
}
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
throw new RuntimeException(e);
} finally {
try {
defaultMQProducer.shutdown();
logger.info("producer shutdown");
} catch (Exception e) {
e.printStackTrace();
}
}
}

@Test
public void consume() {
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("group-consume-cluster");
defaultMQPushConsumer.setNamesrvAddr("110.42.239.193:9876");
defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
try {
defaultMQPushConsumer.subscribe("topic-consume-cluster", "*");

defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
msgs.forEach(messageExt -> logger.info("consume:{}", messageExt.getKeys()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

defaultMQPushConsumer.start();

} catch (MQClientException e) {
throw new RuntimeException(e);
}

try {
new CountDownLatch(1).await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
启动两个消费者实例:
实例1:
consume:0
consume:3
consume:4
consume:7
consume:8
实例2:
consume:1
consume:2
consume:5
consume:6
consume:9
广播消费模式

在广播消费模式中,一个消费者组中的每个消费者实例都会消费相同主题的所有消息。

每个消息会被消费者组中的每个消费者实例都处理一次。

广播消息的消费进度被保存在每个消费者实例本地,而不是broker服务端中。

使用方式
1
2
3
// 设置消费模式 广播消费模式
defaultMQPushConsumer.setMessageModel(MessageModel.BROADCASTING);
每个实例都会消费 10条消息

消费重试

消费重试是值消费者出现异常或者消费某条消息失败时,RocketMQ 会根据消费重试策略重新投递该消息进行故障恢复,如果超过一定的次数还没有成功,则消息不再继续重试,而是直接被发送到死信队列中。

在这个过程被RocketMQ分为了3个阶段,分别是 正常消费,重试消费和死信,对应主题为 正常原始topic、重试topic和死信topic

image-20240129232123639
  • 正常消费:生产者正常消费消息并返回确认给broker,无需做任何处理

  • 重试消息:当网络原因或者其他因素导致消费者消费消息失败时,消息会自动自动保存到重试topic中,格式为”%RETRY%消费者组”,在重试时消费者会自动订阅该重试topic,重新尝试处理消息

    • 重试一共会进行16次,每次会按照固定的时间间隔进行,每次重试时间间隔不一样

      image-20240129233807693
  • 死信:重试次数耗尽,消息仍然没有消费成功,消息会被保存到死信topic中,死信topic的格式”%DLQ%消费者组名”,进入死信topic的消息不会被再次消费,需要人工介入处理问题

消息获取方式

push

push 推送模式:broker主动向消费者推送最新的消息。

push 模式,消费位点由broker管理,使用简单

image-20240203140109107

push 模式使用的消费者类为 DefaultMQPushConsumer。

pull

pull 拉取模式:消费者主动从broker中拉取消息,提交消费位点。

pull 模式,消费者自主管理消费位点,可以灵活把握消费进度和消费速度,适合流计算和耗时等特殊消费场景。

image-20240203140230182

拉取模式是指由消费者定时向broker发起队列查询请求,broker收到请求后将没有消费的消息返回给消费者进行消费,消费后会发送确认应答给broker。

pull 模式使用的消费者类为 DefaultLitePullConsumer。

对比
image-20240129235856435

消费过滤

消费者订阅了某个主题后,RocketMQ 会将该主题中的所有消息投递给消费者。若消费者只需要关注部分消息,可通过设置过滤条件在 RocketMQ 消费者端进行过滤,只获取到需要关注的消息子集,避免接收到大量无效的消息。

应用场景

在实际业务场景中,同一个主题下的消息往往会被多个不同的下游业务方处理,各下游的处理逻辑不同,只关注自身逻辑需要的消息子集。

RocketMQ 主要解决的单个业务域即同一个主题内不同消息子集的过滤问题,一般是基于同一业务下更具体的分类进行过滤匹配。如果是对不同业务域的消息拆分,应该使用不同的主题进行区分处理。

功能原理

消息过滤的含义指的是将在RocketMQ Broker端将符合条件的消息投递给消费者,而不是在生产者端将匹配到的消息过滤掉。具体而言就是,RocketMQ 服务端根据生产者和消费者定义的过滤条件(属性或者标签)进行筛选匹配,将符合条件的消息投递给消费者进行消费。

消息过滤

流程:

  • 生产者:生产者在初始化消息时预先为消息设置一些属性和标签,用于后续消费时指定过滤目标

  • 消费者:消费者在初始化及后续消费流程中通过调用订阅关系注册接口向服务端上报需要订阅指定主题的目标消息,即过滤条件

  • 服务端:消费者获取消息时会触发服务端的动态过滤计算,RocketMQ 服务端根据消费者上报的过滤条件的表达式进行匹配,并将符合条件的消息投递给消费者。

订阅关系一致性

过滤表达式属于订阅关系,RocketMQ 的领域模型规定,同一消费者分组内的多个消费者的订阅关系的过滤表达式必须保持一致,否则可能会导致部分消息消费不到。

过滤分类

RocketMQ 提供了两种过滤方式:Tag标签过滤 和 SQL标签过滤

image-20240130101608721
tag 过滤

消息标签是 RocketMQ 提供的细粒度消息分类属性,可以在主题层级之下做消息类型的细分。消费者通过订阅特定的标签来实现细粒度过滤。

使用方式

生产者在发送消息时,设置消息的Tag标签

1
org.apache.rocketmq.common.message.Message#Message(java.lang.String topic, java.lang.String tags, java.lang.String keys, byte[] body) 

消费者通过 tag 标签过滤表达式需要指定已有的Tag标签来进行匹配订阅

1
org.apache.rocketmq.client.consumer.DefaultMQPushConsumer#subscribe(java.lang.String topic, java.lang.String subExpression)
tag标签设置
  • tag 由生产者发送消息时为消息设置,每条消息只能有一个tag标签
tag 标签过滤表达式

Tag标签过滤为精准字符串匹配,过滤规则 subExpression 设置格式如下:

  • 单Tag匹配 - “tag1”:过滤表达式为目标Tag。表示只有消息标签为指定目标Tag的消息符合匹配条件,会被发送给消费者。
  • 多Tag匹配 - “tag1||tag2||tag3”:多个Tag之间为或的关系,不同Tag间使用两个竖线(||)隔开。例如,Tag1||Tag2||Tag3,表示标签为Tag1或Tag2或Tag3的消息都满足匹配条件,都会被发送给消费者进行消费。
  • 全部匹配 - ““:使用星号()作为全匹配表达式。表示主题下的所有消息都将被发送给消费者进行消费。
SQL 属性过滤

SQL 属性过滤是 RocketMQ 提供的高级消息过滤方式。生产者为消息设置多个属性(每个属性都是一对自定义的键值对Key-Value),消费者订阅时可设置SQL语法的过滤表达式过滤多个属性。

生产者设置消息的自定义属性,通过 message.putUserProperty 方法设置,每个消息可以设置多个属性

消费者设置sql过滤规则:

1
defaultMQPushConsumer.subscribe("topic", MessageSelector.bySql("SQL属性过滤表达式"));

SQL 属性过滤表达式:IS NULL、IS NOT NULL、*>* >= < <=、BETWEEN xxx AND xxx、IN (xxx, xxx)等

消费者端设置过滤规则,注册订阅关系到服务器端,服务端根据过滤规则进行动态过滤计算,将符号条件的消息投递给消费者。

在消息处理时,出现异常情况、空值情况、数值类型不符的都会被过滤掉,不会投递给消费者。

使用建议

topic和tag的合理划分和选择:不同业务类型的消息使用topic进行拆分,相同业务不同属性或者类型使用tag区分

Rebalance 机制

当外部环境发生变化时,例如broker发生掉线,topic扩缩容,消费者扩缩容等,RocketMQ 会自动感知并调整自身的消费,尽量减少消息没有消费

消息存储和清理

image-20240128150629985

存储介质

RocketMQ 采用文件的方式来做持久化,持久化的模式有两种:同步刷盘和异步刷盘。

消息存储

RabbitMQ 的消息存储采用 顺序写、随机读,保证了消息存储的速度。

image-20240128151352481

存储文件结构

RocketMQ 消息默认存储在本地磁盘文件,RocketMQ broker 的存储文件结构如下:

消息存储

  • commitlog:存储消息物理文件
    • 文件名长度为20,由文件保存消息的最小物理偏移量offset的基础上高位补0组成
    • 文件大小一般最大为1g,可以通过 mapedFileSizeCommitLog 进行配置
  • comsumeQueue:存储逻辑队列索引
  • index:存储消息唯一标识-message key的索引信息
  • config:保存了当前broker的topic、订阅关系等配置元数据信息。
    • broker 会定时将配置信息从内存持久化到该目录中,用于宕机后到快速恢复
消息构成

image-20240128164256846

commitlog 目录

commitlog 目录下,可以有多个commitlog文件,但是逻辑上是一个文件,只是为了方便检索和读写将文件物理拆分成多个子文件(默认最大大小为1g),子文件之间通过其保存的第一个物理点位和上一个文件的最后一个物理点位进行连接。

image-20240128165134974

RocketMQ 的CommitLog文件是按照消息的时间顺序和物理offset的顺序来写入的,以保证消息的顺序性和持久性,在执行写入操作时会加锁,以保证多线程访问的安全性。当一个子文件写满后,将会创建一个新的 commitlog,在上一个文件的 offset 的基础上继续写操作。因此所有 commitlog 文件都是连续的,被写入的总是最后一个子文件。

消息检索

image-20240128231201092
  • 利用 MessageID 查询消息

    MessageID 是由生产者向broker发送消息成功,由broker自动生成的消息编号,这个编号由ip + port + offset 组成,所以通过它本身就快速查找和操作特定的消息。

  • 实现 tag 过滤查询消息

    基于 comsumeQueue 的消息索引实现 tag 过滤查询消息。

    comsumeQueue 是一个用于存储消息偏移量信息的数据结构,它可以帮助快速定位消息在 commitlog 文件中的位置。comsumeQueue 的索引条目与消息的偏移量相关联。

    comsumeQueue 文件目录结构:comsumeQueue文件夹->topic目录->队列编号目录(0|1|2|4)->consumeQueue文件,其中consumeQueue文件中的数据结构为 物理位点(offset-8字节) + 消息体size大小(4字节) + tag的hashcode值(8字节)。

    流程:

    1
    2
    3
    4
    5
    1. 对于指定的 tag 进行 hash 运算得到对应的 hashcode
    2. 从consumeQueue文件中查找对应hashcode的数据
    3. 根据这些的数据的物理位置offset从commitlog中提取这些消息
    4. 由于哈希冲突即有不同的tag会有相同的hashcode值的情况,因此要对于从commitlog获取的这些offset的数据进行根据具体的tag进行过滤
    5. 将过滤过的消息提取出来并封装为 message 进行返回
  • 通过 key 查询消息

    key 是生产者赋予消息的业务唯一标识,RocketMQ的底层存储结构的IndexFile提供了基于key或者时间的索引功能。

    IndexFile 是提供的基于 message key 和 时间 的索引文件

消息清理

RocketMQ Broker的消息存储文件CommitLog和其他索引文件都是通过顺序写的方式存储在磁盘上,磁盘的空间是有限的,数据不可能无限制追加。

RocketMQ Broker引入了过期文件清理机制。

CommitLog文件的清理

Broker 任务调度每隔10秒对当前的commitlog文件进行一次扫描,检查出当前满足删除条件。

删除条件

删除72小时内没有产生消费的commitlog文件

  • 默认(可配置)明日凌晨4点
  • 磁盘使用空间超过默认的85%,执行删除

删除commitlog的执行过程

  1. 扫描并根据系统配置的文件保留时间来确定 commitlog 文件的过期状态
  2. 对于过期的commitlog文件,首先会进行预删除检查(是否有进程正在使用这些文件,内存引用等)
  3. 执行删除操作
  4. 处理删除失败的情况(文件正在被使用等),会间隔120秒再次尝试执行重试删除直到文件删除

删除索引文件的执行过程

遍历每一个consumeQueue文件,删除过期的消费队列文件以及更新消费队列的最小偏移量,在遍历删除的过程中,每删除一个consumeQueue文件,都会休眠一段时间,再对下一个comsumeQueue文件进行检查和删除操作。indexfile文件同理。

消息发送

零拷贝

RocketMQ 采用零拷贝技术,提高性能,降低延迟。

零拷贝:在网络请求和数据返回处理的过程中,避免了数据在用户空间和内核空间之间的多次复制,从而减少拷贝操作,提高数据传输速度。

image-20240128152707430

RocketMQ 使用 MappedByteBuffer类( Java NIO提供的类,该类提供了内存映射文件的方式即在用户空间创建内核空间文件的内存映射区域,这里的内存映射是指地址映射,而不是整个文件 )将文件直接映射到用户空间的内存地址空间中,对于文件的操作直接在用户空间对应文件映射的内存区域中进行,避免了数据在内核空间和用户空间的相互拷贝过程。

同步刷盘和异步刷盘

RocketMQ 处理消息是在内存中进行,但会被持久化到磁盘的 commitlog 文件中以便消息存储和故障恢复。

RocketMQ 提供了两种消息存储到 commitlog 时数据写入磁盘的不同机制:同步刷盘(Synchronous Flush)和异步刷盘(Asynchronous Flush)

image-20240128232339508

同步刷盘

生产者发送的消息,broker接收消息,消息先存储在内存中,然后 broker 会立即将这些数据同步写入磁盘,消息被持久化到磁盘后,生产者才会收到写入成功的确认。

特点:提供了更高的数据安全性,服务宕机不会丢失数据

异步刷盘

生产者发送的消息,broker接收消息,消息先存储在内存中, broker 不会立即进行磁盘同步操作,生产者直接收到写入成功的确认,消息会在定时或者适当的时机批量的写入到磁盘中。

特点:提高消息的响应速度和系统的吞吐量,服务宕机可能会丢失数据

配置方式

1
2
flushDiskType=SYNC_FLUSH # 同步刷盘
flushDiskType=ASYNC_FLUSH # 异步刷盘

冥等性保障

冥等性

幂等性(Idempotency)是指无论一个操作执行多少次,都会得到相同的结果。

在 RocketMQ 中,保证幂等性主要是为了确保即使同一消息被重复消费(例如,在消息重新投递或系统故障后),也不会对业务产生重复的影响。

RocketMQ 导致消息重复的原因:

  • 发送时的消息重复:早期生产者发送消息到broker,broker完成了数据的持久化后,在响应应答给生产者时网络异常等,导致broker对于生产者应答失败,此时生产者重新发送消息,消费者此时会收到两条完全一样的消息。在现在的版本中,这个问题已经被解决,broker对于相同的messageID进行去重,不会再存储两条相同的消息,而是直接返回成功的响应给生产者。
  • 投递时的消息重复:
    • 消息已经投递到消费者并完成业务处理,在消费者响应应答给消费者时,网络出现抖动异常或者broker重启或者消费者重启等,broker 没有接收到应答,为了保证消息至少被消费一次,会重试再次投递相同的消息,消费者此时可能处理两次相同的消息。
    • 当RocketMQ 发生重启、因为扩缩容导致的Rebalance也可能会导致重复投递消息。

因此处理消息的冥等性非常的重要

处理方式

处理消息冥等性问题最常见的做法就是在 业务层实现冥等性,为每一条消息设置业务唯一标识(例如:订单号、事务ID等,建议不要使用MessageID作为处理依据,因为MessageId存在重复的情况),然后在消费者的业务逻辑中检查该标识符。在

1
2
3
4
# 生产者
Message message = new Message();
message.setKey("order_1234");
SendResult sendResult = producer.send(message);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 消费者
consumer.subscribe("demo_topic", "*", new MessageListener() {
public Action consume(Message message, ConsumeContext context) {
// ...
// 获取分布式锁 保证线程安全
lock.tryLock();
try {
String key = message.getKey();
// 查询数据库获取指定key订单状态
if(订单状态为"等待支付") {
// 处理扣款
// 返回支付成功响应
}
else if(订单状态为"已支付") {
// 直接返回支付成功响应
}
} finally {
lock.unlock();
}
}
});

SpringBoot接入RocketMQ

参考