Springboot 集成 RocketMQ(进阶-消息)
0. 入门篇
Springboot 集成 RocketMq(入门)-CSDN博客
1. 异步消息
1.1 生产者
@GetMapping("/send/async/{messageBody}")public String sendAsyncMsg(@PathVariable("messageBody") String messageBody) {// 构建消息对象Message message = new Message();message.setTopic("async");message.setTags("boot-mq-tag");message.setKeys("boot-mq-key");message.setBody(messageBody.getBytes());rocketMqTemplate.asyncSend("async", message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("sendAsyncMsg success messageBody:{}", messageBody);}@Overridepublic void onException(Throwable throwable) {log.error("sendAsyncMsg fail messageBody:{}", messageBody);}});return "OK";}
1.2 消费者
@Component
@RocketMQMessageListener(topic = "async", consumerGroup = "async")
@Slf4j
public class MyAsyncConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 处理消息的逻辑log.info("Received async message: {}", message);}}
2.广播消息
2.1 生产者
@GetMapping("/send/broadcast/{messageBody}")public String sendBroadcastMsg(@PathVariable("messageBody") String messageBody) {// 发送消息rocketMqTemplate.convertAndSend("broadcast", messageBody);return "OK";}
2.2 消费者
消费者监听注解上设置 messageModel = MessageModel.BROADCASTING,默认是(MessageModel.CLUSTERING)。
@Component
@RocketMQMessageListener(topic = "broadcast", consumerGroup = "broadcast1", messageModel = MessageModel.BROADCASTING )
@Slf4j
public class MyBroadcast1Consumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 处理消息的逻辑log.info("MyBroadcast1Consumer Received broadcast message: {}", message);}}@Component
@RocketMQMessageListener(topic = "broadcast", consumerGroup = "broadcast2", messageModel = MessageModel.BROADCASTING )
@Slf4j
public class MyBroadcast2Consumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 处理消息的逻辑log.info("MyBroadcast2Consumer Received broadcast message: {}", message);}}
3. 延时、定时消息
3.1 延时消息
3.1.1 生产者
message.setDelayTimeMs(10);
@GetMapping("/send/delayed/{messageBody}")public String sendDelayedMsg(@PathVariable("messageBody") String messageBody) {// 发送消息Message message = new Message();message.setTopic("delayed");message.setTags("boot-mq-tag");message.setKeys("boot-mq-key");message.setBody(messageBody.getBytes(StandardCharsets.UTF_8));// 延迟 10s 后投递message.setDelayTimeMs(10);// 发送消息,打印日志SendResult sendResult = null;try {sendResult = defaultMqProducer.send(message);log.info("sendDelayedMsg msgId:{},sendStatus:{}", sendResult.getMsgId(), sendResult.getSendStatus());} catch (Exception e) {log.info("sendDelayedMsg error", e);return "FAIL";}return "OK";}
3.1.2 消费者
@Component
@RocketMQMessageListener(topic = "delayed", consumerGroup = "delayed")
@Slf4j
public class MyDelayedConsumer implements RocketMQListener<Message> {@Overridepublic void onMessage(Message message) {// 处理消息的逻辑log.info("Received delayed message: {}", message);}}
3.2 定时消息
3.2.1 生产者
message.setDeliverTimeMs(System.currentTimeMillis() + 10);
@GetMapping("/send/scheduled/{messageBody}")public String sendScheduledMsg(@PathVariable("messageBody") String messageBody) {// 发送消息Message message = new Message();message.setTopic("scheduled");message.setTags("boot-mq-tag");message.setKeys("boot-mq-key");message.setBody(messageBody.getBytes(StandardCharsets.UTF_8));// 定时 10s 后投递message.setDeliverTimeMs(System.currentTimeMillis() + 10);// 发送消息,打印日志SendResult sendResult = null;try {sendResult = defaultMqProducer.send(message);log.info("sendScheduledMsg msgId:{},sendStatus:{}", sendResult.getMsgId(), sendResult.getSendStatus());} catch (Exception e) {log.info("sendScheduledMsg error", e);return "FAIL";}return "OK";}
3.2.2 消费者
@Component
@RocketMQMessageListener(topic = "scheduled", consumerGroup = "scheduled")
@Slf4j
public class MyScheduledConsumer implements RocketMQListener<Message> {@Overridepublic void onMessage(Message message) {// 处理消息的逻辑log.info("Received scheduled message: {}", message);}}
4.批量消息
4.1 生产者
@GetMapping("/send/batch/{messageBody}")public String sendBatchMsg(@PathVariable("messageBody") String messageBody) {List<Message> messageList = new ArrayList<>();for (int i = 0; i < 10; i++) {Message message = new Message();message.setTopic("batch");message.setTags("boot-mq-tag");message.setKeys("boot-mq-key");message.setBody((messageBody + i).getBytes(StandardCharsets.UTF_8));}// 发送消息,打印日志SendResult sendResult = null;try {sendResult = defaultMqProducer.send(messageList);log.info("sendBatchMsg msgId:{},sendStatus:{}", sendResult.getMsgId(), sendResult.getSendStatus());} catch (Exception e) {log.info("sendBatchMsg error", e);return "FAIL";}return "OK";}
4.2 消费者
@Component
@RocketMQMessageListener(topic = "batch", consumerGroup = "batch")
@Slf4j
public class MyBatchConsumer implements RocketMQListener<Message> {@Overridepublic void onMessage(Message message) {// 处理消息的逻辑log.info("Received batch message: {}", message);}}
5.顺序消息
5.1 局部有序
局部消息指的是消费者消费某个topic的某个队列中的消息是顺序的【队列扩容时操作部分数据乱序】。
5.1.1 生产者
@GetMapping("/send/partOrder/{targetId}/{messageBody}")public String sendPartOrderMsg(@PathVariable("targetId") Integer targetId, @PathVariable("messageBody") String messageBody) {Message message = new Message();message.setTopic("partOrder");message.setTags("boot-mq-tag");message.setKeys("boot-mq-key");message.setBody((targetId + "_1_" + messageBody).getBytes(StandardCharsets.UTF_8));// 方法1// 同步顺序消息// 默认根据id值采用SelectMessageQueueByHash策略来选择MessageQueuerocketMqTemplate.syncSendOrderly("partOrder", message, targetId.toString());// 方法2// 异步顺序消息// 默认根据id值采用SelectMessageQueueByHash策略来选择MessageQueuemessage.setBody((targetId + "_2_" + messageBody).getBytes(StandardCharsets.UTF_8));rocketMqTemplate.asyncSendOrderly("partOrder", message, targetId.toString(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("sendPartOrderMsg success targetId:{}, messageBody:{}", targetId, messageBody);}@Overridepublic void onException(Throwable throwable) {log.error("sendPartOrderMsg fail targetId:{}, messageBody:{}", targetId, messageBody);}});// 方法3// 发送消息,打印日志SendResult sendResult = null;try {message.setBody((targetId + "_3_" + messageBody).getBytes(StandardCharsets.UTF_8));sendResult = defaultMqProducer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object targetId) {Integer id = (Integer) targetId;int index = id % list.size();return list.get(index);}}, targetId);log.info("sendPartOrderMsg msgId:{},sendStatus:{}", sendResult.getMsgId(), sendResult.getSendStatus());} catch (Exception e) {log.info("sendPartOrderMsg error", e);return "FAIL";}return "OK";}
5.1.2 消费者
@Component
@RocketMQMessageListener(topic = "partOrder", consumerGroup = "partOrder", consumeMode = ConsumeMode.ORDERLY)
@Slf4j
public class MyPartOrderConsumer implements RocketMQListener<Message> {@Overridepublic void onMessage(Message message) {// 处理消息的逻辑log.info("Received partOrder message: {}", message);}}
5.2 全局有序
消费者消费全部消息都是顺序的,只能通过一个某个topic只有一个队列才能实现,这种应用场景较少,且性能较差。【向唯一队列中发送消息,队列无法扩展】。
5.2.1 生产者
@GetMapping("/send/overallOrder/{targetId}/{messageBody}")public String sendOverallOrderMsg(@PathVariable("targetId") Integer targetId, @PathVariable("messageBody") String messageBody) {Message message = new Message();message.setTopic("partOrder");message.setTags("boot-mq-tag");message.setKeys("boot-mq-key");message.setBody((targetId + "_1_" + messageBody).getBytes(StandardCharsets.UTF_8));// 方法1// 同步顺序消息// 默认根据id值采用SelectMessageQueueByHash策略来选择MessageQueuerocketMqTemplate.syncSendOrderly("overallOrder", message, OVER_ALL_ORDER_KEY);// 方法2// 异步顺序消息// 默认根据id值采用SelectMessageQueueByHash策略来选择MessageQueuemessage.setBody((targetId + "_2_" + messageBody).getBytes(StandardCharsets.UTF_8));rocketMqTemplate.asyncSendOrderly("overallOrder", message, OVER_ALL_ORDER_KEY, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("sendOverallOrderMsg success targetId:{}, messageBody:{}", targetId, messageBody);}@Overridepublic void onException(Throwable throwable) {log.error("sendOverallOrderMsg fail targetId:{}, messageBody:{}", targetId, messageBody);}});// 方法3// 发送消息,打印日志SendResult sendResult = null;try {message.setBody((targetId + "_3_" + messageBody).getBytes(StandardCharsets.UTF_8));sendResult = defaultMqProducer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object targetId) {return list.get(0);}}, targetId);log.info("sendOverallOrderMsg msgId:{},sendStatus:{}", sendResult.getMsgId(), sendResult.getSendStatus());} catch (Exception e) {log.info("sendOverallOrderMsg error", e);return "FAIL";}return "OK";}
6.事务消息
6.1 生产者
@RestController
@Slf4j
public class TransactionProducerController {@Resourceprivate RocketMQTemplate rocketMqTemplate;@GetMapping("/send/transaction/{targetId}/{messageBody}")public String sendTransactionMsg(@PathVariable("targetId") Integer targetId, @PathVariable("messageBody") String messageBody) {Message message = MessageBuilder.withPayload(messageBody).build();TransactionSendResult transaction = rocketMqTemplate.sendMessageInTransaction("transaction", message, targetId);return transaction.getTransactionId();}}@Slf4j
@RocketMQTransactionListener
public class TransactionMsgListener implements RocketMQLocalTransactionListener {/** 执行本地事务(在发送消息成功时执行) */@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {//模拟一个处理结果int index=2;/*** 模拟返回事务状态*/switch (index){case 1://处理业务String jsonStr = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);log.info("本地事务回滚,回滚消息,"+jsonStr);//返回ROLLBACK状态的消息会被丢弃return RocketMQLocalTransactionState.ROLLBACK;case 2://返回UNKNOW状态的消息会等待Broker进行事务状态回查log.info("需要等待Broker进行事务状态回查");return RocketMQLocalTransactionState.UNKNOWN;default:log.info("事务提交,消息正常处理");//返回COMMIT状态的消息会立即被消费者消费到return RocketMQLocalTransactionState.COMMIT;}}/*** 检查本地事务的状态* 回查间隔时间:系统默认每隔60秒发起一次定时任务,对未提交的半事务消息进行回查,共持续12小时。* 第一次消息回查最快*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {String transactionId = message.getHeaders().get("__transactionId__").toString();log.info("检查本地事务状态,transactionId:{}", transactionId);return RocketMQLocalTransactionState.COMMIT;}
}
6.2 消费者
@Component
@RocketMQMessageListener(topic = "transaction", consumerGroup = "transaction")
@Slf4j
public class MyTransactionConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 处理消息的逻辑log.info("Received transaction message: {}", message);}}
7.单项消息
单向发送只负责发送消息,不等待RocketMQ服务器返回的发送结果,也不提供回调函数来接收RocketMQ服务器的响应结果,只负责发送至于发送成功还是发送失败并不考虑。通常用于对可靠性要求不高的场景。
@GetMapping("/send/oneWay/{messageBody}")public String sendOneWayMsg(@PathVariable("messageBody") String messageBody) {// 构建消息对象Message message = new Message();message.setTopic("oneWay");message.setTags("boot-mq-tag");message.setKeys("boot-mq-key");message.setBody(messageBody.getBytes());// 单向不可靠消息 void 方法无返回值rocketMqTemplate.sendOneWay("oneWay", message);// 同步可靠消息SendResult syncResult = rocketMqTemplate.syncSend("oneWay", message);// 异步可到消息rocketMqTemplate.asyncSend("oneWay", message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("sendOneWayMsg success messageBody:{}", messageBody);}@Overridepublic void onException(Throwable throwable) {log.error("sendOneWayMsg fail messageBody:{}", messageBody);}});return "OK";}