当前位置: 首页 > news >正文

系列六、Springboot操作RocketMQ

一、同步消息

1.1、发送&接收简单消息

1.1.1、发送简单消息

/*** 测试发送简单消息*/
@Test
public void sendSimpleMessage() {SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_SIMPLE", "我是一个简单消息");// 往[BOOT_TOPIC_SIMPLE]主题发送一个简单消息log.info("测试发送简单消息 result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 11:39:18.296  INFO 14700 --- [           main] cketmqSpringbootProducerApplicationTests : 测试发送简单消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":3,"topic":"BOOT_TOPIC_SIMPLE"},"msgId":"7F000001396C18B4AAC230D9778C0000","offsetMsgId":"C0A8B58A00002A9F0000000000029FA6","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}

1.1.2、接收简单消息

/*** @Description: 消费者消息消息,就添加一个监听*/
@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_SIMPLE",consumerGroup = "BOOT_TOPIC_SIMPLE_GROUP",messageModel = MessageModel.CLUSTERING)
public class MySimpleMessageListener implements RocketMQListener<MessageExt> {/*** 这个方法就是消费者的方法,如果泛型指定了固定的类型,那么消息体就是我们的参数。MessageExt类型消息是消息的所有内容。* 如何判断消息是否接收?*      没有报错就表示消息被签收了,如果报错了就表示消息被拒收了,会重试* @param messageExt*/@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收简单消息message:{}",new String(messageExt.getBody()));}
}
// 控制台打印结果
2023-08-10 11:39:18.293  INFO 6044 --- [_SIMPLE_GROUP_2] o.star.listener.MySimpleMessageListener  : 接收简单消息message:我是一个简单消息

1.2、发送&接收对象消息

1.2.1、发送对象消息

/*** 测试发送对象消息*/
@Test
public void sendObjectMessage() {Order order = new Order(UUID.randomUUID().toString().replace("-", ""), 1, "小米2s,为发烧而生");// 往BOOT_TOPIC_OBJ主题发送一个订单对象SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_OBJ", order);log.info("测试发送对象消息result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 11:42:57.879  INFO 35812 --- [           main] cketmqSpringbootProducerApplicationTests : 测试发送对象消息result:{"messageQueue":{"brokerName":"broker-a","queueId":3,"topic":"BOOT_TOPIC_OBJ"},"msgId":"7F0000018BE418B4AAC230DCD14D0000","offsetMsgId":"C0A8B58A00002A9F000000000002A0C2","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}

1.2.2、接收对象消息

@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_OBJ", consumerGroup = "BOOT_TOPIC_OBJ_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyObjectMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收对象消息message:{}",new String(messageExt.getBody()));}
}// 控制台打印结果
2023-08-10 13:44:35.458  INFO 35816 --- [PIC_OBJ_GROUP_1] o.star.listener.MyObjectMessageListener  : 接收对象消息message:{"orderSn":"f5c39c2e86f74649b9582e5e50c500ff","userId":1,"description":"小米2s,为发烧而生"}

1.3、发送&接收集合消息

1.3.1、发送集合消息

/*** 测试发送集合消息*/
@Test
public void sendCollectionMessage() {Order order1 = new Order(UUID.randomUUID().toString().replace("-", ""), 1, "小米2s,为发烧而生");Order order2 = new Order(UUID.randomUUID().toString().replace("-", ""), 2, "小米3s,为发烧而生,你值得拥有");List<Order> orders = Arrays.asList(order1, order2);// 往[BOOT_TOPIC_COLLECTION]主题发送集合对象SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_COLLECTION", orders);log.info("测试发送集合消息 result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 13:50:25.053  INFO 28696 --- [           main] cketmqSpringbootProducerApplicationTests : 测试发送集合消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":3,"topic":"BOOT_TOPIC_COLLECTION"},"msgId":"7F000001701818B4AAC2315181130000","offsetMsgId":"C0A8B58A00002A9F000000000002A21F","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}

1.3.2、接收集合消息

@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_COLLECTION", consumerGroup = "BOOT_TOPIC_COLLECTION_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyCollectionMessageListener implements RocketMQListener<MessageExt> {/*** 这个方法就是消费者的方法,如果泛型指定了固定的类型,那么消息体就是我们的参数。MessageExt类型消息是消息的所有内容。* 如何判断消息是否接收?*      没有报错就表示消息被签收了,如果报错了就表示消息被拒收了,会重试* @param messageExt*/@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收集合消息message:{}",new String(messageExt.getBody()));}
}
// 控制台打印结果
2023-08-10 13:50:39.726  INFO 30076 --- [LECTION_GROUP_1] o.s.l.MyCollectionMessageListener        : 接收集合消息message:[{"orderSn":"141bb7c6535b472d83a5099a43422d04","userId":1,"description":"小米2s,为发烧而生"},{"orderSn":"d2b41a75e087455e8910c1cba84f830f","userId":2,"description":"小米3s,为发烧而生,你值得拥有"}]

二、异步消息

发送&接收异步消息

发送异步消息

/*** 测试发送异步消息** @throws Exception*/
@Test
public void sendASyncSimpleMessage() throws Exception {rocketMQTemplate.asyncSend("BOOT_TOPIC_ASYNC", "我是一个异步消息", new SendCallback() {@Overridepublic void onSuccess(SendResult result) {log.info("测试发送异步消息 result:{}", JSON.toJSONString(result));}@Overridepublic void onException(Throwable throwable) {log.info("测试发送异步消息 error:{}", throwable.getMessage());}});log.info("我先执行");// 挂起JVM不让方法结束System.in.read();
}
// 控制台打印结果
2023-08-10 14:02:21.125  INFO 30988 --- [           main] cketmqSpringbootProducerApplicationTests : 我先执行
2023-08-10 14:02:21.793  INFO 30988 --- [ublicExecutor_1] cketmqSpringbootProducerApplicationTests : 测试发送异步消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":0,"topic":"BOOT_TOPIC_ASYNC"},"msgId":"7F000001790C18B4AAC2315C70D50000","offsetMsgId":"C0A8B58A00002A9F000000000002A3FC","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}

接收异步消息

@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_ASYNC",consumerGroup = "BOOT_TOPIC_ASYNC_GROUP",messageModel = MessageModel.CLUSTERING)
public class MyASyncMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收异步消息 message:{}",new String(messageExt.getBody()));}
}
// 控制台打印结果
2023-08-10 14:02:36.334  INFO 10676 --- [C_ASYNC_GROUP_1] o.star.listener.MyASyncMessageListener   : 接收异步消息 message:我是一个异步消息

三、单向消息

发送&接收单向消息

发送单向消息

/*** 适用场景:适用于不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,例如日志信息的发送*/
@Test
public void sendOnewayMessage() {// 发送单向消息,没有返回值和结果rocketMQTemplate.sendOneWay("BOOT_TOPIC_ONE_WAY", "我是一个单向消息");
}

接收单向消息

@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_ONE_WAY", consumerGroup = "BOOT_TOPIC_ONE_WAY_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyOnewayMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收单向消息 message:{}",new String(messageExt.getBody()));}}
// 控制台打印结果
2023-08-10 14:07:23.965  INFO 32740 --- [ONE_WAY_GROUP_1] o.star.listener.MyOnewayMessageListener  : 接收单向消息 message:我是一个单向消息

四、延迟消息

发送&接收延迟消息

发送延迟消息

/*** 测试发送延迟消息*/
@Test
public void sendDelayMessage() {Message<String> message = MessageBuilder.withPayload("我是一个延迟消息").build();/*** 设定消息的延迟等级(1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h)*      1s: 等级1*      5s: 等级2*      10s:等级3*      30s:等级4*      1m: 等级5* 发送一个延迟消息,延迟等级为4级,也就是30s后被监听消费* 注意事项:RocketMQ不支持任意时间的延时,只支持上述的延迟规则*/SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_DELAY", message, 2000, 4);log.info("测试发送延迟消息 result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 14:13:03.859  INFO 4804 --- [           main] cketmqSpringbootProducerApplicationTests : 测试发送延迟消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":2,"topic":"BOOT_TOPIC_DELAY"},"msgId":"7F00000112C418B4AAC231663CE60000","offsetMsgId":"C0A8B58A00002A9F000000000002A634","queueOffset":4,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}

接收延迟消息

@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_DELAY", consumerGroup = "BOOT_TOPIC_DELAY_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyDelayMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收延迟消息 message:{}",new String(messageExt.getBody()));}}// 控制台打印结果
2023-08-10 14:13:33.860  INFO 26112 --- [C_DELAY_GROUP_1] o.star.listener.MyDelayMessageListener   : 接收延迟消息 message:我是一个延迟消息

五、顺序消息

发送&接收顺序消息

发送顺序消息

/*** 发送顺序消息,控制流程:下订单==》发短信==》物流* 测试发送顺序消息*/
@Test
public void sendOrderlyMessage() {// 顺序消息,发送者将一组消息都发送至同一个队列,消费者需要单线程进行消费List<Order> orders = Arrays.asList(new Order("aaa", 1, "下订单"),new Order("aaa", 1, "发短信"),new Order("aaa", 1, "物流"),new Order("bbb", 2, "下订单"),new Order("bbb", 2, "发短信"),new Order("bbb", 2, "物流"));orders.forEach(order -> {// 发送,一般都是以json的方式进行处理SendResult result = rocketMQTemplate.syncSendOrderly("BOOT_TOPIC_ORDERLY", JSON.toJSONString(order), order.getOrderSn());log.info("订单id:{},队列id:{},结果:{}",order.getUserId(),result.getMessageQueue().getQueueId(),result.getSendStatus());});
}
// 控制台打印结果
2023-08-10 14:23:02.023  INFO 33668 --- [           main] cketmqSpringbootProducerApplicationTests : 订单id:1,队列id:1,结果:SEND_OK
2023-08-10 14:23:02.026  INFO 33668 --- [           main] cketmqSpringbootProducerApplicationTests : 订单id:1,队列id:1,结果:SEND_OK
2023-08-10 14:23:02.028  INFO 33668 --- [           main] cketmqSpringbootProducerApplicationTests : 订单id:1,队列id:1,结果:SEND_OK
2023-08-10 14:23:02.029  INFO 33668 --- [           main] cketmqSpringbootProducerApplicationTests : 订单id:2,队列id:2,结果:SEND_OK
2023-08-10 14:23:02.030  INFO 33668 --- [           main] cketmqSpringbootProducerApplicationTests : 订单id:2,队列id:2,结果:SEND_OK
2023-08-10 14:23:02.031  INFO 33668 --- [           main] cketmqSpringbootProducerApplicationTests : 订单id:2,队列id:2,结果:SEND_OK

接收顺序消息

@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_ORDERLY",consumerGroup = "BOOT_TOPIC_ORDERLY_GROUP",consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式,单线程maxReconsumeTimes = 5              // 消费重试的次数
)
public class MyOrderlyMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {Order order = JSON.parseObject(new String(messageExt.getBody()),Order.class);log.info("接收顺序消息order:{}",order);}
}
// 控制台打印结果
2023-08-10 14:23:46.714  INFO 27356 --- [ORDERLY_GROUP_2] o.s.listener.MyOrderlyMessageListener    : 接收顺序消息order:Order(orderSn=bbb, userId=2, description=下订单)
2023-08-10 14:23:46.714  INFO 27356 --- [ORDERLY_GROUP_1] o.s.listener.MyOrderlyMessageListener    : 接收顺序消息order:Order(orderSn=aaa, userId=1, description=下订单)
2023-08-10 14:23:46.901  INFO 27356 --- [ORDERLY_GROUP_1] o.s.listener.MyOrderlyMessageListener    : 接收顺序消息order:Order(orderSn=aaa, userId=1, description=发短信)
2023-08-10 14:23:46.901  INFO 27356 --- [ORDERLY_GROUP_1] o.s.listener.MyOrderlyMessageListener    : 接收顺序消息order:Order(orderSn=aaa, userId=1, description=物流)
2023-08-10 14:23:46.901  INFO 27356 --- [ORDERLY_GROUP_2] o.s.listener.MyOrderlyMessageListener    : 接收顺序消息order:Order(orderSn=bbb, userId=2, description=发短信)
2023-08-10 14:23:46.901  INFO 27356 --- [ORDERLY_GROUP_2] o.s.listener.MyOrderlyMessageListener    : 接收顺序消息order:Order(orderSn=bbb, userId=2, description=物流)

六、带Tag的消息

发送&接收带Tag的消息

发送带Tag消息

/*** 测试发送带Tag的消息*/
@Test
public void sendTagMessage() {SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_TAG:TagA", "我是一个带Tag的消息");log.info("测试发送带Tag的消息 result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 14:34:07.806  INFO 30388 --- [           main] cketmqSpringbootProducerApplicationTests : 测试发送带Tag的消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":3,"topic":"BOOT_TOPIC_TAG"},"msgId":"7F00000176B418B4AAC2317986310000","offsetMsgId":"C0A8B58A00002A9F000000000002B029","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}

接收带Tag消息

@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_TAG",consumerGroup = "BOOT_TOPIC_TAG_GROUP",messageModel = MessageModel.CLUSTERING,selectorType = SelectorType.TAG,    // tag过滤模式selectorExpression = "TagA || TagB"
)
public class MyTagMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收带Tag标签的消息 result:{}", new String(messageExt.getBody()));}
}
// 控制台打印结果
2023-08-10 14:34:27.260  INFO 10868 --- [PIC_TAG_GROUP_1] org.star.listener.MyTagMessageListener   : 接收带Tag标签的消息 result:我是一个带Tag的消息

七、带Key的消息

发送&接收带Key的消息

发送带Key的消息

/*** 测试发送带Key的消息*/
@Test
public void sendKeyMessage() {// key写在消息头里边Message<String> message = MessageBuilder.withPayload("我是一个带Key的消息").setHeader(RocketMQHeaders.KEYS, "STAR").build();SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_KEY", message);log.info("测试发送带Key的消息 result:{}", JSON.toJSONString(result));
}
// 控制台打印结果
2023-08-10 14:45:42.963  INFO 26148 --- [           main] cketmqSpringbootProducerApplicationTests : 测试发送带Key的消息 result:{"messageQueue":{"brokerName":"broker-a","queueId":2,"topic":"BOOT_TOPIC_KEY"},"msgId":"7F000001662418B4AAC2318421A80000","offsetMsgId":"C0A8B58A00002A9F000000000002B275","queueOffset":0,"regionId":"DefaultRegion","sendStatus":"SEND_OK","traceOn":true}

接收带Key的消息

@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_KEY",consumerGroup = "BOOT_TOPIC_KEY_GROUP",messageModel = MessageModel.CLUSTERING
)
public class MyKeyMessageListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("接收带Key的消息:{},Key:{}",new String(messageExt.getBody()), messageExt.getKeys());}
}
// 控制台打印结果
2023-08-10 14:45:42.963  INFO 23040 --- [PIC_KEY_GROUP_1] org.star.listener.MyKeyMessageListener   : 接收带Key的消息:我是一个带Key的消息,Key:STAR

八、集群模式的消息

发送&接收集群模式的消息

发送集群消息

/*** 测试消息消费模式-集群模式* 此种方式消费者会采取轮询的方式进行消费*/
@Test
public void modeForClusterSendMessage() {for (int i = 1; i <= 10; i++) {SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_CLUSTER", "我是第" + i + "个消息");log.info("集群模式,队列id:{},结果:{}", result.getMessageQueue().getQueueId(),result.getSendStatus());}
}// 控制台打印结果
2023-08-10 14:58:14.203  INFO 13416 --- [           main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:1,结果:SEND_OK
2023-08-10 14:58:14.207  INFO 13416 --- [           main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:2,结果:SEND_OK
2023-08-10 14:58:14.211  INFO 13416 --- [           main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:3,结果:SEND_OK
2023-08-10 14:58:14.213  INFO 13416 --- [           main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:0,结果:SEND_OK
2023-08-10 14:58:14.216  INFO 13416 --- [           main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:1,结果:SEND_OK
2023-08-10 14:58:14.218  INFO 13416 --- [           main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:2,结果:SEND_OK
2023-08-10 14:58:14.220  INFO 13416 --- [           main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:3,结果:SEND_OK
2023-08-10 14:58:14.222  INFO 13416 --- [           main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:0,结果:SEND_OK
2023-08-10 14:58:14.224  INFO 13416 --- [           main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:1,结果:SEND_OK
2023-08-10 14:58:14.227  INFO 13416 --- [           main] cketmqSpringbootProducerApplicationTests : 集群模式,队列id:2,结果:SEND_OK

接收集群消息

@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_CLUSTER", consumerGroup = "BOOT_TOPIC_CLUSTER_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyClusterMessageListener3 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第1个消费者 message:{}",new String(messageExt.getBody()));}
}@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_CLUSTER", consumerGroup = "BOOT_TOPIC_CLUSTER_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyClusterMessageListener3 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第2个消费者 message:{}",new String(messageExt.getBody()));}
}@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_CLUSTER", consumerGroup = "BOOT_TOPIC_CLUSTER_GROUP", messageModel = MessageModel.CLUSTERING)
public class MyClusterMessageListener3 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:{}",new String(messageExt.getBody()));}
}// 控制台打印结果
2023-08-10 14:58:14.209  INFO 7300 --- [CLUSTER_GROUP_1] o.s.listener.MyClusterMessageListener3   : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:我是第1个消息
2023-08-10 14:58:14.212  INFO 7300 --- [CLUSTER_GROUP_1] o.s.listener.MyClusterMessageListener2   : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第2个消费者 message:我是第2个消息
2023-08-10 14:58:14.212  INFO 7300 --- [CLUSTER_GROUP_1] o.s.listener.MyClusterMessageListener1   : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第1个消费者 message:我是第3个消息
2023-08-10 14:58:14.216  INFO 7300 --- [CLUSTER_GROUP_2] o.s.listener.MyClusterMessageListener3   : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:我是第4个消息
2023-08-10 14:58:14.217  INFO 7300 --- [CLUSTER_GROUP_3] o.s.listener.MyClusterMessageListener3   : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:我是第5个消息
2023-08-10 14:58:14.220  INFO 7300 --- [CLUSTER_GROUP_2] o.s.listener.MyClusterMessageListener2   : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第2个消费者 message:我是第6个消息
2023-08-10 14:58:14.222  INFO 7300 --- [CLUSTER_GROUP_2] o.s.listener.MyClusterMessageListener1   : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第1个消费者 message:我是第7个消息
2023-08-10 14:58:14.223  INFO 7300 --- [CLUSTER_GROUP_4] o.s.listener.MyClusterMessageListener3   : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:我是第8个消息
2023-08-10 14:58:14.226  INFO 7300 --- [CLUSTER_GROUP_5] o.s.listener.MyClusterMessageListener3   : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第3个消费者 message:我是第9个消息
2023-08-10 14:58:14.229  INFO 7300 --- [CLUSTER_GROUP_3] o.s.listener.MyClusterMessageListener2   : 我是集群模式[BOOT_TOPIC_CLUSTER_GROUP]组的第2个消费者 message:我是第10个消息

九、广播模式的消息

发送&接收广播模式的消息

发送广播模式的消息

/*** 测试消息消费模式-广播模式* 此种方式每一个消费者都会消费一次消息*/
@Test
public void modeForBroadcastingSendMessage() {for (int i = 1; i <= 5; i++) {SendResult result = rocketMQTemplate.syncSend("BOOT_TOPIC_BROADCASTING", "我是第" + i + "个消息");log.info("广播模式,队列id:{},结果:{}", result.getMessageQueue().getQueueId(),result.getSendStatus());}
}
// 控制台打印结果
2023-08-10 15:12:10.081  INFO 37728 --- [           main] cketmqSpringbootProducerApplicationTests : 广播模式,队列id:0,结果:SEND_OK
2023-08-10 15:12:10.084  INFO 37728 --- [           main] cketmqSpringbootProducerApplicationTests : 广播模式,队列id:1,结果:SEND_OK
2023-08-10 15:12:10.086  INFO 37728 --- [           main] cketmqSpringbootProducerApplicationTests : 广播模式,队列id:2,结果:SEND_OK
2023-08-10 15:12:10.088  INFO 37728 --- [           main] cketmqSpringbootProducerApplicationTests : 广播模式,队列id:3,结果:SEND_OK
2023-08-10 15:12:10.090  INFO 37728 --- [           main] cketmqSpringbootProducerApplicationTests : 广播模式,队列id:0,结果:SEND_OK

接收广播模式的消息

@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_BROADCASTING", consumerGroup = "BOOT_TOPIC_BROADCASTING_GROUP", messageModel = MessageModel.BROADCASTING)
public class MyBroadcastingMessageListener1 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:{},队列id:{}",new String(messageExt.getBody()),messageExt.getQueueId());}
}@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_BROADCASTING", consumerGroup = "BOOT_TOPIC_BROADCASTING_GROUP", messageModel = MessageModel.BROADCASTING)
public class MyBroadcastingMessageListener2 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:{},队列id:{}",new String(messageExt.getBody()),messageExt.getQueueId());}
}@Slf4j
@Component
@RocketMQMessageListener(topic = "BOOT_TOPIC_BROADCASTING", consumerGroup = "BOOT_TOPIC_BROADCASTING_GROUP", messageModel = MessageModel.BROADCASTING)
public class MyBroadcastingMessageListener3 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {log.info("我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:{},队列id:{}",new String(messageExt.getBody()),messageExt.getQueueId());}
}// 控制台打印结果
2023-08-10 15:12:10.089  INFO 34160 --- [CASTING_GROUP_2] o.s.l.MyBroadcastingMessageListener2     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:我是第2个消息,队列id:1
2023-08-10 15:12:10.089  INFO 34160 --- [CASTING_GROUP_1] o.s.l.MyBroadcastingMessageListener2     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:我是第3个消息,队列id:2
2023-08-10 15:12:10.089  INFO 34160 --- [CASTING_GROUP_3] o.s.l.MyBroadcastingMessageListener1     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:我是第1个消息,队列id:0
2023-08-10 15:12:10.089  INFO 34160 --- [CASTING_GROUP_3] o.s.l.MyBroadcastingMessageListener2     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:我是第1个消息,队列id:0
2023-08-10 15:12:10.089  INFO 34160 --- [CASTING_GROUP_4] o.s.l.MyBroadcastingMessageListener3     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:我是第4个消息,队列id:3
2023-08-10 15:12:10.089  INFO 34160 --- [CASTING_GROUP_1] o.s.l.MyBroadcastingMessageListener3     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:我是第2个消息,队列id:1
2023-08-10 15:12:10.089  INFO 34160 --- [CASTING_GROUP_2] o.s.l.MyBroadcastingMessageListener1     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:我是第2个消息,队列id:1
2023-08-10 15:12:10.089  INFO 34160 --- [CASTING_GROUP_2] o.s.l.MyBroadcastingMessageListener3     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:我是第3个消息,队列id:2
2023-08-10 15:12:10.089  INFO 34160 --- [CASTING_GROUP_3] o.s.l.MyBroadcastingMessageListener3     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:我是第1个消息,队列id:0
2023-08-10 15:12:10.089  INFO 34160 --- [CASTING_GROUP_4] o.s.l.MyBroadcastingMessageListener2     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:我是第4个消息,队列id:3
2023-08-10 15:12:10.089  INFO 34160 --- [CASTING_GROUP_4] o.s.l.MyBroadcastingMessageListener1     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:我是第4个消息,队列id:3
2023-08-10 15:12:10.089  INFO 34160 --- [CASTING_GROUP_1] o.s.l.MyBroadcastingMessageListener1     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:我是第3个消息,队列id:2
2023-08-10 15:12:10.092  INFO 34160 --- [CASTING_GROUP_5] o.s.l.MyBroadcastingMessageListener3     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第3个消费者 message:我是第5个消息,队列id:0
2023-08-10 15:12:10.092  INFO 34160 --- [CASTING_GROUP_5] o.s.l.MyBroadcastingMessageListener2     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第2个消费者 message:我是第5个消息,队列id:0
2023-08-10 15:12:10.092  INFO 34160 --- [CASTING_GROUP_5] o.s.l.MyBroadcastingMessageListener1     : 我是广播模式[BOOT_TOPIC_BROADCASTING]组的第1个消费者 message:我是第5个消息,队列id:0

http://www.lryc.cn/news/118269.html

相关文章:

  • 【jupyter异常错误】Kernel started:No module named ipykernel_launcher
  • 使用langchain与你自己的数据对话(五):聊天机器人
  • 爬虫与搜索引擎优化:通过Python爬虫提升网站搜索排名
  • 2024软考系统架构设计师论文写作要点
  • 【Maven】依赖范围、依赖传递、依赖排除、依赖原则、依赖继承
  • 数组slice、splice字符串substr、split
  • 程序漏洞:安全威胁的隐患
  • 0基础学C#笔记09:希尔排序法
  • DOCKER的容器
  • 跳跃游戏——力扣55
  • 将本地项目上传至gitee的详细步骤
  • iOS开发-导航栏UINavigationBar隐藏底部线及透明度
  • 题目:2520.统计能整除数字的位数
  • matplotlib 笔记 注释annotate
  • Windows 无法安装到这个硬盘。选中的磁盘具有MBR分区。在EFI系统上,Windows只能安装到GPT磁盘
  • 学C的第三十三天【C语言文件操作】
  • 线性表的基本操作及在顺序存储及链式存储的实现
  • 合宙Air724UG LuatOS-Air script lib API--nvm
  • springboot单元测试的详细介绍
  • Apache Doris 入门教程26:资源管理
  • 【金融量化】Python实现根据收益率计算累计收益率并可视化
  • 解读spring中@Value 如何将配置转自定义的bean
  • 前端开发实习总结参考范文(合集)
  • ♥ vue中$forceUpdate()
  • Java一般用于postgis空间数据库通用的增删查改sql命令
  • 【C++类和对象】类有哪些默认成员函数呢?(上)
  • (docker)mysql镜像拉取-创建容器-容器的使用【个人笔记】
  • 【时间格式引发的事故】
  • 【数据结构】栈及其实现
  • Linux命令200例:mount将文件系统挂载到指定目录下(常用)