RocketMQ

RocketMQ

请回忆消息队列和生产者消费者模型。

2026年5月8日14:26:54,一直以来,我可能对消息队列RocketMQ类工具的学习认知产生了偏差,这不是开发的一部分,他是一个工具,和Docker一样的工具,也是一个基础设施,这和开发逻辑没有多大关系。所以,没必要在win+idea上反复纠缠,最好是在Docker中运行,因为最后这些代码是要放到Linux里面跑的,嗯,这是一次很重要的反思。但是不必担心,人类在曲折中前进。

基本概念

  • Message(消息):Message 是 RocketMQ 传输的基本单元,包含了具体的业务数据以及一些元数据(如消息 ID、主题、标签、发送时间等)。消息可以是文本、二进制数据或其他任何序列化后的对象形式。
  • Topic(主题):Topic 是一类消息的逻辑分类名,是 Apache RocketMQ 中消息传输和存储的顶层容器。类似于邮件系统中的邮箱地址或发布/订阅模式中的“频道”。生产者向特定的 Topic 发送消息,消费者则根据 Topic 订阅并接收消息。一个 Topic 可以被多个生产者写入,同时也能被多个消费者订阅。
  • Queue(队列):每个 Topic 被划分为多个 Queue(队列),或称 MessageQueue,这些队列用于存储消息。生产者发送到 Topic 的消息会被分配到其下的各个 Queue 中;消费者则是从这些 Queue 中拉取消息进行消费。
  • Subscription(订阅):Subscription 表示消费者对某个 Topic 消息的兴趣表达。订阅关系由消费者分组动态注册到服务端系统,并在消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度的维护。
  • Producer(生产者):生产者是消息产生的源头,将消息发送到服务端指定 Topic。
  • Consumer(消费者):消费者负责从服务端中拉取消息并进行处理。
  • ProducerGroup(生产者组):ProducerGroup 是一组生产者的逻辑分组,共享同样的 Topic 发送配置,实现发送端的负载均衡和容错。如果组内某个生产者失败,其他生产者可以继续工作,保证消息发送的连续性。
  • ConsumerGroup(消费者组):消费者分组是 Apache RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组。和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。分组中的消费者共同订阅同一个 Topic 并以某种策略(如广播、集群消费)消费消息。在 Apache RocketMQ 中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。

架构分析

Apache RocketMQ 服务端基础组件包括 NameServer,Broker,Proxy,推荐使用存储计算分离模式部署。

组件 核心职责 类比理解
NameServer 服务发现与路由中心。轻量级,管理 Broker 的元数据(IP、存活状态、Topic 队列信息)。客户端启动时先问它:“我想发消息,该找哪个 Broker?” 导航地图。不运货,但告诉你货车(Broker)在哪里。
Broker 消息存储与转发核心。真正负责消息的写入、读取、持久化、HA 主从同步。处理客户端具体的收发请求。 快递中转仓。实际保管包裹(消息),处理存和取。
Proxy 接入网关 / 协议适配层RocketMQ 5.0 新增)。接收客户端请求后,转发给 Broker,可做协议转换(GRPC/HTTP)、负载均衡、无状态易扩展。 快递柜或前台。客户不直接找仓库,而是通过统一入口交接。部分场景可代替直连 NameServer。

O1CN01aD4VOe1TB8xPi5xcg_!!6000000002343-0-tps-2084-766

安装

将使用Docker模拟服务器环境安装,我不会再去找什么Win特供版本了。

创建一个docker-compose.yml文件,其中写入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
version: '3.8'

services:
# NameServer 服务
namesrv:
image: apache/rocketmq:5.2.0
container_name: rocketmq-namesrv
restart: always
ports:
- "9876:9876"
environment:
- JAVA_OPT_EXT=-server -Xms256m -Xmx256m -Xmn128m
volumes:
- ./data/namesrv/logs:/home/rocketmq/logs
- ./data/namesrv/store:/home/rocketmq/store
command: sh mqnamesrv

# Broker 服务
broker:
image: apache/rocketmq:5.2.0
container_name: rocketmq-broker
restart: always
hostname: broker # 确保容器主机名就是 broker
depends_on:
- namesrv
ports:
- "10909:10909" # SSL 端口(不强制启用,可保留)
- "10911:10911" # Broker 主端口
- "10912:10912" # HA 端口
environment:
- JAVA_OPT_EXT=-server -Xms256m -Xmx256m -Xmn128m
volumes:
- ./data/broker/logs:/home/rocketmq/logs
- ./data/broker/store:/home/rocketmq/store
- ./broker.conf:/opt/rocketmq/conf/broker.conf
command: sh mqbroker -c /opt/rocketmq/conf/broker.conf

# RocketMQ 控制台
console:
image: apacherocketmq/rocketmq-dashboard:latest
container_name: rocketmq-console
restart: always
depends_on:
- namesrv
ports:
- "8080:8082"
environment:
- JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876

再创建一个文件名为broker.conf配置broker的链接:

1
2
3
4
5
6
7
8
9
10
11
12
13
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

# NameServer 地址(容器内部使用服务名)
namesrvAddr = namesrv:9876

# 自动向 NameServer 注册的 IP,必须设为服务名或可被其他容器解析的地址
brokerIP1 = broker

随后用docker-compose up -d自动拉取镜像和创建容器。

这会创建一个包含了Broker,NameServer和ProxyMQ集群,你能够看到三个容器,这三个容器已经在运行中了,您不需要做任何操作。

image-20260506112757979

9876 端口(NameServer) 扮演着“中枢神经”的角色,负责存储和分发路由信息。

Broker 需要上报路由信息

Console 需要获取集群状态所以会被这两个访问。

10911 端口:主通信与业务端口,负责在默认模式下承接Broker的大部分工作,包括生产者生产和消费者拉取。一个 TCP 端口完全可以展开双向工作,因为它背后建立的是一条“全双工”的通信通道

console是控制台,能够通过访问localhost:8080来访问控制台:

image-20260506112822627

测试

进入web端,在主题处添加新的:

image-20260506113128503

随后发送一条消息:

image-20260506113727513

这就是发送成功了,而且也能够在消息详情中查询到:image-20260506114256318

那么测试完毕,集群配置正确。

Java接入RocketMQ

RocketMQ,是一个独立设施,但是我们部署他是为了在开发中使用它以优化性能。

Maven中引入RocketMQ依赖:

1
2
3
4
5
6
7
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.2.0</version>
</dependency>
</dependencies>

随后用JAVA编写生产者:

消息生产者分别通过三种方式发送消息:

  • 同步发送:等待消息返回后再继续进行下面的操作。
  • 异步发送:不等待消息返回直接进入后续流程。broker将结果返回后调用callback函数,并使用CountDownLatch计数。
  • 单向发送:只负责发送,不管消息是否发送成功。

这里只编写同步发送,引入依赖后新建JAVA类Procduer:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

/**
* 同步发送
*/
public class Producer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 2; i++) {
Message msg = new Message("test_topic", //上面创建的测试主题
"TagA", //设置消息Tag,用于消费端根据指定Tag过滤消息。
"Simple-Sync".getBytes(StandardCharsets.UTF_8) //消息体。
);
SendResult send = producer.send(msg);
System.out.printf(i + ".发送消息成功:%s%n", send);
}
producer.shutdown();
}
}

注意,这里的端口是Docker对外映射的端口,IP就写本地127.0.0.1就可以了,因为容器和客户端都在一台PC上。

但是你仍有可能连接不上,因为Docker设置的127.0.0.1回环地址,在宿主机上指向宿主机自己,在容器内部指向容器自己。在本例中,MQ运作流程是:

连接 NameServer → 宿主机 127.0.0.1:9876 通过 Docker 端口映射,到达容器内的 NameServer。

获取 Topic 路由 → NameServer 返回 Broker 的地址,这个地址是 Broker 在启动时注册 的。

连接 Broker → 客户端拿到的 Broker 地址去连接,发送消息。

但是请回忆,Broker.conf是在docker容器里面写就的,也就是说客户端拿到Broker这个字符串根本不知道它指向哪里所以无法连接。

如果使用局域网IP,那么因为局域网IP是容器和宿主机共用的,那么数据就能够在网卡中回环,宿主机就能够连接上容器,且同一局域网的机器也能够连接上Docker容器。

所以你如果遇到超时请在host文件里加一行127.0.0.1 broker,来让客户端认识他,知道Broker就是127.0.0.1,需要管理员权限。

或者修改Broker.conf,将IP地址定位局域网IP地址

随后是消费者:

消费者消费消息分两种:

  • 拉模式:消费者主动去Broker上拉取消息。
  • 推模式:消费者等待Broker把消息推送过来。

随后新建Java类Consumer,这里只介绍推模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
* 推模式
*/
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("SimplePushConsumer");
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
pushConsumer.subscribe("test_topic","*");
pushConsumer.setMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
list.forEach( n->{
System.out.printf("收到消息: %s%n" , n);
});
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
pushConsumer.start();
System.out.printf("Consumer Started.%n");
}
}

执行后能够返回:

image-20260507110648368

image-20260507110713611

在控制台也能查到信息:

image-20260507110826579

无需理会红色警告。
那么本地的JAVA开发测试就完成了,下面介绍一下各种消息类型。

顺序消息

顺序消息(Orderly Message) 指的是消息的生产顺序消费顺序保持一致。
它分为两种:

  • 全局有序:所有消息严格按照发送顺序被消费,只能使用 一个队列(Topic 下只有一个 Queue),性能较低。
  • 分区有序(局部有序):同一业务标识(如订单 ID)的消息在同一个队列内严格有序,但不同并行队列之间是并行无序的,这是最常用的方式。

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package Order;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
* 顺序消息生产者
*/
public class OrderProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int j = 0; j < 5; j++) {
for (int i = 0; i < 10; i++) {
Message message = new Message("OrderTopic","TagA",
("order_" + j + "_step_" + i).getBytes(StandardCharsets.UTF_8));
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
Integer id = (Integer) o;
int index = id % list.size();
return list.get(index);
}
}, j);
System.out.printf("%s%n", sendResult);
}
}
producer.shutdown();
}
}

重点解释一下队列选择中的代码:双层循环,j是订单号,又名步骤标识,i是订单内的步骤号。

匿名函数的第三个参数 j 会作为 Object o 传入 select 方法,即业务标识

id % list.size() 是实现选择装入的核心:

  • 假设 OrderTopic 有 8 个队列(编号 0~7),list.size() 为 8。
  • order_0j=00 % 8 = 0 → 始终选择队列 0。
  • order_1j=11 % 8 = 1 → 始终选择队列 1。
  • order_22 % 8 = 2 → 队列 2,依此类推。

结果:同一订单的所有消息(step_0step_9)都被路由到同一个固定的队列中,且单个订单的发送的顺序就是 step_0step_9同一个队列(Queue)内的消息是严格 FIFO(先入先出)的,无论是存储还是拉取,都保持写入顺序。

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package Order;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
* 顺序消息消费者
*/
public class OrderConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("OrderTopic","*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
list.forEach(n->{
System.out.println("QueueId:"+n.getQueueId() + "收到消息内容 "+new String(n.getBody()));
});
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}

消费者进程注册一个顺序消息监听器 MessageListenerOrderly。这是实现顺序消费的关键。

与并发监听器 MessageListenerConcurrently 不同,顺序监听器保证同一队列的消息在同一时刻只被一个线程消费,且消费顺序与队列内存储顺序一致。

所以:

  • 生产者通过 MessageQueueSelector 将同业务键的消息固定到一个队列,队列内 FIFO 存储。
  • 消费者使用 MessageListenerOrderly对每个队列只会分配一个线程去顺序拉取和处理消息,严格保障消费顺序。

广播消息

广播消息(Broadcasting) 是 RocketMQ 的一种消费模式,与默认的集群模式(Clustering) 相对。

模式 消费行为 应用场景
集群模式(默认) 同组内所有消费者均摊队列,每条消息只被一个消费者处理 分布式任务处理,如订单履约
广播模式 同组内每个消费者都收到全部消息,相当于每个人都独立消费全量数据 推送配置更新、缓存刷新、通知所有人

广播模式消费者JAVA写法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package broadcast; // 声明包名,通常用于组织代码

// 导入 RocketMQ 客户端所需的类
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; // 推模式消费者
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; // 并发消费上下文
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; // 并发消费状态
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; // 并发消息监听器
import org.apache.rocketmq.client.exception.MQClientException; // 客户端异常
import org.apache.rocketmq.common.message.MessageExt; // 消息扩展对象(包含消息详情)
import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; // 消费模式枚举(集群/广播)

import java.util.List; // Java 集合类,用于存储消息列表

/**
* 广播消息消费模式
*/
public class BroadcastConsumer {
public static void main(String[] args) throws MQClientException {
// 1. 创建推模式消费者,指定消费者组名为 "BroadCastConsumer"
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("BroadCastConsumer");

// 2. 设置 NameServer 地址,用于发现 Broker 和路由信息
consumer.setNamesrvAddr("127.0.0.1:9876");

// 3. 订阅主题 "simple","*" 表示接收该主题下的所有标签(Tag)消息
consumer.subscribe("simple", "*");

// 4. 设置消费模式为广播(默认是集群模式)
consumer.setMessageModel(MessageModel.BROADCASTING); // 广播模式
// consumer.setMessageModel(MessageModel.CLUSTERING); // 如果取消注释,则切换为集群模式

// 5. 注册并发消息监听器(广播模式下不能使用顺序监听器)
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 遍历本次拉取到的消息列表
list.forEach(n -> {
// 打印消息所在的队列 ID 和消息体内容
System.out.println("QueueId:" + n.getQueueId() + " 收到消息内容 " + new String(n.getBody()));
});
// 返回消费成功状态,Broker 会更新消费进度
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

// 6. 启动消费者,开始监听并消费消息
consumer.start();

// 控制台打印启动成功标识
System.out.printf("Broadcast Consumer Started.%n");
}
}

该广播模式消费者组的每个实例都会将主题的每一条消息接收并处理,不会因为组里有示例已接收就放弃。

而对于默认的集群模式,比如订单处理服务部署了 5 个实例,加入同一个消费者组 OrderConsumerGroup。 10 个订单队列均匀分配给 5 个实例,每条订单消息只被一个实例处理,不会出现多个实例重复发货的问题。实际中会运行五个甚至更多一模一样的程序来提升并发度从而提升运行效率。这是分布式系统里典型的方式(水平扩展方式),目的就是为了提升并发度和整体吞吐量。

延迟消息

延迟消息指的是生产者发送消息(立即传到Broker)后,Broker不会立即投递给消费者,而是按照预设的延迟时间,到了指定时间后才被消费者可见并消费。这是RocketMQ特有的一个功能。使用方法有两种:

  • **message.setDelayTimeLevel(3)**:预定日常定时发送。1到18分别对应messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h;可以在dashboard中broker配置查看。
  • **msg.setDelayTimeMs(10L)**:指定时间定时发送。默认支持最大延迟时间为3天,可以根据broker配置:timerMaxDelaySec修改。

如预定日期生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.time.LocalTime;

/**
* 预定日程定时发送
*/
public class ScheduleProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("ScheduleProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 2; i++) {
Message msg = new Message("Schedule", //主题
"TagA", //设置消息Tag,用于消费端根据指定Tag过滤消息。
"ScheduleProducer".getBytes(StandardCharsets.UTF_8) //消息体。
);
//1到18分别对应messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
msg.setDelayTimeLevel(3);
producer.send(msg);
System.out.printf(i + ".发送消息成功:%s%n", LocalTime.now());
}
producer.shutdown();
}
}

批量消息

批量消息(Batch Message) 是指生产者将多条消息合并为一个请求发送给 Broker,以减少网络往返次数,提升吞吐量。

批量消息的使用限制:

  • 消息大小不能超过4M,虽然源码注释不能超1M,但是实际使用不超过4M即可。平衡整体的性能,建议保持1M左右。
  • 相同的Topic主题
  • 相同的waitStoreMsgOK
    • waitStoreMsgOK 是 Apache RocketMQ 中一个核心的可靠性保障参数。它是一条指令,告诉 Broker(消息服务器):在向生产者(Producer)返回“发送成功”的响应之前,是否必须等待消息被写入磁盘。
  • 不能是延迟消息、事务消息等特殊信息。

插播一条waitStoreMsgOK:

对比维度 waitStoreMsgOK = true (默认) waitStoreMsgOK = false
等待行为 生产者线程会阻塞,等待写入完成 生产者发送后立即返回,不等待写入结果
性能影响 较低。每次发送都涉及磁盘I/O操作的等待时间 非常高。发送操作不被磁盘I/O阻塞
数据可靠性 极高。一旦确认,消息已持久化到磁盘,不易丢失 较低。若Broker在内存数据刷盘前宕机,消息会丢失
组合使用 Broker 的 flushDiskType 必须设置为 SYNC_FLUSH Broker 刷盘模式可以是 SYNC_FLUSHASYNC_FLUSH
适用场景 金融交易、支付回调等绝不能丢失消息的场景 海量日志采集、非关键性监控数据上报等,允许少量丢失以换取高吞吐的场景

如生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;

/**
* 批量发送消息
*/
public class BatchProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
ArrayList<Message> messages = new ArrayList<>();
messages.add(new Message("simple","TagA", "BatchProducer0".getBytes(StandardCharsets.UTF_8)));
messages.add(new Message("simple","TagA", "BatchProducer1".getBytes(StandardCharsets.UTF_8)));
messages.add(new Message("simple","TagA", "BatchProducer2".getBytes(StandardCharsets.UTF_8)));
SendResult send = producer.send(messages);
System.out.printf(".发送消息成功:%s%n", send);
producer.shutdown();
}
}

将信息都装入一个动态数组ArrayList中,在用send发出去就行。这貌似能够用直觉理解,至于这背后的源码怎么写,我们大多数人应该不在乎。

消息过滤

Tag过滤

所有的消息都可以带Tag,生产者用Tag标记消息,消费者用Tag过滤消息:

如生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

/**
* 过滤消息-tag过滤生产者
*/
public class TagFilterProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags = new String[] {"TagA","TagB","TagC"};
for (int i = 0; i < 15; i++) {
Message msg = new Message("FilterTopic", //主题
tags[i % tags.length], //设置消息Tag,用于消费端根据指定Tag过滤消息。
("TagFilterProducer_"+tags[i % tags.length]).getBytes(StandardCharsets.UTF_8) //消息体。
);
SendResult send = producer.send(msg);
System.out.printf(i + ".发送消息成功:%s%n", send);
}
producer.shutdown();
}
}

Tag是RocketMQ中特有的一个消息属性。

RocketMQ的最佳实践中就建议使用RocketMQ时,一个应用可以就用一个Topic,而应用中的不同业务就用Tag来区分。

SQL过滤

有时候现实情况很复杂,仅仅用 一个Tag来定义一个消息是不够的。所以我们用SQL来做进一步区分。

如生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package filter;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.nio.charset.StandardCharsets;

/**
* 过滤消息-SQL过滤生产者
*/
public class SqlFilterProducer {
public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("SyncProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags = new String[] {"TagA","TagB","TagC"};
for (int i = 0; i < 15; i++) {
Message msg = new Message("FilterTopic", //主题
tags[i % tags.length], //设置消息Tag,用于消费端根据指定Tag过滤消息。
("TagFilterProducer_"+tags[i % tags.length] + "_i_" + i).getBytes(StandardCharsets.UTF_8) //消息体。
);
msg.putUserProperty("baiLi", String.valueOf(i));
SendResult send = producer.send(msg);
System.out.printf(i + ".发送消息成功:%s%n", send);
}
producer.shutdown();
}
}

我们重点看这句: msg.putUserProperty("baiLi", String.valueOf(i));

putUserProperty 给消息附加了一个名为 baiLi 的属性,值为当前循环序号 i(字符串形式)。

后续消费者可以使用 SQL92 表达式进行过滤,例如:

consumer.subscribe("FilterTopic", MessageSelector.bySql("baiLi BETWEEN 0 AND 5"));

这样消费者只会收到 baiLi 属性在 05 之间的消息,即序号 05 的消息(共 6 条)。

SQL肯定是比Tag更灵活,但也更复杂。

事务消息

事务( Transaction)(消息是 RocketMQ 用来保证本地事务执行和消息发送原子性的一种高级消息类型,它基于 两阶段提交(2PC)事务回查机制,确保两者在分布式环境下最终一致,要么一起成功,要么一起失败。

image

第一阶段:发送“半消息”
生产者向 Broker 发送一条消息。这条消息特殊之处在于,Broker 成功保存后,会向生产者确认,但暂时不对消费者投递。此时,这条消息处于“半消息”状态。

第二阶段:执行本地事务
生产者收到上一步的成功确认后,开始执行自身的本地业务逻辑,例如“将订单状态更新为‘已支付’”。

第三阶段:提交或回滚
生产者根据本地事务的执行结果,向 Broker 发送二次确认:

  • Commit:本地事务成功后发送。Broker 收到后将“半消息”标记为“可投递”,消费者就能消费了。
  • Rollback:本地事务失败后发送。Broker 会将之前的“半消息”丢弃。

保证原子性的最后一个方法是事务回查

如果因网络问题或生产者宕机,Broker 迟迟收不到二次确认,它会启动事务回查

  1. 主动询问最终状态:经过一段固定时间(例如,默认30秒),Broker 会向同一生产者组中的任意一个实例,发起回查请求,询问该消息关联的本地事务的最终状态。
  2. 实现checkLocalTransaction方法:为了保护回查机制能正常工作,开发者必须实现TransactionListener接口中的checkLocalTransaction方法,Broker会主动调用这个方法来确定本地事务的最终状态。
  3. 最终处理:生产者确认后,Broker 会再次根据 Commit 或 Rollback 的结果,对“半消息”进行最终处理。

如事务消息生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;

/**
* 事务消息生产者
*/
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionMQProducer producer = new TransactionMQProducer("TransProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
//使用executorService异步提交事务状态,从而提高系统的性能和可靠性
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);

//本地事务监听器
TransactionListener transactionListener = new TransactionListenerImpl();
producer.setTransactionListener(transactionListener);

producer.start();
String[] tags = new String[] {"TagA","TagB","TagC","TagD","TagE"};
for (int i = 0; i < 10; i++) {
Message message = new Message("TransactionTopic",
tags[ i % tags.length],
("Transaction_"+ tags[ i % tags.length]).getBytes(StandardCharsets.UTF_8));
TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);
System.out.printf("%s%n", transactionSendResult);

Thread.sleep(10); //延迟10毫秒
}

Thread.sleep(100000);//等待broker端回调
producer.shutdown();
}
}

TransactionListenerImpl监听器需要开发者自己写,因为RocketMQ无法预知你的回查逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package transaction;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

/**
* 本地事务监听器
*/
public class TransactionListenerImpl implements TransactionListener {

@Override
/**
* 在提交完事务消息后执行。
* 返回COMMIT_MESSAGE状态的消息会立即被消费者消费到。
* 返回ROLLBACK_MESSAGE状态的消息会被丢弃。
* 返回UNKNOWN状态的消息会由Broker过一段时间再来回查事务的状态。
*/
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
String tags = message.getTags();
//TagA的消息会立即被消费者消费到
if(StringUtils.contains(tags,"TagA")){
return LocalTransactionState.COMMIT_MESSAGE;
//TagB的消息会被丢弃
}else if(StringUtils.contains(tags,"TagB")){
return LocalTransactionState.ROLLBACK_MESSAGE;
//其他消息会等待Broker进行事务状态回查。
}else{
return LocalTransactionState.UNKNOW;
}
}

@Override
/**
* 在对UNKNOWN状态的消息进行状态回查时执行。
* 返回COMMIT_MESSAGE状态的消息会立即被消费者消费到。
* 返回ROLLBACK_MESSAGE状态的消息会被丢弃。
* 返回UNKNOWN状态的消息会由Broker过一段时间再来回查事务的状态。
*/
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
String tags = messageExt.getTags();
//TagC的消息过一段时间会被消费者消费到
if(StringUtils.contains(tags,"TagC")){
return LocalTransactionState.COMMIT_MESSAGE;
//TagD的消息也会在状态回查时被丢弃掉
}else if(StringUtils.contains(tags,"TagD")){
return LocalTransactionState.ROLLBACK_MESSAGE;
//剩下TagE的消息会在多次状态回查后最终丢弃
}else{
return LocalTransactionState.UNKNOW;
}
}
}

事务消息的使用限制

  • 事务消息不支持延迟消息和批量消息。
  • 为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的transactionCheckMax参数来修改此限制。如果已经检查某条消息超过N次的话(N = transactionCheckMax)则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。可以通过重写AbstractTransactionCheckListener类来修改这个行为。
  • 事务性消息可能不止一次被检查或消费。

RocketMQ如何保证数据不丢失

这是一个补充说明:

image

image

“刷盘 (Flush Disk)” 是消息队列(Message Queue)为保证消息可靠性的核心机制。简单来说,它指的是将暂存在内存(操作系统Page Cache)中的消息数据,强制写入到物理磁盘上的过程。

我们将消息流程分为三大部分,每一部分都有可能会丢失数据。

  • 生产阶段:Producer通过网络将消息发送给Broker,这个发送可能会发生丢失。比如网络延迟不可达等。
  • 存储阶段:Broker肯定是先把消息放到内存的,然后根据刷盘策略持久化到硬盘中。刚收到Producer的消息,放入内存,但是异常宕机了,导致消息丢失。
  • 消费阶段:消费失败。比如先提交ack再消费,处理过程中出现异常,该消息就出现了丢失。

解决方式:

  • 生产阶段:使用同步发送失败重试机制;异步发送重写回调方法检查发送结果;Ack确认机制。
  • 存储阶段:同步刷盘机制;集群模式采用同步复制。
  • 消费阶段:正常消费处理完成才提交ACK;如果处理异常返回重试标识。