当前位置: 首页 > news >正文

RabbitMQ队列的选择

文章目录

  • 前言
  • 一、Rabbit MQ队列的选择
    • 1.1、Classic
    • 1.2、Quorum
    • 1.3、Stream
    • 1.4、懒队列


前言

  本篇是Rabbit MQ高级特性的学习笔记,记录RabbitMQ的Classic,Quorum,Stream队列,懒队列的特性和运用场景


一、Rabbit MQ队列的选择

  Rabbit MQ默认提供了三种队列:

  • Classic:经典队列,在单机模式下是最常用的。
  • Quorum:仲裁队列,通常用于集群环境下,保证集群的高可用性。
  • Stream:流式队列,是自3.9.0版本开始引入的新特性,这种队列类型的消息是持久化到磁盘并且具备分布式备份的。

1.1、Classic

  经典队列是Rabbit MQ默认的队列,与Spring Boot整合时,使用new Queue,创建的队列就是普通队列:
在这里插入图片描述
  经典队列在创建时除了指定队列的名称,还有额外的四个选项:
在这里插入图片描述
  对应的是页面上的:
在这里插入图片描述
  durable代表了队列是否进行持久化,如果开启持久化,则会将队列保存到磁盘上,将来Rabbit MQ重启后,可以从磁盘恢复,否则队列只存在于内存中,服务重启后会自动删除。这里的durable只是设置了队列的持久化。消息和交换机,同样可以设置持久化。如果消息设置了持久化,而队列未设置持久化,重启之后,消息依旧会丢失。所以如果要保证消息的零丢失,消息和队列都需要设置持久化

// 队列声明
channel.queueDeclare("safe_queue", true, false, false, null); // Durability=true// 消息发布
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().deliveryMode(2) // 持久化消息.build();

  exclusive代表了队列是否排他。当属性为true时,表示仅允许声明了该队列的连接进行访问,其他连接无法访问该队列。通常用于单消费者模式,确保队列只能被特定消费者访问。
  autoDelete代表了队列是否自动删除,如果设置为true,表示该队列在最后一个消费者断开连接之后,进行删除操作。通常用于临时队列的场景。
  arguments可以指定更多的参数,比如指定死信交换机和路由键,消息过期时间,最大长度,是否为懒队列等

1.2、Quorum

  Quorum是针对镜像队列的一种优化,目前已经取代了镜像队列,作为Rabbit MQ集群部署保证高可用性的解决方案。传统的镜像队列,是将消息副本存储在一组节点上,以提高可用性和可靠性。镜像队列将队列中的消息复制到一个或多个其他节点上,并使这些节点上的队列保持同步。当一个节点失败时,其他节点上的队列不受影响,因为它们上面都有消息的备份。
  镜像队列使用主从模式,所有消息写入和读取均通过主节点,并异步复制到镜像节点。主节点故障时需重新选举,期间队列不可用。而仲裁队列基于Raft分布式共识算法,所有节点组成仲裁组。消息需被多数节点持久化后才确认成功,Leader故障时自动触发选举。
  相比较于传统的主从模式,避免了发生网络分区时的脑裂问题(基于Raft分布式共识算法避免)。
在这里插入图片描述
  和普通队列的区别:
在这里插入图片描述
  相比较于普通队列,仲裁队列增加了一个对于有毒消息的处理。什么是有毒消息?首先,消费者从队列中获取到了元素,队列会将该元素删除,但是消费者消费失败了,会给队列nack,并且可以设置消息重新入队。这样可能存在因为业务代码的问题,某条消息一直处理不成功的问题。仲裁队列会记录消息的重新投递次数,判断是否超过了设置的阈值,如果超过了就直接丢弃,或者放入死信队列人工处理。
  如果需要声明一个仲裁队列,只需要加入参数:

@Configuration
public class QuorumConfig {@Beanpublic Queue quorumQueue() {Map<String,Object> params = new HashMap<>();params.put("x-queue-type","quorum");return new Queue(MyConstants.QUEUE_QUORUM,true,false,false,params);}
}

  仲裁队列适用于集群环境下,队列长期存在,并且对于消息可靠性要求高,允许牺牲一部分性能(因为raft算法,消息需被多数节点持久化后才确认成功)的场景。

1.3、Stream

  在传统的队列模型中,同一条消息只能被一个消费者消费(一个队列如果有多个消费者,是工作分发的机制。消息1->消费者1,消息2->消费者2,消息3->消费者1,不能两个消费者读同一条消息。),并且消息是阅后即焚的(消费者接收到消息后,队列中的该消息就删除,如果消费者拒绝签收并且设置了重新入队,再把消息重新放入队列中),无法重复从队列中获取相同的消息。并且在当队列中积累的消息过多时,性能下降会非常明显。
  Stream队列正是解决了以上的这些问题。Stream队列的核心是用aof文件的形式存储队列,将消息以aof的方式追加到文件中。允许用户在日志的任何一个连接点开始重新读取数据。(需要用户自己记录偏移量)
  声明一个stream队列:

@Configuration
public class StreamConfig {@Beanpublic Queue streamQueue() {Map<String,Object> params = new HashMap<>();params.put("x-queue-type","stream");params.put("x-max-length-bytes", 20_000_000_000L); // 指定队列的大小params.put("x-stream-max-segment-size-bytes", 100_000_000); // 文件分片存储,每一片的大小//必须设置持久化为true,同时独占和自动删除模式为falsereturn new Queue(MyConstants.QUEUE_STREAM,true,false,false,params);}
}

  声明消费者:

	public void stremReceiver(Channel channel,String message){try {channel.basicQos(100);Consumer myconsumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body)throws IOException {System.out.println("========================");String routingKey = envelope.getRoutingKey();System.out.println("routingKey >"+routingKey);String contentType = properties.getContentType();System.out.println("contentType >"+contentType);long deliveryTag = envelope.getDeliveryTag();System.out.println("deliveryTag >"+deliveryTag);System.out.println("content:"+new String(body,"UTF-8"));// (process the message components here ...)//消息处理完后,进行答复。答复过的消息,服务器就不会再次转发。//没有答复过的消息,服务器会一直不停转发。channel.basicAck(deliveryTag, false);}};Map<String,Object> consumeParam = new HashMap<>();//first: 从日志队列中第一个可消费的消息开始消费//last: 消费消息日志中最后一个消息//next: 相当于不指定offset,消费不到消息。//Offset: 一个数字型的偏//Timestamp:一个代表时间的Data类型变量,表示从这个时间点开始消费。//例如 一个小时前 Date timestamp = new Date(System.currentTimeMillis() - 60 * 60 * 1_000)consumeParam.put("x-stream-offset","last");channel.basicConsume(MyConstants.QUEUE_STREAM, false,consumeParam, myconsumer);} catch (IOException e) {e.printStackTrace();}System.out.println("quorumReceiver received message : "+ message);}

在这里插入图片描述

1.4、懒队列

  Rabbit MQ对于常规队列的处理是,将消息优先存在于内存中,在合适的时机再持久化到磁盘上,而懒队列则相反,懒队列会尽可能早的将消息内容保存到磁盘当中,并且只有在用户请求到时,才临时从磁盘加载到内存当中。懒队列的设计也是为了应对消息堆积问题的。
  声明懒队列的方式,只需要加入参数,相应的,当一个队列被声明为懒队列,那即使队列被设定为不持久化,消息依然会写入到硬盘中。

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");

  懒队列适合消息量大且长期有堆积的队列,可以减少内存使用,加快消费速度。

http://www.lryc.cn/news/586657.html

相关文章:

  • 微信小程序案例 - 本地生活(首页)
  • CCS-MSPM0G3507-6-模块篇-OLED的移植
  • 什么时候需要用到 multiprocessing?
  • 深度学习图像分类数据集—猫七种表情识别分类
  • Android 响应式编程完整指南:StateFlow、SharedFlow、LiveData 详解
  • MySQL 的 `EXPLAIN` 输出中,`filtered` 属性使用
  • spring--@Autowired
  • spring-ai-alibaba 1.0.0.2 学习(十六)——多模态
  • Java_Springboot技术框架讲解部分(二)
  • Infoblox NetMRI 远程命令执行漏洞复现(CVE-2025-32813)
  • 基于 CentOS 7 的 LVS+DR+Web+NFS 旅游攻略分享平台部署
  • linux中at命令的常用用法。
  • Vue配置特性(ref、props、混入、插件与作用域样式)
  • DHS及HTTPS工作过程
  • 【Java Stream】基本用法学习
  • vue2入门(1)vue核心语法详解复习笔记
  • 算法学习笔记:18.拉斯维加斯算法 ——从原理到实战,涵盖 LeetCode 与考研 408 例题
  • 一扇门铃,万向感应——用 eventfd 实现零延迟通信
  • 14.使用GoogleNet/Inception网络进行Fashion-Mnist分类
  • 4. 观察者模式
  • Java行为型模式---观察者模式
  • Typecho分类导航栏开发指南:从基础到高级实现
  • 低代码引擎核心技术:OneCode常用动作事件速查手册及注解驱动开发详解
  • Pytorch实现感知器并实现分类动画
  • 深入理解观察者模式:构建松耦合的交互系统
  • 为什么玩游戏用UDP,看网页用TCP?
  • 【C++详解】STL-priority_queue使用与模拟实现,仿函数详解
  • 信息收集实战
  • 【读书笔记】《C++ Software Design》第九章:The Decorator Design Pattern
  • 设计模式:软件开发的高效解决方案(单例、工厂、适配器、代理)