RocketMQ核心编程模型
RocketMQ核心编程模型与SpringBoot整合深度解析
笔记整理自RocketMQ官方文档与实战经验 | 图灵楼兰出品
配套视频课程学习效果更佳
一、RocketMQ架构核心回顾
RocketMQ采用经典发布-订阅模型,核心组件包括:
- NameServer:轻量级服务发现中心(无状态)
- Broker:消息存储与转发节点(主从架构)
- Producer:消息生产者
- Consumer:消息消费者
二、深入消息模型
1. 客户端基础流程
生产者固定步骤
// 1. 创建生产者(指定组名)
DefaultMQProducer producer = new DefaultMQProducer("group_name");
// 2. 配置NameServer地址
producer.setNamesrvAddr("192.168.65.112:9876");
// 3. 启动服务
producer.start();
// 4. 构建消息(Topic/Tag/Body)
Message msg = new Message("TopicTest", "TagA", "Hello".getBytes());
// 5. 发送消息
SendResult result = producer.send(msg);
// 6. 关闭生产者
producer.shutdown();
消费者固定步骤
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
consumer.setNamesrvAddr("192.168.65.112:9876");
consumer.subscribe("TopicTest", "*"); // 订阅Topic
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {msgs.forEach(msg -> System.out.println(new String(msg.getBody())));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 消费状态
});
consumer.start();
2. 消息确认机制
三种发送方式对比
发送方式 | 特点 | 适用场景 |
---|---|---|
单向发送 | 不关心结果,吞吐量最高 | 日志收集等低可靠性场景 |
同步发送 | 阻塞等待Broker响应 | 金融交易等高可靠性场景 |
异步发送 | 回调处理结果,平衡性能与可靠性 | 电商下单等并发场景 |
消费端重试策略
- 返回
RECONSUME_LATER
触发重试 - 最大重试次数默认16次(可配置)
- 重试消息进入专属重试Topic:
%RETRY%+ConsumerGroup
3. 高级消息类型
顺序消息(局部有序)
// 生产者:相同订单号的消息发往同一队列
Message msg = new Message("OrderTopic", "PAY", orderId.getBytes(), body);
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;return mqs.get(id % mqs.size()); // 自定义队列选择}
}, orderId);// 消费者:实现MessageListenerOrderly
consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {// 处理顺序消息...
});
事务消息(两阶段提交)
TransactionMQProducer producer = new TransactionMQProducer("group");
producer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {return LocalTransactionState.UNKNOW; // 执行本地事务}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {return LocalTransactionState.COMMIT_MESSAGE; // 事务回查}
});
延迟消息
// 指定延迟级别(1-18对应预设时间)
message.setDelayTimeLevel(3); // 10秒后投递// 指定精确时间点(5.0+版本)
message.setDeliverTimeMs(System.currentTimeMillis() + 30_000); // 30秒后
4. ACL权限控制
启用步骤:
- Broker端开启
aclEnable=true
- 配置
plain_acl.yml
:
accounts:
- accessKey: RocketMQsecretKey: 12345678topicPerms:- topicA=DENY- topicB=PUB|SUB
- 客户端添加认证Hook:
RPCHook rpcHook = new AclClientRPCHook(new SessionCredentials("RocketMQ", "12345678"));
DefaultMQProducer producer = new DefaultMQProducer(rpcHook);
三、SpringBoot整合实战
1. 快速集成
依赖配置:
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.1</version>
</dependency>
配置文件application.yml
:
rocketmq:name-server: 192.168.65.112:9876producer:group: springboot-group
2. 消息生产与消费
// 生产者模板
@Autowired private RocketMQTemplate rocketMQTemplate;public void sendMessage() {rocketMQTemplate.convertAndSend("TestTopic", "Hello SpringBoot");
}// 消费者监听
@RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic",consumeMode = ConsumeMode.CONCURRENTLY
)
public class Consumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("Received: " + message);}
}
3. 事务消息整合
@RocketMQTransactionListener
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {return RocketMQLocalTransactionState.COMMIT;}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {return RocketMQLocalTransactionState.UNKNOWN;}
}
四、客户端最佳实践
1. 消息三要素规范
属性 | 作用 |
---|---|
MessageId | Broker生成的消息唯一标识(不建议作业务主键) |
Key | 业务唯一键(如订单ID),用于消息追踪 |
Tag | 消息标签,用于高效过滤(性能远高于SQL过滤) |
2. 消费者幂等设计
重复消息场景:
- 网络闪断导致生产者重试
- 消费端ACK失败触发重投
- Rebalance过程消息重复
解决方案示例:
consumer.registerMessageListener((msgs, context) -> {MessageExt msg = msgs.get(0);String orderId = msg.getKeys(); // 获取业务主键// 分布式锁或数据库唯一索引校验if (orderService.isProcessed(orderId)) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}// 处理业务逻辑...
});
3. 死信队列处理
- 命名规则:
%DLQ%+ConsumerGroup
- 运维注意:默认权限禁读,需手动改为可读
- 处理方式:
- 查询死信原因:
sh mqadmin queryMsgById
- 修复后重新投递到正常Topic
- 设置单独消费者处理死信
- 查询死信原因:
4. 重试策略优化
// 调整最大重试次数(超过16次间隔固定2小时)
consumer.setMaxReconsumeTimes(10); // 重试间隔配置(需Broker端配合)
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 30m 1h
官方说明:RocketMQ 仅保证 At Least Once,业务需自行实现幂等
源码位置:org.apache.rocketmq.common.consumer.ConsumeFromWhere
总结:
RocketMQ的客户端设计充分考虑了金融级场景需求,通过多种消息模型组合可满足复杂业务场景。在实际使用中需特别注意:
- 生产环境务必启用ACL
- 顺序消息避免单队列堆积
- 死信队列监控不可或缺
- 消费者幂等是系统稳定性的生命线