RocketMQ 5.x初体验
前沿
RocketMQ 5.0 是一个面向云原生场景设计的实时数据平台,它融合了消息队列、事件驱动、流式处理等能力,能够广泛适用于云、边缘和设备之间协同的数据传输与处理需求。
为什么选择 RocketMQ?
在 RocketMQ 初期由阿里巴巴开发时,就将其应用于很多场景,比如:
- 异步通信
- 搜索系统
- 社交网络中的活动流
- 数据管道
- 交易流程
随着我们的电商业务不断增长,消息集群(消息中间件系统)面临的压力也越来越大。
在观察和分析了 ActiveMQ 的 IO 模块性能 后,我们发现了一个瓶颈:当队列和虚拟主题数量增加时,性能下降严重。
我们尝试了多种方法来解决这个问题,比如:
- 限流(throttling)
- 熔断(circuit breakers)
- 服务降级(service downgrades)
但这些都不能从根本上解决问题。
我们也考虑过使用 Kafka(一个非常流行的消息系统),但 Kafka 无法满足我们对 低延迟和高可靠性 的需求(后文会有解释),所以我们最终决定开发一个新的消息引擎,它可以:
- 既支持传统的发布/订阅(pub/sub)模型
- 又能支持高并发、实时、零错误的交易系统
自诞生以来,Apache RocketMQ 被越来越多的企业开发者和云服务商广泛采用,原因包括:
- 架构简单
- 功能丰富,能满足多种业务需求
- 可扩展性极强
经过十多年的场景打磨,RocketMQ 已经成为“金融级可靠消息”的行业标准,在以下领域被广泛使用:
-
互联网
-
大数据
-
移动互联网
-
物联网(IoT)
-
其他高可靠性场景
✅ 总结一句话:
RocketMQ 是阿里为了满足高并发、高可靠、低延迟场景而自研的消息中间件,比 ActiveMQ 和 Kafka 更适合金融级、交易类系统,因此逐渐成为行业标准。
RocketMQ 5.x 消息通信模型
在 RocketMQ 5.x 中,消息通信模型进行了升级与重构,相比于 4.x 更具现代化架构,支持多协议、事件驱动、云原生、流处理等特性,适配复杂业务场景。
RocketMQ 5.x 引入了“事件驱动消息引擎(Event-Driven Messaging Engine)
”概念,核心通信模型如下图所示(可口头理解):
[Producer] → [Broker] → [Consumer]↑ ↑Metadata Store / Controller
🧱 核心组件
-
Producer(消息生产者): 负责消息发布,支持分布式集群部署,通过负载均衡模块选择相应的 Broker 集群队列进行消息投递
- 将消息发送给
Broker
- 支持同步、异步、单向等发送方式
- 消息发送前从
NameServer
(5.x 改为 Controller) 获取路由信息
- 将消息发送给
-
Broker Server(消息服务器): 负责消息的存储、投递和查询以及服务高可用保证
-
负责接收、存储和投递消息
-
引入了 多协议适配层(Protocol Adapter),支持:
- 原生 RocketMQ 协议
- gRPC
- MQTT(IoT 场景)
- OpenMessaging(云原生中间件标准)
-
提供两类消息服务:
- 标准消息(Standard Messaging)
- 事件流服务(Event Streaming Service)(如延迟消息、事务消息、顺序消息等)
-
-
Consumer(消息消费者): 负责消息消费,支持 push 推和 pull 拉两种模式,同时支持集群方式和广播方式的消费
- 从 Broker 拉取或等待推送消息
- 支持:
- Clustering 模式(负载均衡)
- Broadcasting 模式(广播)
- Pull 模式(主动拉取)
- Push 模式(监听式消费)
-
Controller(协调者)【新增】
- RocketMQ 5.x 新引入组件,取代部分 NameServer 职能
- 用于:
- 维护集群元数据
- 路由发现
- 控制面服务(比如 Topic 管理、Broker 状态管理等)
- 是 Cloud-Native 支持的关键组件
-
Proxy(代理节点)【新增】
- 支持边缘接入(用于公网、边缘设备、混合云等)
- 提供协议转换能力
- 可以屏蔽客户端与 Broker 的直接连接,增强安全和弹性
-
消息流模型
🚚 消息流向:Producer → Proxy(可选) → Broker → Consumer
🚚 通信流程
RocketMQ 的基本通信流程如下:- Broker 注册:Broker 启动后向 NameServer 注册,每隔 30s 定时上报 Topic 路由信息
- Producer 路由获取:Producer 从本地缓存获取路由信息,如果没有则从 NameServer 拉取,默认每 30s 拉取一次
- 消息发送:Producer 根据路由信息选择队列进行消息发送,Broker 接收并落盘存储
- Consumer 消费:Consumer 获取路由信息并完成负载均衡后,选择消息队列拉取并消费消息
🔁 路由发现流程:
Producer / Consumer → Controller → 获取 Topic 路由 → 与 Broker 通信
🌐 特性增强(5.x 相比 4.x)
功能 | RocketMQ 4.x | RocketMQ 5.x(增强) |
---|---|---|
协议支持 | 仅 RocketMQ 协议 | 多协议(gRPC、MQTT、OpenMessaging 等) |
路由发现 | NameServer | Controller + 动态感知 |
云原生支持 | 弱 | 强,支持 Kubernetes、Service Mesh |
Proxy 模式 | 不支持 | 支持,适合边缘接入、安全隔离 |
消息流处理模型 | 静态消费模型 | 事件驱动、流式处理框架(Streaming) |
多租户能力 | 弱 | 强,支持 Namespace、权限控制 |
✳️ 应用场景举例
- 金融交易系统(强一致性 + 高可用)
- IoT(MQTT 协议支持)
- 云原生微服务通信(支持 Sidecar 模式)
- 大数据采集(Stream 消息流)
- 跨国部署(Proxy + 多协议)
初步体验
🧱 一、前置条件
项目 | 要求 |
---|---|
JDK | JDK 11 以上(务必安装并配置环境变量) |
Maven | 可选(推荐用于 Java 项目) |
RocketMQ | 下载最新版 RocketMQ 5.x |
操作系统 | Windows 10 / 11 |
📦 二、下载 RocketMQ 5.x
1.打开浏览器,访问官网:
🔗 https://rocketmq.apache.org/download
2.下载最新版本的 rocketmq-all-5.x.x-bin-release.zip
(二进制包)
3.解压到一个路径,例如:
D:\rocketmq\rocketmq-all-5.1.4-bin-release
⚙️ 三、配置 RocketMQ(Windows)
✅ 1. 启动 NameServer
打开 PowerShell 或 CMD:
cd D:\rocketmq\rocketmq-all-5.1.4-bin-release\bin
start mqnamesrv.cmd
你会看到一个新窗口打开,提示如下:
The Name Server boot success.
✅ 2. 启动 Broker(需添加配置)
1.创建配置文件,例如 broker.conf
放在 conf
目录下:
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
listenPort = 10911
namesrvAddr = 127.0.0.1:9876
grpcServerPort = 8081
autoCreateTopicEnable = true
2.回到 bin
目录,运行:
start mqbroker.cmd -c ..\conf\broker.conf
如果成功,会看到:
The broker [broker-a, 127.0.0.1:10911] boot success.
3、启动Proxy
>set NAMESRV_ADDR=127.0.0.1:9876
>start bin\mqproxy.cmd
如果看到如下就成功了
Sun Jul 13 17:21:48 CST 2025 rocketmq-proxy startup successfully
如果启动报找不到 topic,可以手动创建报No topic route info in name server for the topic: TestTopic
>set NAMESRV_ADDR=127.0.0.1:9876
>bin\mqadmin.cmd updateTopic -n 127.0.0.1:9876 -c DefaultCluster -t TestTopic
🔍 四、验证 NameServer 与 Broker
可以使用telnet
验证:
telnet 127.0.0.1 9876
telnet 127.0.0.1 8081
两个端口都能连接,说明服务正常。
使用SDK发送和接收消息
依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>${rocketmq-client-java-version}</version>
</dependency>
import java.io.IOException;import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class ProducerExample {// 使用 SLF4J 记录日志private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);public static void main(String[] args) throws ClientException, IOException {// 设置 RocketMQ 5.x 的接入地址(通常为 proxy 或 controller 地址)String endpoint = "localhost:8081";// 要发送的主题(Topic)名称String topic = "TestTopic";// 加载 RocketMQ 客户端服务提供者(SPI 机制,自动选择实现)ClientServiceProvider provider = ClientServiceProvider.loadService();// 创建客户端配置构建器并设置 endpointClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);ClientConfiguration configuration = builder.build();// 创建消息生产者(Producer)实例,绑定要发送的 Topic 和配置Producer producer = provider.newProducerBuilder().setTopics(topic) // 设置要发送的主题.setClientConfiguration(configuration) // 设置客户端配置.build(); // 构建 Producer 对象// 构造要发送的消息内容Message message = provider.newMessageBuilder().setTopic(topic) // 设置消息的 Topic.setKeys("messageKey") // 设置业务键(可用于去重、查询).setTag("messageTag") // 设置标签(可用于过滤).setBody("messageBody".getBytes()) // 设置消息体内容(字节数组).build(); // 构建消息对象// 发送消息并获取发送结果try {SendReceipt sendReceipt = producer.send(message); // 发送消息,返回消息IDlogger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());} catch (ClientException e) {// 如果发送失败,打印异常信息logger.error("Failed to send message", e);}// 建议应用退出前关闭 Producer(已注释,如需关闭请取消注释)// producer.close();}
}
import java.io.IOException;
import java.util.Collections;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class PushConsumerExample {// 日志记录器,用于记录消费日志private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);private PushConsumerExample() {// 私有构造函数,防止被实例化}public static void main(String[] args) throws ClientException, IOException, InterruptedException {// 加载 RocketMQ 客户端服务提供器(SPI 机制)final ClientServiceProvider provider = ClientServiceProvider.loadService();// 设置 RocketMQ Proxy 地址(5.x 版本使用 proxy 作为网关)String endpoints = "localhost:8081";// 构建客户端连接配置ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();// 设置消息过滤表达式(使用 TAG 过滤,* 表示接收所有标签的消息)String tag = "*";FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);// 消费者所属的消费组名称(需提前在 RocketMQ 控制台配置或由服务端自动创建)String consumerGroup = "YourConsumerGroup";// 要订阅的主题名称String topic = "TestTopic";// 创建 PushConsumer(推模式消费者)PushConsumer pushConsumer = provider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration) // 设置连接配置.setConsumerGroup(consumerGroup) // 设置消费组.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) // 设置订阅主题和过滤规则.setMessageListener(messageView -> {// 消息到达后调用此函数进行处理logger.info("Consume message successfully, messageId={}", messageView.getMessageId());return ConsumeResult.SUCCESS; // 返回消费成功}).build();// 让主线程阻塞防止程序退出(等待消费)Thread.sleep(Long.MAX_VALUE);// 程序关闭前可以关闭消费者,释放资源(这里注释了)// pushConsumer.close();}
}