当前位置: 首页 > news >正文

19.延迟队列优化

问题

前面所讲的延迟队列有一个不足之处,比如现在有一个需求需要延迟半个小时的消息,那么就只有添加一个新的队列。那就意味着,每新增一个不同时间需求,就会新创建一个队列。

解决方案

应该讲消息的时间不要跟队列绑定,应该交给消息的生产者,由发送消息来指定延迟时间。这样就可以定义个通用的队列。

方案图

代码

配置类

package com.xkj.org.config;import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** TTL队列,配置文件代码*/
@Configuration
public class TtlQueueConfig {//普通交换机public static final String X_EXCHANGE = "X";//死信交换机public static final String Y_HEAD_LETTER_EXCHANGE = "Y";//普通队列public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";public static final String QUEUE_C = "QC";//死信队列public static final String DEAD_LETTER_QUEUE = "QD";/*** 声明普通交换机X,bean的别名xExchange* @return*/@Bean("xExchange")public DirectExchange xExchange() {return new DirectExchange(X_EXCHANGE);}/*** 声明死信交换机Y,bean的别名yExchange* @return*/@Bean("yExchange")public DirectExchange yExchange() {return new DirectExchange(Y_HEAD_LETTER_EXCHANGE);}/*** 声明普通队列QA* @return*/@Bean("queueA")public Queue queueA() {Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange", Y_HEAD_LETTER_EXCHANGE);//声明死信的routingeyarguments.put("x-dead-letter-routing-key", "YD");//设置消息过期时间ttl为10sarguments.put("x-message-ttl", 10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}/*** 声明普通队列QB* @return*/@Bean("queueB")public Queue queueB() {Map<String, Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange", Y_HEAD_LETTER_EXCHANGE);//声明死信的routingKeyarguments.put("x-dead-letter-routing-key", "YD");//设置消息过期时间ttl为40sarguments.put("x-message-ttl", 40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}/*** 声明普通队列QC* @return*/@Bean("queueC")public Queue queueC() {Map<String, Object> arguments = new HashMap<>(2);//设置死信交换机arguments.put("x-dead-letter-exchange", Y_HEAD_LETTER_EXCHANGE);//设置死信的routingKeyarguments.put("x-dead-letter-routing-key", "YD");//这里不要设置TTL时间return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();}/*** 声明死信队列QD* @return*/@Bean("queueD")public Queue queueD() {return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}/*** 将队列QA绑定到交换机X上,指定routingKey为XA* @param queueA* @param xExchange* @return*/@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA, @Qualifier("xExchange")DirectExchange xExchange) {return BindingBuilder.bind(queueA).to(xExchange).with("XA");}/*** 将队列QB绑定到交换机X上,指定routingKey为XB* @param queueB* @param xExchange* @return*/@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueB).to(xExchange).with("XB");}/*** 将队列QC绑定到交换机X上,指定routingKey为XC* @param queueC* @param xExchange* @return*/@Beanpublic Binding queueCBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange) {return BindingBuilder.bind(queueC).to(xExchange).with("XC");}/*** 将队列QD绑定到交换机Y上,指定routingKey为YD* @param queueD* @param yExchange* @return*/@Beanpublic Binding queueDBindingY(@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) {return BindingBuilder.bind(queueD).to(yExchange).with("YD");}}

生产者

package com.xkj.org.controller;import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;/*** 消息生产者*/
@Slf4j
@RestController
@RequestMapping("/ttl")
@Api(tags = "消息生产者", description = "消息生产者控制器")
public class MessageController {@Autowiredprivate RabbitTemplate rabbitTemplate;@ApiOperation("消息发送测试")@GetMapping("/sendMsg/{msg}")public void sendMsg(@ApiParam(value = "发送的消息内容", required = true) @PathVariable("msg") String message) {log.info("当前时间{},发送一条消息给两个队列:{}", new Date().toString(), message);rabbitTemplate.convertAndSend("X", "XA", "ttl=10s的消息:" + message);rabbitTemplate.convertAndSend("X", "XB", "ttl=40s的消息:" + message);}@ApiOperation("发送带过期时间的消息")@GetMapping("/sendExpiredMsg/{msg}/{ttl}")public void sendMsgExpired(@ApiParam(value = "消息内容", required = true)@PathVariable("msg") String message,@ApiParam(value = "ttl时间", required = true)@PathVariable("ttl") String ttlTime) {log.info("当前时间{},发送一条消息给队列QC:{},ttl={}", new Date().toString(), message, ttlTime);rabbitTemplate.convertAndSend("X", "XC", message, msg -> {msg.getMessageProperties().setExpiration(ttlTime);return msg;});}}

消费者

package com.xkj.org.listener;import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Date;/*** 消费者*/
@Slf4j
@Componentpublic class DeadLetterQueueConsumer {@RabbitListener(queues = "QD")public void receiveD(Message message, Channel channel) throws Exception {String msg = new String(message.getBody(), "UTF-8");log.info("当前时间:{},收到死信队列的消息:{}", new Date().toString(), msg);}
}

问题

改造代码后,发送两条消息,一条ttl为2s,另一条ttl为10s。ttl为2s的消息也要等到10s后才会收到。 

原因

因为队列是先进先出的,消息需要排队,第一条消息10s才会发出去,第二条消息2s发出去,但是由于10s的消息没有发出去,2s的消息就只有等待。

解决方案

使用rabbitmq插件来解决,请看后续博文。

http://www.lryc.cn/news/410960.html

相关文章:

  • P10477 Subway tree systems 题解,c++ 树相关题目
  • 18.jdk源码阅读之CopyOnWriteArrayList
  • 美股:AMD展现乐观前景,挑战AI加速器市场霸主
  • 如何提高计算机视觉技术在复杂环境和低光照条件下的物体识别准确率?
  • ubuntu cmake使用自己版本的qt
  • Python基础知识笔记---保留字
  • Python面试整理-Web开发
  • 民大食堂用餐小程序的设计
  • Linux系统编程(4):消息队列
  • 【初阶数据结构篇】单链表的实现(赋源码)
  • LeetCode 2844.生成特殊数字的最少操作(哈希表 + 贪心)
  • 昇思MindSpore 应用学习-基于 MindSpore 实现 BERT 对话情绪识别
  • 【初阶数据结构篇】顺序表和链表算法题
  • 使用weex进行APP混合开发
  • C++stl大根堆/小根堆的创建与记忆
  • visual studio性能探测器使用案列
  • redis的代码开发
  • 嗷呜,就问你接不接?
  • 避免过拟合,参数大模型强,正则让模型不要走偏
  • vue+element-ui的列表查询条件/筛选条件太多以下拉选择方式动态添加条件(支持全选、反选、清空)
  • LLM的训练与推断
  • uniapp使用WebSocket uniapp使用WebSocket Uniapp整合WebSocket uniapp使用 websocket
  • SSH Exporter:基于Prometheus的远程系统性能监控神器
  • Docker基础概念
  • 小白进阶为大神
  • 2024最新Python和PyCharm的安装教程
  • 数据库死锁:深入解析与应对策略
  • Python入门宝藏《看漫画学Python》,495页漫画带你弄清python知识点!简单易懂 | 附PDF全彩版
  • Webshell管理工具:AntSword(中国蚁剑)
  • Java 中的File类