当前位置: 首页 > news >正文

springBoot对接多个mq并且实现延迟队列---未完待续

mq调用流程

创建消息转换器

package com.wd.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitmqMessageConvertConfig {/*** 公共的消息转换器** @return MessageConverter*/@Beanpublic MessageConverter messageConverter() {return new Jackson2JsonMessageConverter();}}

创建exchange交换机:普通交换机、延迟交换机、死信交换机

package com.wd.config;import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitmqExchangeConfig {public static final int DELAY_TIME = 20 * 1000;/*** 普通交换机名称*/public static final String EXCHANGE_NAME = "wd_exchange";/*** 延迟交换机名称*/public static final String DELAY_EXCHANGE_NAME = "wd_delay_exchange";/*** 死信交换机*/public static final String DEAD_EXCHANGE_NAME = "wd_dead_exchange";@Beanpublic DirectExchange exchange() {return new DirectExchange(EXCHANGE_NAME, true, false);}@Beanpublic DirectExchange delayExchange() {return new DirectExchange(DELAY_EXCHANGE_NAME, true, false);}@Beanpublic DirectExchange deadExchange() {return new DirectExchange(DEAD_EXCHANGE_NAME, true, false);}
}

创建master的connection

package com.wd.config.master;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitmqMasterConnectionConfig {@Value("${rabbitmq.master.vhost}")private String vhost;@Value("${rabbitmq.master.addresses}")private String addresses;@Value("${rabbitmq.master.username}")private String username;@Value("${rabbitmq.master.password}")private String password;@Beanpublic ConnectionFactory masterConnectionFactory() {CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();cachingConnectionFactory.setAddresses(addresses);cachingConnectionFactory.setVirtualHost(vhost);cachingConnectionFactory.setUsername(username);cachingConnectionFactory.setPassword(password);return cachingConnectionFactory;}@Beanpublic RabbitTemplate rabbitTemplate(@Qualifier("masterConnectionFactory") ConnectionFactory masterConnectionFactory,MessageConverter messageConverter){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(masterConnectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setMessageConverter(messageConverter);return rabbitTemplate;}}

创建slave的connection

package com.wd.config.slave;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitmqSlaveConnectionConfig {@Value("${rabbitmq.slave.vhost}")private String vhost;@Value("${rabbitmq.slave.addresses}")private String addresses;@Value("${rabbitmq.slave.username}")private String username;@Value("${rabbitmq.slave.password}")private String password;@Beanpublic ConnectionFactory slaveConnectionFactory() {CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();cachingConnectionFactory.setAddresses(addresses);cachingConnectionFactory.setVirtualHost(vhost);cachingConnectionFactory.setUsername(username);cachingConnectionFactory.setPassword(password);return cachingConnectionFactory;}@Beanpublic RabbitTemplate slaveRabbitTemplate(@Qualifier("slaveConnectionFactory") ConnectionFactory slaveConnectionFactory,MessageConverter messageConverter){RabbitTemplate rabbitTemplate = new RabbitTemplate();rabbitTemplate.setConnectionFactory(slaveConnectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setMessageConverter(messageConverter);return rabbitTemplate;}}

创建队列A:  分为普通队列、延迟队列、死信队列

package com.wd.config.queue;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;import static com.wd.config.RabbitmqExchangeConfig.DEAD_EXCHANGE_NAME;
import static com.wd.config.RabbitmqExchangeConfig.DELAY_TIME;@Configuration
public class QueueAConfig {private static final String QUEUE_A_NAME = "wd_queue_a";private static final String DELAY_QUEUE_A_NAME = "wd_delay_queue_a";private static final String DEAD_QUEUE_A_NAME = "wd_dead_queue_a";private static final String QUEUE_A_ROUTING_KEY = "queue_A_routing_key";private static final String DELAY_QUEUE_A_ROUTING_KEY = "delay_queue_a_routing_key";private static final String DEAD_LETTER_QUEUE_A_ROUTING_KEY = "dead_letter_queue_A_routing_key";@Beanpublic Queue queueA() {return new Queue(QUEUE_A_NAME, true);}@Beanpublic Binding queueABinding(@Qualifier("queueA") Queue queueA,@Qualifier("exchange") DirectExchange exchange) {return BindingBuilder.bind(queueA).to(exchange).with(QUEUE_A_ROUTING_KEY);}@Beanpublic Queue delayQueueA() {Map<String, Object> args = new HashMap<>();//设置延迟队列绑定的死信交换机args.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);//设置延迟队列绑定的死信路由键args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_A_ROUTING_KEY);//设置延迟队列的 TTL 消息存活时间args.put("x-message-ttl", DELAY_TIME);return new Queue(DELAY_QUEUE_A_NAME, true, false, false, args);}@Beanpublic Binding delayQueueABinding(@Qualifier("delayQueueA") Queue delayQueueA,@Qualifier("delayExchange") DirectExchange delayExchange) {return BindingBuilder.bind(delayQueueA).to(delayExchange).with(DELAY_QUEUE_A_ROUTING_KEY);}@Beanpublic Queue deadQueueA() {return new Queue(DEAD_QUEUE_A_NAME, true);}@Beanpublic Binding deadQueueABinding(@Qualifier("deadQueueA") Queue deadQueueA,@Qualifier("deadExchange") DirectExchange deadExchange) {return BindingBuilder.bind(deadQueueA).to(deadExchange).with(DEAD_LETTER_QUEUE_A_ROUTING_KEY);}
}

创建队列B: 分为普通队列、延迟队列、死信队列

package com.wd.config.queue;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;import static com.wd.config.RabbitmqExchangeConfig.DEAD_EXCHANGE_NAME;
import static com.wd.config.RabbitmqExchangeConfig.DELAY_TIME;@Configuration
public class QueueBConfig {private static final String QUEUE_B_NAME = "wd_queue_b";private static final String DELAY_QUEUE_B_NAME = "wd_delay_queue_b";private static final String DEAD_QUEUE_B_NAME = "wd_dead_queue_b";private static final String QUEUE_B_ROUTING_KEY = "queue_b_routing_key";private static final String DELAY_QUEUE_B_ROUTING_KEY = "delay_queue_b_routing_key";private static final String DEAD_LETTER_QUEUE_B_ROUTING_KEY = "dead_letter_queue_b_routing_key";@Beanpublic Queue queueB() {return new Queue(QUEUE_B_NAME, true);}@Beanpublic Binding queueBBinding(@Qualifier("queueB") Queue queueB,@Qualifier("exchange") DirectExchange exchange) {return BindingBuilder.bind(queueB).to(exchange).with(QUEUE_B_ROUTING_KEY);}@Beanpublic Queue delayQueueB() {Map<String, Object> args = new HashMap<>();//设置延迟队列绑定的死信交换机args.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);//设置延迟队列绑定的死信路由键args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_B_ROUTING_KEY);//设置延迟队列的 TTL 消息存活时间args.put("x-message-ttl", DELAY_TIME);return new Queue(DELAY_QUEUE_B_NAME, true, false, false, args);}@Beanpublic Binding delayQueueBBinding(@Qualifier("delayQueueB") Queue delayQueueB,@Qualifier("delayExchange") DirectExchange delayExchange) {return BindingBuilder.bind(delayQueueB).to(delayExchange).with(DELAY_QUEUE_B_ROUTING_KEY);}@Beanpublic Queue deadQueueB() {return new Queue(DEAD_QUEUE_B_NAME, true);}@Beanpublic Binding deadQueueABinding(@Qualifier("deadQueueB") Queue deadQueueB,@Qualifier("deadExchange") DirectExchange deadExchange) {return BindingBuilder.bind(deadQueueB).to(deadExchange).with(DEAD_LETTER_QUEUE_B_ROUTING_KEY);}}

创建队列C: 分为普通队列、延迟队列、死信队列

package com.wd.config.queue;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;import static com.wd.config.RabbitmqExchangeConfig.DEAD_EXCHANGE_NAME;
import static com.wd.config.RabbitmqExchangeConfig.DELAY_TIME;@Configuration
public class QueueCConfig {private static final String QUEUE_C_NAME = "wd_queue_c";private static final String DELAY_QUEUE_C_NAME = "wd_delay_queue_c";private static final String DEAD_QUEUE_C_NAME = "wd_dead_queue_c";private static final String QUEUE_C_ROUTING_KEY = "queue_c_routing_key";private static final String DELAY_QUEUE_C_ROUTING_KEY = "delay_queue_c_routing_key";private static final String DEAD_LETTER_QUEUE_C_ROUTING_KEY = "dead_letter_queue_c_routing_key";@Beanpublic Queue queueC() {return new Queue(QUEUE_C_NAME, true);}@Beanpublic Binding queueCBinding(@Qualifier("queueC") Queue queueC,@Qualifier("exchange") DirectExchange exchange) {return BindingBuilder.bind(queueC).to(exchange).with(QUEUE_C_ROUTING_KEY);}@Beanpublic Queue delayQueueC() {Map<String, Object> args = new HashMap<>();//设置延迟队列绑定的死信交换机args.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);//设置延迟队列绑定的死信路由键args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_C_ROUTING_KEY);//设置延迟队列的 TTL 消息存活时间args.put("x-message-ttl", DELAY_TIME);return new Queue(DELAY_QUEUE_C_NAME, true, false, false, args);}@Beanpublic Binding delayQueueCBinding(@Qualifier("delayQueueC") Queue delayQueueC,@Qualifier("delayExchange") DirectExchange delayExchange) {return BindingBuilder.bind(delayQueueC).to(delayExchange).with(DELAY_QUEUE_C_ROUTING_KEY);}@Beanpublic Queue deadQueueC() {return new Queue(DEAD_QUEUE_C_NAME, true);}@Beanpublic Binding deadQueueCBinding(@Qualifier("deadQueueC") Queue deadQueueC,@Qualifier("deadExchange") DirectExchange deadExchange) {return BindingBuilder.bind(deadQueueC).to(deadExchange).with(DEAD_LETTER_QUEUE_C_ROUTING_KEY);}}

创建master的消息监听RabbitListenerContainerFactory

后续使用注解 @RabbitListener 时指定ListenerContainerFactory

@RabbitListener(queues = DEAD_LETTER_QUEUE_B, containerFactory = "masterListenerContainerFactory")
package com.wd.config.master;import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitmqMasterListenerConfig {@Beanpublic SimpleRabbitListenerContainerFactory masterListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,@Qualifier(value = "masterConnectionFactory") ConnectionFactory masterConnectionFactory,MessageConverter messageConverter) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();// 设置消息转换器factory.setMessageConverter(messageConverter);// 关闭自动ACKfactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);configurer.configure(factory, masterConnectionFactory);return factory;}
}

创建slave的消息监听RabbitListenerContainerFactory

后续使用注解 @RabbitListener 时指定ListenerContainerFactory

@RabbitListener(queues = DEAD_LETTER_QUEUE_B, containerFactory = "slaveListenerContainerFactory")
package com.wd.config.slave;import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitmqSlaveListenerConfig {@Beanpublic SimpleRabbitListenerContainerFactory slaveListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,@Qualifier(value = "slaveConnectionFactory") ConnectionFactory slaveConnectionFactory,MessageConverter messageConverter) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();// 设置消息转换器factory.setMessageConverter(messageConverter);// 关闭自动ACKfactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);configurer.configure(factory, slaveConnectionFactory);return factory;}
}

http://www.lryc.cn/news/170516.html

相关文章:

  • Pytorch从零开始实战04
  • 北大C++课后记录:文件读写的I/O流
  • 详解Linux的grep命令
  • spark6. 如何设置spark 日志
  • glibc: strlcpy
  • 如何在 Buildroot 中配置 Samba
  • SSM02
  • day3_QT
  • js-map方法中调用服务器接口
  • docker 已经配置了国内镜像源,但是拉取镜像速度还是很慢(gcr.io、quay.io、ghcr.io)
  • [linux(静态文件服务)] 部署vue发布后的dist网页到nginx
  • 智华计算机终端保护检查系统使用笔记
  • 前端面试话术集锦第 15 篇:高频考点(React常考进阶知识点)
  • 汽车电子——产品标准规范汇总和梳理(适应可靠性)
  • 计算机是如何工作的(上篇)
  • 数学建模| 优化入门+多目标规划
  • SSM整合Thymeleaf时,抽取公共页面并向其传递参数
  • 接口测试 —— requests 的基本了解
  • 2023年华为杯数学建模研赛D题思路解析+代码+论文
  • AB试验(三)一次试验的规范流程
  • ROI tracking by using OpenCV
  • (leetcode)二叉树最大深度
  • 【golang】调度系列之P
  • Vue3中watch用法
  • 组里来了一个实习生,一行代码引发了一个惨案
  • 随手笔记(四十五)——idea git冲突
  • chacha20 算法流程
  • 准备篇(三)Python 爬虫第三方库
  • 从零开始的PICO开发教程(4)-- VR世界 射线传送、旋转和移动
  • 防止攥改之水印功能组件