当前位置: 首页 > news >正文

RabbitMQ—事务与消息分发

上篇文章:

RabbitMQ—TTL、死信队列、延迟队列https://blog.csdn.net/sniper_fandc/article/details/149311921?fromshare=blogdetail&sharetype=blogdetail&sharerId=149311921&sharerefer=PC&sharesource=sniper_fandc&sharefrom=from_link

目录

1 事务

2 消息分发

2.1 介绍

2.2 限流

2.3 负载均衡(非公平分发)


1 事务

        AMQP协议实现了事务机制,而RabbitMQ基于AMQP协议,因此也支持事务。RabbitMQ的事务机制要求消息的发送和接收是原子性的,要么同时成功,要么同时失败(失败后已发送的消息或接收的消息会回退)。

        这里实现了连续发送两条消息保证这两条消息同时发送成功或失败的事务机制:

        声明队列:

public class RabbitMQConnection {public static final String TRANS_QUEUE = "trans.queue";}
@Configurationpublic class RabbitMQConfig {@Bean("transQueue")public Queue transQueue(){return QueueBuilder.durable(RabbitMQConnection.TRANS_QUEUE).build();}}

        配置事务管理器和RabbitTemplate:

@Configurationpublic class RabbitTemplateConfig {//RabbitTemplate开启事务和下面的事务管理器都必须存在@Bean("transRabbitTemplate")public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);//开启事务return rabbitTemplate;}//事务管理器@Beanpublic RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory){return new RabbitTransactionManager(connectionFactory);}}

        生产者代码:

@RestController@RequestMapping("/producer")public class ProducerController {@Resource(name = "transRabbitTemplate")private RabbitTemplate transRabbitTemplate;//@Transactional也是开启事务必须存在的注解@Transactional@RequestMapping("trans")public String trans() {//两条消息要么同时发送成功,要么同时失败transRabbitTemplate.convertAndSend("", RabbitMQConnection.TRANS_QUEUE, "Hello SpringBoot RabbitMQ");int a = 1/0;transRabbitTemplate.convertAndSend("", RabbitMQConnection.TRANS_QUEUE, "Hello SpringBoot RabbitMQ");return "发送成功";}}

        未开启事务,发生异常,第一条消息发送成功。

        开始事务后,发生异常,两条消息均不成功。

2 消息分发

2.1 介绍

        当某个队列被多个消费者订阅,队列向消费者推送消息(消息分发对应推模式,对拉模式无效),每条消息只会发送给一个消费者。默认情况下,RabbitMQ不管消费者是否已经消费完消息并返回确认,而是采用轮询方式推送消息

        在这种情况下,假设有10条消息,两个消费者,每个消费者会被推送5条消息,消费者1消费速度快(容易空闲),消费者2消费速度慢,那么消费者2就会消息积压,系统实际吞吐量并不高。

        可以采用channel.basicQos(int prefetchCount)方式来进行消费分发的控制,prefetchCount表示通道上消费者能同时保持未确认消息的最大数量。

        队列每向消费者推送一条消息,prefetchCount+1。消费者每返回一个确认,prefetchCount-1。当prefetchCount达到设置的上限,队列就不再向消费者推送消息,直到有新的确认到来。这种方式就类似滑动窗口,很好地保证消费者负载压力不会过大。

2.2 限流

        在秒杀场景下,假设订单系统每秒能处理的订单数是10000,但是秒杀场景下可能某一瞬间会有50000订单数,这就会导致订单系统处理不过来而压垮。可以利用basicQos()来进行限流:

        1.SpringBoot配置文件用prefetch控制限流数,对应channel.basicQos(int prefetchCount)的prefetchCount。

        2.开启消息确认机制的手动确认模式manual。未手动确认的消息都视为未消费完的消费,prefetchCount并不会-1。

        配置文件:

spring:rabbitmq:addresses: amqp://admin:admin@192.168.217.150:5672/testVirtuallistener:simple:acknowledge-mode: manual #消息接收确认(MQ-消费者):none(自动确认)、auto(正常自动确认,异常不确认)、manual(手动确认)prefetch: 5 #控制消费者从队列中预取(prefetch)消息的数量

        声明队列和交换机:

public class RabbitMQConnection {public static final String QOS_QUEUE = "qos.queue";public static final String QOS_EXCHANGE = "qos.exchange";}
@Configurationpublic class RabbitMQConfig {@Bean("qosQueue")public Queue qosQueue(){return QueueBuilder.durable(RabbitMQConnection.QOS_QUEUE).build();}@Bean("qosExchange")public DirectExchange qosExchange(){return ExchangeBuilder.directExchange(RabbitMQConnection.QOS_EXCHANGE).durable(true).build();}@Bean("qosQueueBinding")public Binding qosQueueBinding(@Qualifier("qosExchange") DirectExchange directExchange, @Qualifier("qosQueue") Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("qos");}}

        生产者代码:

@RestController@RequestMapping("/producer")public class ProducerController {@Resource(name = "rabbitTemplate")private RabbitTemplate rabbitTemplate;@RequestMapping("qos")public String qos() {for (int i = 0; i < 20; i++) {rabbitTemplate.convertAndSend(RabbitMQConnection.QOS_EXCHANGE, "qos", "Hello SpringBoot RabbitMQ");}return "发送成功";}}

        消费者代码:

@Componentpublic class QosListener {@RabbitListener(queues = RabbitMQConnection.QOS_QUEUE)public void queueListener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.printf("listener ["+RabbitMQConnection.QOS_QUEUE+"]收到消息:%s, deliveryTag:%d \n",new String(message.getBody(),"UTF-8"),deliveryTag);System.out.println("消息处理完成");channel.basicAck(deliveryTag,false);} catch (Exception e) {channel.basicNack(deliveryTag,false,true);}}}

        当进行限流时,如果不进行确认,则消费者最多只有5条消息:

        如果进行确认,则消息很快就被消费完了:

2.3 负载均衡(非公平分发)

        负载均衡就是指在分布式环境下,让每个服务接收其能同时处理上限的任务数,这样每个服务都不会空闲下来(最大化利用硬件资源),也不会因为负载过大导致崩溃。

        对应prefetchCount就应该配置为机器所能处理的最大上限数。假设消费者只能一次处理一个消息,此时prefetch就配置为1:

spring:rabbitmq:addresses: amqp://admin:admin@192.168.217.150:5672/testVirtuallistener:simple:acknowledge-mode: manual #消息接收确认(MQ-消费者):none(自动确认)、auto(正常自动确认,异常不确认)、manual(手动确认)prefetch: 1 #控制消费者从队列中预取(prefetch)消息的数量

        其它代码不变,增加一个消费者来模拟不同处理速度的服务:

@Componentpublic class QosListener {@RabbitListener(queues = RabbitMQConnection.QOS_QUEUE)public void queueListener(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.printf("listener ["+RabbitMQConnection.QOS_QUEUE+"]收到消息:%s, deliveryTag:%d \n",new String(message.getBody(),"UTF-8"),deliveryTag);Thread.sleep(5);channel.basicAck(deliveryTag,false);} catch (Exception e) {channel.basicNack(deliveryTag,false,true);}}@RabbitListener(queues = RabbitMQConnection.QOS_QUEUE)public void queueListener2(Message message, Channel channel) throws IOException {long deliveryTag = message.getMessageProperties().getDeliveryTag();try {System.out.printf("listener2 ["+RabbitMQConnection.QOS_QUEUE+"]收到消息:%s, deliveryTag:%d \n",new String(message.getBody(),"UTF-8"),deliveryTag);Thread.sleep(10);channel.basicAck(deliveryTag,false);} catch (Exception e) {channel.basicNack(deliveryTag,false,true);}}}

        最终消费者2处理了8条消息,消费者1处理了12条消息。这样就不会出现某些消费者大量时间空闲,整个系统的吞吐量就会得到很大提升。

        注意:deliveryTag有重复是因为两个消费者占用不同的通道,deliveryTag在同一个通道里保持连续,通道与通道之间相互独立,因此出现这样的现象。

下篇文章:

http://www.lryc.cn/news/593009.html

相关文章:

  • espidf启用vTaskList方法
  • 使用MATLAB探索圆周率π的奇妙计算之旅
  • day25 力扣90.子集II 力扣46.全排列 力扣47.全排列 II
  • bws-rs:Rust 编写的 S3 协议网关框架,支持灵活后端接入
  • VBA 运用LISTBOX插件,选择多个选项,并将选中的选项回车录入当前选中的单元格
  • 关于NUC+雷达+倍福组网交换机是否完全足够的问题(是否需要一个路由器)
  • 软考 系统架构设计师系列知识点之杂项集萃(113)
  • WPF为启动界面(Splash Screen)添加背景音乐
  • 【NLP舆情分析】基于python微博舆情分析可视化系统(flask+pandas+echarts) 视频教程 - snowNLP库实现中文情感分析
  • 标准文件和系统文件I/O
  • 车载刷写框架 --- 关于私有节点刷写失败未报引起的反思
  • 《命令行参数与环境变量:从使用到原理的全方位解析》
  • 移除debian升级后没用的垃圾
  • laravel RedisException: Connection refused优雅草PMS项目管理系统报错解决-以及Redis 详细指南-优雅草卓伊凡
  • 2025第15届上海国际生物发酵展:聚焦合成生物与绿色制造,共启生物经济新时代
  • Rust Web 全栈开发(十):编写服务器端 Web 应用
  • 医疗AI与融合数据库的整合:挑战、架构与未来展望(下)
  • 【C# in .NET】19. 探秘抽象类:具体实现与抽象契约的桥梁
  • xss的利用
  • CS231n-2017 Lecture2图像分类笔记
  • Kafka深度解析:架构、原理与应用实践
  • [论文阅读] 人工智能 + 软件工程 | 强化学习在软件工程中的全景扫描:从应用到未来
  • windows docker-02-docker 最常用的命令汇总
  • GEO营销:AI时代的搜索优化新赛道——从DeepSeek爆火看生成式引擎优化的崛起
  • Elasticsearch 重命名索引
  • LVS 集群技术实践:NAT 与 DR 模式的配置与对比
  • 牛客-倒置字符串
  • Go语言中的类型转换与类型推断解析
  • 用 Numpy 手动实现矩阵卷积运算
  • 我们使用 Blender 和 Godot 的工作流程