当前位置: 首页 > news >正文

RocketMQ学习系列之——客户端消息确认机制

一、客户端使用MQ基本代码示例

1、添加maven依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.3.0</version>
</dependency>

2、生产者代码示例

public class Producer {public static void main(String[] args) throws MQClientException, InterruptedException {//初始化一个消息生产者DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 指定nameserver地址producer.setNamesrvAddr("192.168.65.112:9876");// 启动消息生产者服务producer.start();for (int i = 0; i < 2; i++) {try {// 创建消息。消息由Topic,Tag和body三个属性组成,其中Body就是消息内容Message msg = new Message("TopicTest","TagA",("Hello RocketMQ " +i).getBytes(RemotingHelper.DEFAULT_CHARSET));//发送消息,获取发送结果SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);} catch (Exception e) {e.printStackTrace();Thread.sleep(1000);}}//消息发送完后,停止消息生产者服务。producer.shutdown();}
}

3、消费者代码示例

public class Consumer {public static void main(String[] args) throws InterruptedException, MQClientException {//构建一个消息消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");//指定nameserver地址consumer.setNamesrvAddr("192.168.65.112:9876");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// 订阅一个感兴趣的话题,这个话题需要与消息的topic一致consumer.subscribe("TopicTest", "*");// 注册一个消息回调函数,消费到消息后就会触发回调。consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {msgs.forEach(messageExt -> {try {System.out.println("收到消息:"+new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {}});return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});//启动消费者服务consumer.start();System.out.print("Consumer Started");}
}

4、代码逻辑解读

生产者:

1. 创建消息生产者producer,并指定生产者组名
2. 指定Nameserver地址
3. 启动producer。可以认为这是消息生产者与服务端建立连接的过程。
4. 创建消息对象,指定Topic、Tag和消息体
5. 发送消息
6. 关闭生产者producer,释放资源。

消费者:

1. 创建消费者Consumer,必须指定消费者组名
2. 指定Nameserver地址
3. 订阅主题Topic和Tag
4. 设置回调函数,处理消息
5. 启动消费者consumer。消费者会一直挂起,持续处理消息。

二、消息确认机制

1、生产者确认机制

        生产者发送消息的方式有三种:

(1)单向发送:消息生产者只管往Broker发送消息,而全然不关心Broker端有没有成功接收到消息。

public class OnewayProducer {public static void main(String[] args)throws Exception{DefaultMQProducer producer = new DefaultMQProducer("producerGroup");producer.start();Message message = new Message("Order","tag","order info : orderId = xxx".getBytes(StandardCharsets.UTF_8));producer.sendOneway(message);Thread.sleep(50000);producer.shutdown();}
}

        sendOneway方法没有返回值,如果发送失败,生产者无法补救。

        单向发送有一个好处,就是发送消息的效率更高。适用于一些追求消息发送效率,而允许消息丢失的业务场景。比如日志。

(2)同步发送:消息生产者在往Broker端发送消息后,会阻塞当前线程,等待Broker端的相应结果。

 SendResult sendResult = producer.send(msg);

        SendResult来自于Broker的反馈。producer在send发出消息,到Broker返回SendResult的过程中,无法做其他的事情。在SendResult中有一个SendStatus属性,这个SendStatus是一个枚举类型,其中包含了Broker端的各种情况。

public enum SendStatus {SEND_OK,FLUSH_DISK_TIMEOUT,FLUSH_SLAVE_TIMEOUT,SLAVE_NOT_AVAILABLE,
}

        在这几种枚举值中,SEND_OK表示消息已经成功发送到Broker上。至于其他几种枚举值,都是表示消息在Broker端处理失败了。使用同步发送的机制,我们就可以在消息生产者发送完消息后,对发送失败的消息进行补救。例如重新发送。

        但是此时要注意,如果Broker端返回的SendStatus不是SEND_OK,也并不表示消息就一定不会推送给下游的消费者。仅仅只是表示Broker端并没有完全正确的处理这些消息。因此,如果要重新发送消息,最好要带上唯一的系统标识,这样在消费者端,才能自行做幂等判断。也就是用具有业务含义的OrderID这样的字段来判断消息有没有被重复处理。

        这种同步发送的机制能够很大程度上保证消息发送的安全性。但是,这种同步发送机制的发送效率比较低。毕竟,send方法需要消息在生产者和Broker之间传输一个来回后才能结束。如果网速比较慢,同步发送的耗时就会很长。

(3)异步发送:生产者在向Broker发送消息时,会同时注册一个回调函数。接下来生产者并不等待Broker的响应。当Broker端有响应数据过来时,自动触发回调函数进行对应的处理。

	producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {countDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});

        在SendCallback接口中有两个方法,onSuccess和onException。当Broker端返回消息处理成功的响应信息SendResult时,就会调用onSuccess方法。当Broker端处理消息超时或者失败时,就会调用onExcetion方法,生产者就可以在onException方法中进行补救措施。

        此时同样有几个问题需要注意。一是与同步发送机制类似,触发了SendCallback的onException方法同样并不一定就表示消息不会向消费者推送,例如:如果Broker端返回响应信息太慢,超过了超时时间,也会触发onException方法。二是在SendCallback的对应方法被触发之前,生产者不能调用shutdown()方法。如果消息处理完之前,生产者线程就关闭了,生产者的SendCallback对应方法就不会触发。这是因为使用异步发送机制后,生产者虽然不用阻塞下来等待Broker端响应,但是SendCallback还是需要附属于生产者的主线程才能执行。

2、消费者确认机制

        消费者收到消息后,向 Broker 响应消息来进行确认。

consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});

        这个返回值是一个枚举值,有两个选项 CONSUME_SUCCESS和RECONSUME_LATER。如果消费者返回CONSUME_SUCCESS,那么消息自然就处理结束了。但是如果消费者没有处理成功,返回的是RECONSUME_LATER,Broker就会过一段时间再发起消息重试。

        为了兼顾重试机制的成功率和性能,RocketMQ设计了一套非常完善的消息重试机制:

(1)失败重试与死信Topic

        Broker不可能无限制的向消费失败的消费者推送消息,Broker会记录每一个消息的重试次数。如果一个消息经过很多次重试后,消费者依然无法正常处理,那么Broker会将这个消息推入到消费者组对应的死信Topic中。死信Topic相当于windows当中的垃圾桶。我们可以人工介入对死信Topic中的消息进行补救,也可以直接彻底删除这些消息。RocketMQ默认的最大重试次数是16次。

(2)重试Topic与原Topic分离

        为了让这些重试的消息不会影响Topic下其他正常的消息,Broker会给每个消费者组设计对应的重试Topic。MessageQueue是一个具有严格FIFO特性的数据结构。如果需要重试的这些消息还是放在原来的MessageQueue中,就会对当前MessageQueue产生阻塞,让其他正常的消息无法处理。RocketMQ的做法是给每个消费者组自动生成一个对应的重试Topic。在消息需要重试时,会先移动到对应的重试Topic中。后续Broker只要从这些重试Topic中不断拿出消息,往消费者组重新推送即可。这样,这些重试的消息有了自己单独的队列,就不会影响到Topic下的其他消息了。

(3)尽量保证同一消费者组具有相同的逻辑

        RocketMQ中设定的消费者组都是订阅主题和消费逻辑相同的服务备份,所以当消息重试时,Broker会往消费者组中任意一个实例推送。因此,我们在编码时,尽量要保证一个消费者组处理业务的逻辑相同。

(4)消费逻辑尽量避免异步

        Broker端最终只通过消费者组返回的状态来确定消息有没有处理成功。至于消费者组自己的业务执行是否正常,Broker端是没有办法知道的。因此,在实现消费者的业务逻辑时,应该要尽量使用同步实现方式,保证在自己业务处理完成之后再向Broker端返回状态。

http://www.lryc.cn/news/600177.html

相关文章:

  • 【AI论文】Franca:用于可扩展视觉表示学习的嵌套套娃聚类
  • 第七章 愿景11 琦琦复盘测试
  • Yolo底层原理学习(V1~V3)(第一篇)
  • Java研学-RabbitMQ(二)
  • C语言————原码 补码 反码 (超绝详细解释)
  • JAVA_FourTEEN_常见算法
  • 笔试——Day18
  • LlamaIndex 和 Elasticsearch Rerankers:无与伦比的简洁
  • Mysql实现高可用(主从、集群)
  • 【运维】ubuntu 安装图形化界面
  • iOS苹果和Android安卓测试APP应用程序的区别差异
  • 地铁逃生
  • 浅谈生成式AI语言模型的现状与展望
  • gig-gitignore工具实战开发(三):gig add基础实现
  • 服务端处于 TIME_WAIT 状态的 TCP 连接,收到相同四元组的 SYN 后会发生什么?详解
  • 引用文章链接
  • VSCode——python选择解释器消失的解决办法
  • pytorch学习笔记-使用DataLoader加载固有Datasets(CIFAR10),使用tensorboard进行可视化
  • 前端-html+CSS基础到高级(一)html基础
  • SEO实战派白杨SEO:SEO中说的框计算、知心搜索(知识图谱)是什么?有什么用处?
  • Microsoft SharePointServer 远程命令执行漏洞复现(CVE-2025-53770)
  • 学习笔记《区块链技术与应用》第二天 共识机制
  • Vue2上
  • 机器学习(九):KNN算法全解析与项目实践
  • C/C++---I/O性能优化
  • 谁将统治AI游戏时代?腾讯、网易、米哈游技术暗战
  • 《C++ vector 完全指南:vector的模拟实现》
  • LeetCode|Day25|389. 找不同|Python刷题笔记
  • UE5多人MOBA+GAS 30、技能升级机制
  • 动漫花园资源网在线观看,动漫花园镜像入口