当前位置: 首页 > news >正文

RabbitMQ实现多线程处理接收消息

前言:在使用@RabbitListener注解来指定消费方法的时候,默认情况是单线程去监听队列,但是这个如果在高并发的场景中会出现很多个任务,但是每次只消费一个消息,就会很缓慢。单线程处理消息容易引起消息处理缓慢,消息堆积,不能最大利用硬件资源,这个就很伤。

处理办法:可以添加配置类,设置RabbitMQ的容器工厂参数,增加并发处理数量即可实现多线程处理监听队列,实现多线程处理消息。

一、编写配置类

package com.quick.config;import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;/*** RabbitMQ配置类*/
@Configuration
/*@ConditionalOnClass(RabbitTemplate.class) //有RabbitTemplate依赖才会生效,否则不生效*/
public class MqConfig {// 定义线程数、最大线程数常量private static final int INITIAL_CONCURRENT_CONSUMERS = 10;private static final int MAX_CONCURRENT_CONSUMERS = 10;/*** 将多线程配置配置注入容器工厂*/@Bean("customContainerFactory")public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConcurrentConsumers(INITIAL_CONCURRENT_CONSUMERS); //设置线程数factory.setMaxConcurrentConsumers(MAX_CONCURRENT_CONSUMERS); //最大线程数configurer.configure(factory, connectionFactory);return factory;}}

  • setConcurrentConsumers(int concurrentConsumers): 这个方法设置了容器应该同时启动的监听器(消费者)线程的数量。这些线程会并发地从RabbitMQ队列中拉取并处理消息。这个值决定了系统初始时能够并行处理消息的能力。

  • setMaxConcurrentConsumers(int maxConcurrentConsumers): 这个方法设置了容器在需要时可以增加到的最大并发消费者数量。这通常用于处理负载高峰,当队列中的消息积压时,可以动态地增加并发消费者数量以提高处理速度。然而,请注意,这并不意味着系统会立即创建所有最大数量的线程,而是会根据需要逐渐增加到这个上限。

这个容器负责监听 RabbitMQ 的队列,并将接收到的消息分发给相应的处理器(即 @RabbitListener 注解标记的方法)

二、修改监听者

在接收消息方里面的@RabbitListener注解中添加配置

@RabbitListener(queues = {"监听队列名"},containerFactory = "customContainerFactory")


/*** 接收消息*/
@Component
public class StoreListener {@Resourceprivate IStoreService storeService;@Resourceprivate StoreMapper storeMapper;/*** 更新店铺收藏人数,实现收藏人数+1* @param storeId 店铺id*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "store.addFavorite.success.queue", durable = "true"), // 队列 起名规则(服务名+业务名+成功+队列),durable持久化exchange = @Exchange(name = "addFavorite.direct"), // 交换机名称,交换机默认类型就行direct,所以不用配置directkey = "addFavorite.success" // 绑定的key),// 在@RabbitListener注解中指定容器工厂containerFactory = "customContainerFactory")public void listenAddFavoriteCountsSuccess(Long storeId){storeService.updateStoreFavoriteUsersCountAdd1(storeId);}/*** 根据传过来的店铺实体类修改店铺信息* @param store 店铺实体类*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "store.updateStore.success.queue", durable = "true"),exchange = @Exchange(name = "updateStore.direct"),key = "updateStore.success"),// 在@RabbitListener注解中指定容器工厂containerFactory = "customContainerFactory")public void updateStoreByEntity(Store store){storeMapper.updateById(store);}}

多线程的好处:

1、提高吞吐量:通过并行处理消息,系统可以更快地处理大量消息,从而提高整体吞吐量。

2、更好的资源利用率:在多核处理器上,多线程可以更好地利用硬件资源,减少处理延迟。

http://www.lryc.cn/news/424123.html

相关文章:

  • AI智能网关 边缘计算 视觉AI
  • Java基础之原反补码
  • Unity如何使用Spine动画导出的动画
  • 变量位操作
  • 内网渗透—横向移动RDPWinRMWinRSSPN扫描Kerberos攻击
  • Python套接字综合应用(UDP篇)
  • 服务器安装哪吒面板详细教程
  • LLM微调(精讲)-以高考选择题生成模型为例(DataWhale AI夏令营)
  • 安全基础学习-RC4加密算法
  • 雨云宁波电信大带宽服务器测评(非广告)
  • 2024年,最新前端趋势
  • Linux静态进程和动态进程查看管理
  • CPU飙升 怎么定位问题
  • The Sandbox 游戏制作教程第 4 章|使用装备制作游戏,触发独特互动
  • JS 和 JSX、TS 和 TSX 的区别
  • 25款极氪007上市,小米SU7就不该买?
  • 旋转字符串 | LeetCode-796 | 模拟 | KMP | 字符串匹配
  • 网络安全测试工具Burp Suite基本使用
  • 使用pytest+selenium编写网页UI自动化脚本和用例
  • 新能源遇“秋老虎”,8月第二周销量集体下滑,问界惨遭腰斩
  • SEO模板网站的wordpress主题最适合google外贸SEO
  • fetch跨域请求数据的前端设置和后端php的header设置
  • Ted靶机
  • HarmonyOS ArkTS 构建布局
  • yolov5详解(二):通过yaml文件构建完整模型
  • 8月8日学习笔记 python基础
  • 电动自行车出海黑马Avento独立站拆解(上)丨出海笔记
  • Gerrit 使用教程
  • sudu提权命令账号安全控制(su命令)执行单个命令并返回原用户、执行多个命令并返回原用户、保持当前环境变量、配置文件/etc/sudoers
  • 【线性代数】【二】2.7 矩阵的秩