当前位置: 首页 > news >正文

RocketMQ核心编程模型

RocketMQ核心编程模型与SpringBoot整合深度解析

笔记整理自RocketMQ官方文档与实战经验 | 图灵楼兰出品
配套视频课程学习效果更佳

一、RocketMQ架构核心回顾

RocketMQ采用经典发布-订阅模型,核心组件包括:

  • NameServer:轻量级服务发现中心(无状态)
  • Broker:消息存储与转发节点(主从架构)
  • Producer:消息生产者
  • Consumer:消息消费者

二、深入消息模型

1. 客户端基础流程

生产者固定步骤
// 1. 创建生产者(指定组名)
DefaultMQProducer producer = new DefaultMQProducer("group_name");
// 2. 配置NameServer地址
producer.setNamesrvAddr("192.168.65.112:9876"); 
// 3. 启动服务
producer.start();
// 4. 构建消息(Topic/Tag/Body)
Message msg = new Message("TopicTest", "TagA", "Hello".getBytes());
// 5. 发送消息
SendResult result = producer.send(msg);
// 6. 关闭生产者
producer.shutdown();
消费者固定步骤
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
consumer.setNamesrvAddr("192.168.65.112:9876");
consumer.subscribe("TopicTest", "*"); // 订阅Topic
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {msgs.forEach(msg -> System.out.println(new String(msg.getBody())));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消费状态
});
consumer.start();

2. 消息确认机制

三种发送方式对比
发送方式特点适用场景
单向发送不关心结果,吞吐量最高日志收集等低可靠性场景
同步发送阻塞等待Broker响应金融交易等高可靠性场景
异步发送回调处理结果,平衡性能与可靠性电商下单等并发场景
消费端重试策略
  • 返回RECONSUME_LATER触发重试
  • 最大重试次数默认16次(可配置)
  • 重试消息进入专属重试Topic:%RETRY%+ConsumerGroup

3. 高级消息类型

顺序消息(局部有序)
// 生产者:相同订单号的消息发往同一队列
Message msg = new Message("OrderTopic", "PAY", orderId.getBytes(), body);
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;return mqs.get(id % mqs.size()); // 自定义队列选择}
}, orderId);// 消费者:实现MessageListenerOrderly
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {// 处理顺序消息...
});
事务消息(两阶段提交)
TransactionMQProducer producer = new TransactionMQProducer("group");
producer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {return LocalTransactionState.UNKNOW; // 执行本地事务}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {return LocalTransactionState.COMMIT_MESSAGE; // 事务回查}
});
延迟消息
// 指定延迟级别(1-18对应预设时间)
message.setDelayTimeLevel(3); // 10秒后投递// 指定精确时间点(5.0+版本)
message.setDeliverTimeMs(System.currentTimeMillis() + 30_000); // 30秒后

4. ACL权限控制

启用步骤:

  1. Broker端开启aclEnable=true
  2. 配置plain_acl.yml
accounts:
- accessKey: RocketMQsecretKey: 12345678topicPerms:- topicA=DENY- topicB=PUB|SUB
  1. 客户端添加认证Hook:
RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials("RocketMQ", "12345678"));
DefaultMQProducer producer = new DefaultMQProducer(rpcHook);

三、SpringBoot整合实战

1. 快速集成

依赖配置:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.1</version>
</dependency>

配置文件application.yml

rocketmq:name-server: 192.168.65.112:9876producer:group: springboot-group

2. 消息生产与消费

// 生产者模板
@Autowired private RocketMQTemplate rocketMQTemplate;public void sendMessage() {rocketMQTemplate.convertAndSend("TestTopic", "Hello SpringBoot");
}// 消费者监听
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic",consumeMode = ConsumeMode.CONCURRENTLY
)
public class Consumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received: " + message);}
}

3. 事务消息整合

@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {return RocketMQLocalTransactionState.COMMIT;}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {return RocketMQLocalTransactionState.UNKNOWN;}
}

四、客户端最佳实践

1. 消息三要素规范

属性作用
MessageIdBroker生成的消息唯一标识(不建议作业务主键)
Key业务唯一键(如订单ID),用于消息追踪
Tag消息标签,用于高效过滤(性能远高于SQL过滤

2. 消费者幂等设计

重复消息场景:

  • 网络闪断导致生产者重试
  • 消费端ACK失败触发重投
  • Rebalance过程消息重复

解决方案示例:

consumer.registerMessageListener((msgs, context) -> {MessageExt msg = msgs.get(0);String orderId = msg.getKeys(); // 获取业务主键// 分布式锁或数据库唯一索引校验if (orderService.isProcessed(orderId)) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}// 处理业务逻辑...
});

3. 死信队列处理

  • 命名规则:%DLQ%+ConsumerGroup
  • 运维注意:默认权限禁读,需手动改为可读
  • 处理方式:
    1. 查询死信原因:sh mqadmin queryMsgById
    2. 修复后重新投递到正常Topic
    3. 设置单独消费者处理死信

4. 重试策略优化

// 调整最大重试次数(超过16次间隔固定2小时)
consumer.setMaxReconsumeTimes(10); // 重试间隔配置(需Broker端配合)
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 30m 1h

官方说明:RocketMQ 仅保证 At Least Once,业务需自行实现幂等
源码位置:org.apache.rocketmq.common.consumer.ConsumeFromWhere


总结
RocketMQ的客户端设计充分考虑了金融级场景需求,通过多种消息模型组合可满足复杂业务场景。在实际使用中需特别注意:

  1. 生产环境务必启用ACL
  2. 顺序消息避免单队列堆积
  3. 死信队列监控不可或缺
  4. 消费者幂等是系统稳定性的生命线
http://www.lryc.cn/news/593973.html

相关文章:

  • 咨询进阶——解读业务流程优化与重组【附全文阅读】
  • 5.2.4 指令执行过程
  • 【原创】微信小程序添加TDesign组件
  • ChatIM项目语音识别安装与使用
  • ARFoundation系列讲解 - 101 VisionPro 真机调试
  • USRP B210生成信号最大带宽测试之BPSK
  • 人脸识别:AI 如何精准 “认人”?
  • FreeSwitch编译部署
  • 【星海出品】python安装调试篇
  • 【数据集】NOAA 全球监测实验室(GML)海洋边界层(MBL)参考简介
  • Docker实践:使用Docker部署WhoDB开源轻量级数据库管理工具
  • 传输层协议 TCP
  • Java什么是原子性
  • Java SpringBoot 对接FreeSwitch
  • AtCoder Beginner Contest 415
  • Web-SQL注入数据库类型用户权限架构分层符号干扰利用过程发现思路
  • 向日葵远程命令执行漏洞
  • 《深入C++多态机制:从虚函数表到运行时类型识别》​
  • IDEA中使用Tomcat两种方式
  • C51单片机学习笔记——定时器与中断
  • API接口签名和敏感信息加密使用国密SM方案
  • 上电复位断言的自动化
  • go-redis Pipeline 与事务
  • 《计算机网络》实验报告五 DNS协议分析与测量
  • Dockerfile配置基于 Python 的 Web 应用镜像
  • 随着GPT-5测试中泄露OpenAI 预计将很快发布 揭秘GPT-5冲击波:OpenAI如何颠覆AI战场,碾压谷歌和Claude?
  • 单片机启动流程和启动文件详解
  • 数组算法之【合并两个有序数组】
  • 嵌入式硬件篇---舵机(示波器)
  • 设备健康管理实施案例:从技术架构到落地效果的全栈解析