当前位置: 首页 > news >正文

RabbitMQ 消息顺序性保证

方式一:Consumer设置exclusive

在这里插入图片描述

注意条件

  • 作用于basic.consume
  • 不支持quorum queue
    在这里插入图片描述
    当同时有A、B两个消费者调用basic.consume方法消费,并将exclusive设置为true时,第二个消费者会抛出异常:
com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: #method<channel.close>(reply-code=403, reply-text=ACCESS_REFUSED - queue 'test' in vhost '/' in exclusive use, class-id=60, method-id=20)at com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:190)at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:223)at com.rabbitmq.client.impl.ChannelN.basicConsume(ChannelN.java:981)at com.dms.rabbitmq.TopicSender.lambda$main$2(TopicSender.java:63)at java.base/java.lang.Thread.run(Thread.java:840)

Spring AMQP 如何通过exclusive实现顺序消费:

在这里插入图片描述
核心逻辑

while (!DirectMessageListenerContainer.this.started && isRunning()) {this.cancellationLock.reset();try {for (String queue : queueNames) {consumeFromQueue(queue);}}catch (AmqpConnectException | AmqpIOException e) {long nextBackOff = backOffExecution.nextBackOff();if (nextBackOff < 0 || e.getCause() instanceof AmqpApplicationContextClosedException) {DirectMessageListenerContainer.this.aborted = true;shutdown();this.logger.error("Failed to start container - fatal error or backOffs exhausted",e);this.taskScheduler.schedule(this::stop, Instant.now());break;}this.logger.error("Error creating consumer; retrying in " + nextBackOff, e);doShutdown();try {Thread.sleep(nextBackOff); // NOSONAR}catch (InterruptedException e1) {Thread.currentThread().interrupt();}continue; // initialization failed; try again having rested for backOff-interval}DirectMessageListenerContainer.this.started = true;DirectMessageListenerContainer.this.startedLatch.countDown();
}
  1. 抛出异常后,会重试
  2. 重试间隔、次数受recoveryInterval(默认无限)、recoveryBackOff控制

方式二:single active consumer

在这里插入图片描述

原理:

在这里插入图片描述

代码示例

Channel ch = ...;
Map<String, Object> arguments = newHashMap<String, Object>();
arguments.put("x-single-active-consumer", true);
ch.queueDeclare("my-queue", false, false, false, arguments);

在这里插入图片描述
参考资料:https://www.rabbitmq.com/blog/2022/07/05/rabbitmq-3-11-feature-preview-single-active-consumer-for-streams

http://www.lryc.cn/news/534565.html

相关文章:

  • 防御保护作业二
  • Spring Boot中实现多租户架构
  • 【AI-27】DPO和PPO的区别
  • Git stash 暂存你的更改(隐藏存储)
  • 负载测试和压力测试的原理分别是什么
  • shell脚本控制——定时运行作业
  • LeetCode 热题 100 回顾
  • HTML5--网页前端编程(上)
  • 气体控制器联动风机,检测到环境出现异常时自动打开风机进行排风;
  • 示波器使用指南
  • Post-trained猜想
  • javaEE-10.CSS入门
  • eclipse配置Spring
  • 爬虫技巧汇总
  • 基于UVM搭验证环境
  • 【JavaWeb10】服务器渲染技术 --- JSP
  • 【Hadoop】大数据权限管理工具Ranger2.1.0编译
  • 微软AI研究团队推出LLaVA-Rad:轻量级开源基础模型,助力先进临床放射学报告生成
  • 06排序 + 查找(D2_查找(D1_基础学习))
  • 网站快速收录的秘诀:关键词布局与优化
  • AI大语言模型
  • 03-DevOps-安装并初始化Gitlab
  • Mac重复文件,一键查找并清理的工具
  • Unity Mesh 切割算法详解
  • ASUS/华硕天选1 FA506I 原厂Win10 专业版系统 工厂文件 带ASUS Recovery恢复 教程
  • 【计算机中级职称 信息安全工程师 备考】密码学知识,经典题目
  • 期权帮|初识股指期货:股指期货的交割结算价是怎么来的?
  • 伺服使能的含义解析
  • 数据集成实例分享:金蝶云星空对接旺店通实现库存管理自动化
  • Android 常用设计模式和实例