当前位置: 首页 > news >正文

RocketMQ 5.x初体验

前沿
RocketMQ 5.0 是一个面向云原生场景设计的实时数据平台,它融合了消息队列、事件驱动、流式处理等能力,能够广泛适用于云、边缘和设备之间协同的数据传输与处理需求。
在这里插入图片描述
为什么选择 RocketMQ?
在 RocketMQ 初期由阿里巴巴开发时,就将其应用于很多场景,比如:

  • 异步通信
  • 搜索系统
  • 社交网络中的活动流
  • 数据管道
  • 交易流程

随着我们的电商业务不断增长,消息集群(消息中间件系统)面临的压力也越来越大。
在观察和分析了 ActiveMQ 的 IO 模块性能 后,我们发现了一个瓶颈:当队列和虚拟主题数量增加时,性能下降严重。
我们尝试了多种方法来解决这个问题,比如:

  • 限流(throttling)
  • 熔断(circuit breakers)
  • 服务降级(service downgrades)

但这些都不能从根本上解决问题。
我们也考虑过使用 Kafka(一个非常流行的消息系统),但 Kafka 无法满足我们对 低延迟和高可靠性 的需求(后文会有解释),所以我们最终决定开发一个新的消息引擎,它可以:

  • 既支持传统的发布/订阅(pub/sub)模型
  • 又能支持高并发、实时、零错误的交易系统

自诞生以来,Apache RocketMQ 被越来越多的企业开发者和云服务商广泛采用,原因包括:

  • 架构简单
  • 功能丰富,能满足多种业务需求
  • 可扩展性极强

经过十多年的场景打磨,RocketMQ 已经成为“金融级可靠消息”的行业标准,在以下领域被广泛使用:

  • 互联网

  • 大数据

  • 移动互联网

  • 物联网(IoT)

  • 其他高可靠性场景

总结一句话:
RocketMQ 是阿里为了满足高并发、高可靠、低延迟场景而自研的消息中间件,比 ActiveMQ 和 Kafka 更适合金融级、交易类系统,因此逐渐成为行业标准。

RocketMQ 5.x 消息通信模型

RocketMQ 5.x 中,消息通信模型进行了升级与重构,相比于 4.x 更具现代化架构,支持多协议、事件驱动、云原生、流处理等特性,适配复杂业务场景。
RocketMQ 5.x 引入了“事件驱动消息引擎(Event-Driven Messaging Engine)”概念,核心通信模型如下图所示(可口头理解):

[Producer] → [Broker] → [Consumer]↑   ↑Metadata  Store / Controller

🧱 核心组件

  1. Producer(消息生产者): 负责消息发布,支持分布式集群部署,通过负载均衡模块选择相应的 Broker 集群队列进行消息投递

    • 将消息发送给 Broker
    • 支持同步、异步、单向等发送方式
    • 消息发送前从 NameServer(5.x 改为 Controller) 获取路由信息
  2. Broker Server(消息服务器): 负责消息的存储、投递和查询以及服务高可用保证

    • 负责接收、存储和投递消息

    • 引入了 多协议适配层(Protocol Adapter),支持:

      • 原生 RocketMQ 协议
      • gRPC
      • MQTT(IoT 场景)
      • OpenMessaging(云原生中间件标准)
    • 提供两类消息服务:

      • 标准消息(Standard Messaging)
      • 事件流服务(Event Streaming Service)(如延迟消息、事务消息、顺序消息等)
  3. Consumer(消息消费者): 负责消息消费,支持 push 推和 pull 拉两种模式,同时支持集群方式和广播方式的消费

    • 从 Broker 拉取或等待推送消息
    • 支持:
      • Clustering 模式(负载均衡)
      • Broadcasting 模式(广播)
      • Pull 模式(主动拉取)
      • Push 模式(监听式消费)
  4. Controller(协调者)【新增】

    • RocketMQ 5.x 新引入组件,取代部分 NameServer 职能
    • 用于:
      • 维护集群元数据
      • 路由发现
      • 控制面服务(比如 Topic 管理、Broker 状态管理等)
    • 是 Cloud-Native 支持的关键组件
  5. Proxy(代理节点)【新增】

    • 支持边缘接入(用于公网、边缘设备、混合云等)
    • 提供协议转换能力
    • 可以屏蔽客户端与 Broker 的直接连接,增强安全和弹性
  6. 消息流模型
    🚚 消息流向:

    Producer → Proxy(可选) → Broker → Consumer
    

    🚚 通信流程
    RocketMQ 的基本通信流程如下:

    • Broker 注册:Broker 启动后向 NameServer 注册,每隔 30s 定时上报 Topic 路由信息
    • Producer 路由获取:Producer 从本地缓存获取路由信息,如果没有则从 NameServer 拉取,默认每 30s 拉取一次
    • 消息发送:Producer 根据路由信息选择队列进行消息发送,Broker 接收并落盘存储
    • Consumer 消费:Consumer 获取路由信息并完成负载均衡后,选择消息队列拉取并消费消息

    🔁 路由发现流程:

    Producer / Consumer → Controller → 获取 Topic 路由 → 与 Broker 通信
    

🌐 特性增强(5.x 相比 4.x)

功能RocketMQ 4.xRocketMQ 5.x(增强)
协议支持仅 RocketMQ 协议多协议(gRPC、MQTT、OpenMessaging 等)
路由发现NameServerController + 动态感知
云原生支持强,支持 Kubernetes、Service Mesh
Proxy 模式不支持支持,适合边缘接入、安全隔离
消息流处理模型静态消费模型事件驱动、流式处理框架(Streaming)
多租户能力强,支持 Namespace、权限控制

✳️ 应用场景举例

  • 金融交易系统(强一致性 + 高可用)
  • IoT(MQTT 协议支持)
  • 云原生微服务通信(支持 Sidecar 模式)
  • 大数据采集(Stream 消息流)
  • 跨国部署(Proxy + 多协议)

初步体验

🧱 一、前置条件

项目要求
JDKJDK 11 以上(务必安装并配置环境变量)
Maven可选(推荐用于 Java 项目)
RocketMQ下载最新版 RocketMQ 5.x
操作系统Windows 10 / 11

📦 二、下载 RocketMQ 5.x

1.打开浏览器,访问官网:
🔗 https://rocketmq.apache.org/download
2.下载最新版本的 rocketmq-all-5.x.x-bin-release.zip(二进制包)
3.解压到一个路径,例如:

D:\rocketmq\rocketmq-all-5.1.4-bin-release

⚙️ 三、配置 RocketMQ(Windows)

✅ 1. 启动 NameServer

打开 PowerShell 或 CMD:

cd D:\rocketmq\rocketmq-all-5.1.4-bin-release\bin
start mqnamesrv.cmd

你会看到一个新窗口打开,提示如下:

The Name Server boot success.

✅ 2. 启动 Broker(需添加配置)

1.创建配置文件,例如 broker.conf 放在 conf 目录下:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
listenPort = 10911
namesrvAddr = 127.0.0.1:9876
grpcServerPort = 8081
autoCreateTopicEnable = true

2.回到 bin 目录,运行:

start mqbroker.cmd -c ..\conf\broker.conf

如果成功,会看到:

The broker [broker-a, 127.0.0.1:10911] boot success.

3、启动Proxy

>set NAMESRV_ADDR=127.0.0.1:9876
>start bin\mqproxy.cmd

如果看到如下就成功了

Sun Jul 13 17:21:48 CST 2025 rocketmq-proxy startup successfully

如果启动报找不到 topic,可以手动创建报No topic route info in name server for the topic: TestTopic

>set NAMESRV_ADDR=127.0.0.1:9876
>bin\mqadmin.cmd updateTopic -n 127.0.0.1:9876 -c DefaultCluster -t TestTopic

🔍 四、验证 NameServer 与 Broker

可以使用telnet验证:

telnet 127.0.0.1 9876
telnet 127.0.0.1 8081

两个端口都能连接,说明服务正常。

使用SDK发送和接收消息

依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>${rocketmq-client-java-version}</version>
</dependency> 
import java.io.IOException;import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class ProducerExample {// 使用 SLF4J 记录日志private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);public static void main(String[] args) throws ClientException, IOException {// 设置 RocketMQ 5.x 的接入地址(通常为 proxy 或 controller 地址)String endpoint = "localhost:8081";// 要发送的主题(Topic)名称String topic = "TestTopic";// 加载 RocketMQ 客户端服务提供者(SPI 机制,自动选择实现)ClientServiceProvider provider = ClientServiceProvider.loadService();// 创建客户端配置构建器并设置 endpointClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);ClientConfiguration configuration = builder.build();// 创建消息生产者(Producer)实例,绑定要发送的 Topic 和配置Producer producer = provider.newProducerBuilder().setTopics(topic)                     // 设置要发送的主题.setClientConfiguration(configuration) // 设置客户端配置.build();                             // 构建 Producer 对象// 构造要发送的消息内容Message message = provider.newMessageBuilder().setTopic(topic)                         // 设置消息的 Topic.setKeys("messageKey")                   // 设置业务键(可用于去重、查询).setTag("messageTag")                    // 设置标签(可用于过滤).setBody("messageBody".getBytes())       // 设置消息体内容(字节数组).build();                                // 构建消息对象// 发送消息并获取发送结果try {SendReceipt sendReceipt = producer.send(message); // 发送消息,返回消息IDlogger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());} catch (ClientException e) {// 如果发送失败,打印异常信息logger.error("Failed to send message", e);}// 建议应用退出前关闭 Producer(已注释,如需关闭请取消注释)// producer.close();}
}
import java.io.IOException;
import java.util.Collections;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class PushConsumerExample {// 日志记录器,用于记录消费日志private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);private PushConsumerExample() {// 私有构造函数,防止被实例化}public static void main(String[] args) throws ClientException, IOException, InterruptedException {// 加载 RocketMQ 客户端服务提供器(SPI 机制)final ClientServiceProvider provider = ClientServiceProvider.loadService();// 设置 RocketMQ Proxy 地址(5.x 版本使用 proxy 作为网关)String endpoints = "localhost:8081";// 构建客户端连接配置ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder().setEndpoints(endpoints).build();// 设置消息过滤表达式(使用 TAG 过滤,* 表示接收所有标签的消息)String tag = "*";FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);// 消费者所属的消费组名称(需提前在 RocketMQ 控制台配置或由服务端自动创建)String consumerGroup = "YourConsumerGroup";// 要订阅的主题名称String topic = "TestTopic";// 创建 PushConsumer(推模式消费者)PushConsumer pushConsumer = provider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration) // 设置连接配置.setConsumerGroup(consumerGroup) // 设置消费组.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) // 设置订阅主题和过滤规则.setMessageListener(messageView -> {// 消息到达后调用此函数进行处理logger.info("Consume message successfully, messageId={}", messageView.getMessageId());return ConsumeResult.SUCCESS; // 返回消费成功}).build();// 让主线程阻塞防止程序退出(等待消费)Thread.sleep(Long.MAX_VALUE);// 程序关闭前可以关闭消费者,释放资源(这里注释了)// pushConsumer.close();}
}
http://www.lryc.cn/news/587733.html

相关文章:

  • Linux 音频的基石: ALSA
  • React 第六十九节 Router中renderMatches的使用详解及注意事项
  • Android 性能优化:启动优化全解析
  • 019_工具集成与外部API调用
  • LabVIEW浏览器ActiveX事件交互
  • SpringMVC1
  • 数字孪生技术引领UI前端设计新潮流:智能交互界面的个性化定制
  • 【Linux系统】进程切换 | 进程调度——O(1)调度队列
  • RxSwift的介绍与使用
  • Android展示加载PDF
  • SAP ERP与微软ERP dynamics对比,两款云ERP产品有什么区别?
  • ETF期权的涨跌策略是什么?
  • vue3 JavaScript 数据累加 reduce
  • Jetpack Compose 重组陷阱:一个“乌龙”带来的启示
  • 数字孪生技术驱动UI前端革新:实现产品设计的虚拟仿真与实时反馈
  • SpringMVC3
  • 计算机毕业设计Java轩辕购物商城管理系统 基于 SpringBoot 的轩辕电商商城管理系统 Java 轩辕购物平台管理系统设计与实现
  • CICS Application Programming Fundamentals 第4章
  • 74、【OS】【Nuttx】【启动】深入理解 caller-saved 和 callee-saved(下)
  • 游戏框架笔记
  • 网络准入控制系统的作用解析,2025年保障企业入网安全第一道防线
  • 在 Azure Linux 上安装 RustFS
  • 使用 pytest 测试框架构建自动化测试套件之一
  • LightGBM 在处理**不均衡二分类任务**时,能在 **AUC 和 Accuracy** 两个指标上表现良好
  • SQL性能调优经验总结
  • 【Linux】基本指令详解(一) 树状文件结构、家目录、绝对/相对路径、linux文件类型
  • 1.2.1 面向对象详解——AI教你学Django
  • 【世纪龙科技】迈腾B8汽车整车检测与诊断仿真实训系统
  • 波兰无人机具身导航基准测试与最新进展!FlySearch:探索视觉语言模型的探索能力
  • 用Spring Boot逻辑删除(isDelete)优雅守护你的数据资产:告别物理删除的烦恼