当前位置: 首页 > news >正文

RabbitMQ解决消息积压的方法

目录

减少发送mq的消息体内容

增加消费者数量

批量消费消息

临时队列转移

监控和预警机制

分阶段实施

最后还有一个方法就是开启队列的懒加载


这篇文章总结一下自己知道的解决消息积压得方法。

减少发送mq的消息体内容

像我们没有必要知道一个的中间状态,只需知道一个最终状态就可以了。
发送的消息体只用包含:id和状态等关键信息,不用发送一个完整的对象内容。
消费者收到消息之后,通过id调用原服务再将完整的消息对象内容查询出来即可,最后再进行消费处理。

增加消费者数量

采用动态增加消费者的数量

@Configuration
public class RabbitMQConfig {@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);// 设置并发消费者数量factory.setConcurrentConsumers(5);        // 初始消费者数量factory.setMaxConcurrentConsumers(20);    // 最大消费者数量// 动态调整消费者数量factory.setConsumerTagStrategy(queue -> "consumer-" + UUID.randomUUID());return factory;}
}
@Service
public class ConsumerManagerService {@Autowiredprivate RabbitListenerEndpointRegistry registry;public void adjustConsumerCount(String queueName, int count) {MessageListenerContainer container = registry.getListenerContainer(queueName);if (container instanceof SimpleMessageListenerContainer) {SimpleMessageListenerContainer simpleContainer = (SimpleMessageListenerContainer) container;simpleContainer.setConcurrentConsumers(count);}}
}

批量消费消息

@Service
public class BatchMessageConsumer {@RabbitListener(queues = "myQueue", containerFactory = "batchFactory")public void processBatch(List<Message> messages, Channel channel) {try {// 批量处理消息List<MessageDTO> dtos = messages.stream().map(this::convertToDTO).collect(Collectors.toList());// 批量保存到数据库batchSaveToDatabase(dtos);// 获取最后一条消息的deliveryTaglong lastDeliveryTag = messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag();// 批量确认channel.basicAck(lastDeliveryTag, true);} catch (Exception e) {// 批量拒绝handleBatchError(messages, channel);}}
}// 配置批量消费
@Configuration
public class BatchConsumerConfig {@Beanpublic SimpleRabbitListenerContainerFactory batchFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);// 启用批量消费factory.setBatchListener(true);// 批量大小factory.setBatchSize(100);// 批量超时时间factory.setReceiveTimeout(1000L);return factory;}
}

临时队列转移

@Service
public class MessageTransferService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void transferMessages(String sourceQueue, String tempQueue, int batchSize) {while (true) {// 从源队列批量获取消息List<Message> messages = new ArrayList<>();for (int i = 0; i < batchSize; i++) {Message message = rabbitTemplate.receive(sourceQueue);if (message == null) break;messages.add(message);}if (messages.isEmpty()) break;// 转移到临时队列messages.forEach(msg -> rabbitTemplate.send(tempQueue, msg));}}
}// 临时队列的消费者
@Component
public class TempQueueConsumer {@RabbitListener(queues = "#{tempQueue.name}")public void processMessage(Message message) {// 使用更高效的处理方式fastProcessMessage(message);}@Beanpublic Queue tempQueue() {return new Queue("temp-queue-" + UUID.randomUUID(), false, false, true);}
}

监控和预警机制

@Service
@Slf4j
public class QueueMonitorService {@Autowiredprivate RabbitTemplate rabbitTemplate;@Scheduled(fixedRate = 60000) // 每分钟执行一次public void monitorQueueSize() {String queueName = "myQueue";// 获取队列信息Properties properties = rabbitTemplate.execute(channel -> channel.queueDeclarePassive(queueName));// 获取消息数量int messageCount = properties.getMessageCount();// 检查消息堆积if (messageCount > threshold) {// 发送告警sendAlert(queueName, messageCount);// 动态调整消费者adjustConsumers(queueName, messageCount);}}private void adjustConsumers(String queueName, int messageCount) {// 根据消息数量动态调整消费者数量int newConsumerCount = calculateConsumerCount(messageCount);consumerManagerService.adjustConsumerCount(queueName, newConsumerCount);}
}

分阶段实施

@Service
public class MessageHandlingStrategy {public void handleMessageBacklog() {// 1. 首先增加消费者数量adjustConsumerCount();// 2. 如果仍然堆积,启用批量处理if (isStillBacklogged()) {enableBatchProcessing();}// 3. 如果问题持续,使用临时队列if (isEmergency()) {transferToTemporaryQueue();}}
}

最后还有一个方法就是开启队列的懒加载

http://www.lryc.cn/news/519352.html

相关文章:

  • Android 网络层相关介绍
  • 2025年第三届“华数杯”国际赛B题解题思路与代码(Matlab版)
  • 小米路由器IPv6 功能使用指南
  • k8s dashboard离线部署步骤
  • Wireshark抓包教程(2024最新版个人笔记)
  • 稀疏矩阵:BM25;稠密矩阵:RoBERTa - wwm - ext顺序
  • C# 结构体(Struct)
  • Homestyler 和 Tripo AI 如何利用人工智能驱动的 3D 建模改变定制室内设计
  • Python的pandas库基础知识(超详细教学)
  • 【数据库】一、数据库系统概述
  • 大数据智能选课系统
  • esp32开发笔记之一:esp32开发环境搭建vscode+ubuntu
  • 赛灵思(Xilinx)公司Artix-7系列FPGA
  • Trie树算法
  • NLTK分词以及处理方法
  • vue3树形组件+封装+应用
  • kotlin项目无法访问Java类的问题
  • 计算机网络 (30)多协议标签交换MPLS
  • qt-C++笔记之自定义继承类初始化时涉及到parents的初始化
  • 人才选拔中,如何优化面试流程
  • 2501wtl,皮肤技术
  • 【面试题】技术场景 6、Java 生产环境 bug 排查
  • word论文排版常见问题汇总
  • 传奇3仿韩服单机版安装教程+GM管理面板
  • 第26章 汇编语言--- 内核态与用户态
  • Spring bean的生命周期和扩展
  • 计算机网络 (33)传输控制协议TCP概述
  • Python3 JSON
  • Leetcode 698 Partition to K Equal Sum Subsets
  • 可靠的人形探测,未完待续(III)