当前位置: 首页 > news >正文

SpringBoot集成多个rabbitmq

1、pom文件

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.4.9</version>
</dependency>

2、rabbitmq的连接配置文件

spring:
  rabbitmq:
    mq1:
      host: xxx.xxx.xxx.xxx
      port: 5672
      username: xxxx
      password: xxxxx
      enable: true
    mq2:
      host: xxx.xxx.xxx.xxx
      port: 5672
      username: xxxxx
      password: xxxxx
      enable: true

3、mq1的相关代码  MQ1RabbitConfiguration.java

package com.pojo.config;import lombok.Data;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;@Data
@Component("mq1RabbitmqConfig")
@ConfigurationProperties(prefix = "spring.rabbitmq.mq1") //读取mq1的配置信息
@ConditionalOnProperty(name = "spring.rabbitmq.mq1.enable", havingValue = "true") //是否启用
public class MQ1RabbitConfiguration {private String host;private Integer port;private String username;private String password;@Autowiredprivate ReturnCallBack1 returnCallBack1;@Autowiredprivate ConfirmCallBack1 confirmCallBack1;@Bean(name = "mq1ConnectionFactory")//命名mq1的ConnectionFactory,如果项目中只有一个mq则不必如此@Primarypublic ConnectionFactory createConnectionFactory() {//消息队列1的连接CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);//开启发送到交换机和队列的回调connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);return connectionFactory;}@Bean(name = "mq1RabbitTemplate")//命名mq1的RabbitTemplate,如果项目中只有一个mq则不必如此@Primarypublic RabbitTemplate brainRabbitTemplate(@Qualifier("mq1ConnectionFactory") ConnectionFactory connectionFactory) {//消息生产RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//发送消息时设置强制标志,仅当提供了returnCallback时才适用rabbitTemplate.setMandatory(true);//确保消息是否发送到交换机,成功与失败都会触发rabbitTemplate.setConfirmCallback(confirmCallBack1);//确保消息是否发送到队列,成功发送不触发,失败触发rabbitTemplate.setReturnsCallback(returnCallBack1);return rabbitTemplate;}@Bean(name = "simpleRabbitListenerContainerFactory1")@Primarypublic SimpleRabbitListenerContainerFactory firstFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,@Qualifier("mq1ConnectionFactory") ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();configurer.configure(factory, connectionFactory);return factory;}@Bean(name = "subQueue01")public Queue firstQueue() {return new Queue("subQueue01");}@Bean(name = "subQueue02")public Queue secondQueue() {return new Queue("subQueue02");}@Bean(name = "subQueue03")public Queue thirdQueue() {return new Queue("subQueue03", true);}@Bean(name = "subQueue04")public Queue fourQueue() {return new Queue("subQueue04", 
true);}@Bean(name = "topicExchangeOne")public TopicExchange topicExchange() {
//        Direct exchange(直连交换机)
//        Fanout exchange(扇型交换机)
//        Topic exchange(主题交换机)
//        Headers exchange(头交换机)
//        Dead Letter Exchange(死信交换机)return new TopicExchange("topicExchangeOne");}@Bean(name = "binding1")public Binding binding1(@Qualifier("subQueue01") Queue queue, TopicExchange exchange) {//绑定队列1到TopicExchange  routingKey是队列1的队列名return BindingBuilder.bind(queue).to(exchange).with("subQueue01");}@Bean(name = "fanoutExchangeOne")public FanoutExchange fanoutExchange() {
//        Direct exchange(直连交换机)
//        Fanout exchange(扇型交换机)
//        Topic exchange(主题交换机)
//        Headers exchange(头交换机)
//        Dead Letter Exchange(死信交换机)return new FanoutExchange("fanoutExchangeOne");}@Bean(name = "binding3")public Binding binding3(@Qualifier("subQueue03") Queue queue, FanoutExchange exchange) {//绑定队列3到fanoutExchange  队列3和队列4都能消费fanoutExchange的消息return BindingBuilder.bind(queue).to(exchange);}@Bean(name = "binding4")public Binding binding4(@Qualifier("subQueue04") Queue queue, FanoutExchange exchange) {//绑定队列4到fanoutExchange  队列3和队列4都能消费fanoutExchange的消息return BindingBuilder.bind(queue).to(exchange);}}

ConfirmCallBack1.java

package com.pojo.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ConfirmCallBack1 implements RabbitTemplate.ConfirmCallback {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String s) {if (!ack) {log.info("ConfirmCallBack1消息发送交换机失败:{}", s);} else {log.info("ConfirmCallBack1消息发送交换机成功");}}
}
ReturnCallBack1.java
package com.pojo.config;import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ReturnCallBack1 implements RabbitTemplate.ReturnsCallback {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info("ReturnCallBack1消息发送队列失败:{}", JSON.toJSON(returnedMessage));}
}

4、mq2的相关代码

  MQ2RabbitConfiguration.java

package com.pojo.config;import lombok.Data;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Data
@Component("mq2RabbitmqConfig")
@ConfigurationProperties(prefix = "spring.rabbitmq.mq2") //读取mq1的配置信息
@ConditionalOnProperty(name = "spring.rabbitmq.mq2.enable", havingValue = "true") //是否启用
public class MQ2RabbitConfiguration {private String host;private Integer port;private String username;private String password;@Autowiredprivate ReturnCallBack2 returnCallBack2;@Autowiredprivate ConfirmCallBack2 confirmCallBack2;@Bean(name = "mq2ConnectionFactory")   //命名mq1的ConnectionFactory,如果项目中只有一个mq则不必如此public ConnectionFactory createConnectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setHost(host);connectionFactory.setPort(port);connectionFactory.setUsername(username);connectionFactory.setPassword(password);//开启发送到交换机和队列的回调connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);return connectionFactory;}@Bean(name = "mq2RabbitTemplate") //命名mq1的RabbitTemplate,如果项目中只有一个mq则不必如此public RabbitTemplate brainRabbitTemplate(@Qualifier("mq2ConnectionFactory") ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//发送消息时设置强制标志,仅当提供了returnCallback时才适用rabbitTemplate.setMandatory(true);//确保消息是否发送到交换机,成功与失败都会触发rabbitTemplate.setConfirmCallback(confirmCallBack2);//确保消息是否发送到队列,成功发送不触发,失败触发rabbitTemplate.setReturnsCallback(returnCallBack2);return rabbitTemplate;}@Bean(name = "simpleRabbitListenerContainerFactory2")public SimpleRabbitListenerContainerFactory secondFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,@Qualifier("mq2ConnectionFactory") ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();configurer.configure(factory, connectionFactory);return factory;}}

ConfirmCallBack2.java

package com.pojo.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ConfirmCallBack2 implements RabbitTemplate.ConfirmCallback {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String s) {if (!ack) {log.info("ConfirmCallBack2消息发送交换机失败:{}", s);} else {log.info("ConfirmCallBack2消息发送交换机成功");}}
}

ReturnCallBack2.java

package com.pojo.config;import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class ReturnCallBack2 implements RabbitTemplate.ReturnsCallback {@Overridepublic void returnedMessage(ReturnedMessage returnedMessage) {log.info("ReturnCallBack2消息发送队列失败:{}", JSON.toJSON(returnedMessage));}
}

5、消息生产者

package com.pojo.prj.controller;import com.pojo.common.anno.NoNeedLogin;
import com.pojo.common.base.ApplicationContextUtils;
import com.pojo.common.base.BaseController;
import com.pojo.util.ResponseResult;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;/*** <p>* 项目表 前端控制器* </p>** @author zhushangjin* @menu 项目管理* @since 2022-11-14*/
@RestController
@Slf4j
public class ProjectController extends BaseController {@Resource(name = "mq1RabbitTemplate")//初始化mq1的RabbitTemplate对象private RabbitTemplate mq1RabbitTemplate;@Resource(name = "mq2RabbitTemplate")//初始化mq1的RabbitTemplate对象private RabbitTemplate mq2RabbitTemplate;/*** 获取项目下拉列表** @return* @status done*/@GetMapping("/prj/project/list")@NoNeedLoginpublic ResponseResult<String> list() {String active = ApplicationContextUtils.getActiveProfile();logger.error(ApplicationContextUtils.getActiveProfile());return ResponseResult.ok("ReturnCallBack2");}@GetMapping("/prj/project/test1")public ResponseResult test1() {//发送到topicExchangeOne类型的交换机,根据routekey去找发送到哪个队列里,// 只有这一个队列才能收到这条消息String str = "test1test1test1test1test1";mq1RabbitTemplate.convertAndSend("topicExchangeOne","subQueue01", str);return buildResponseResult(true);}@GetMapping("/prj/project/test2")public ResponseResult test2() {//发送到direct类型的交换机,根据routekey去找发送到哪个队列里,//只有这一个队列才能收到这条消息mq2RabbitTemplate.convertAndSend("subQueue02", "test2test2test2test2test2");return buildResponseResult(true);}@GetMapping("/prj/project/test3")public ResponseResult test3() {//发送到fanout类型的交换机,跟这个交换机绑定的队列都会收到这一条消息,// 故第二个参数routekey无需填写mq1RabbitTemplate.convertAndSend("fanoutExchangeOne", null, "test3test3test3test3test3");return buildResponseResult(true);}}

6、消息消费者

Receiver1.java

package com.pojo.config;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer for queue {@code subQueue01}, attached through the listener container factory
 * named {@code simpleRabbitListenerContainerFactory1}.
 */
@Component
@RabbitListener(containerFactory = "simpleRabbitListenerContainerFactory1", queues = "subQueue01")
public class Receiver1 {

    /**
     * Default handler: prints the received payload to standard output.
     *
     * @param hello the message payload as text
     */
    @RabbitHandler(isDefault = true)
    public void process(String hello) {
        final String line = "Receiver1: " + hello;
        System.out.println(line);
    }
}

Receiver2.java

package com.pojo.config;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer for queue {@code subQueue02}, attached through the listener container factory
 * named {@code simpleRabbitListenerContainerFactory2}.
 */
@Component
@RabbitListener(containerFactory = "simpleRabbitListenerContainerFactory2", queues = "subQueue02")
public class Receiver2 {

    /**
     * Default handler: prints the received payload to standard output.
     *
     * @param hello the message payload as text
     */
    @RabbitHandler(isDefault = true)
    public void process(String hello) {
        final String line = "Receiver2: " + hello;
        System.out.println(line);
    }
}

Receiver3.java

package com.pojo.config;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer for queue {@code subQueue03}, attached through the listener container factory
 * named {@code simpleRabbitListenerContainerFactory1}.
 */
@Component
@RabbitListener(containerFactory = "simpleRabbitListenerContainerFactory1", queues = "subQueue03")
public class Receiver3 {

    /**
     * Default handler: prints the received payload to standard output.
     *
     * @param hello the message payload as text
     */
    @RabbitHandler(isDefault = true)
    public void process(String hello) {
        final String line = "Receiver3 : " + hello;
        System.out.println(line);
    }
}

Receiver4.java

package com.pojo.config;import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * Consumer for queue {@code subQueue04}, attached through the listener container factory
 * named {@code simpleRabbitListenerContainerFactory1}.
 */
@Component
@RabbitListener(containerFactory = "simpleRabbitListenerContainerFactory1", queues = "subQueue04")
public class Receiver4 {

    /**
     * Default handler: prints the received payload to standard output.
     *
     * @param hello the message payload as text
     */
    @RabbitHandler(isDefault = true)
    public void process(String hello) {
        final String line = "Receiver4 : " + hello;
        System.out.println(line);
    }
}

创建队列

/**
 * Declares the queue bean {@code uavTopicQueue} with a maximum message priority of 5.
 *
 * @return a queue named by {@code UAV_QUEUE} carrying the {@code x-max-priority} argument
 */
@Bean(name = "uavTopicQueue")
public Queue topicQueue() {
    // Extra queue arguments: a single entry capping message priority at 5.
    final Map<String, Object> queueArguments = new HashMap<>();
    queueArguments.put("x-max-priority", 5);
    // Flags after the name, in Spring AMQP's constructor order: durable, exclusive, autoDelete.
    return new Queue(UAV_QUEUE, true, false, false, queueArguments);
}

http://www.lryc.cn/news/491060.html

相关文章:

  • 从零开始学习数据库 day0(基础)
  • MongoDB相关问题
  • linux基本命令(1)
  • 【机器学习】超简明Python基础教程
  • 基于信创环境的信息化系统运行监控及运维需求及策略
  • 【Mysql】视图--介绍和作用 视图的创建
  • 【JavaEE初阶 — 多线程】定时器的应用及模拟实现
  • Win10系统开启了文件夹管控(文件夹限制访问)导致软件向系统公共文档目录写入失败的问题排查分享
  • 大数据的数据整合
  • 回溯法经典难题解析
  • LLM的原理理解6-10:6、前馈步骤7、使用向量运算进行前馈网络的推理8、注意力层和前馈层有不同的功能9、语言模型的训练方式10、GPT-3的惊人性能
  • Electron开发构建工具electron-vite(alex8088)添加VueDevTools(VitePlugin)
  • 【C++】static修饰的“静态成员函数“--静态成员在哪定义?静态成员函数的作用?
  • =computed() =ref()
  • webgl threejs 云渲染(服务器渲染、后端渲染)解决方案
  • 【shell编程】函数、正则表达式、文本处理工具
  • 解决 npm xxx was blocked, reason: xx bad guy, steal env and delete files
  • 如何进行高级红队测试:OpenAI的实践与方法
  • Java:二维数组
  • Android 天气APP(三十七)新版AS编译、更新镜像源、仓库源、修复部分BUG
  • Xilinx IP核(3)XADC IP核
  • 计算机网络socket编程(2)_UDP网络编程实现网络字典
  • c#窗体列表框(combobox)应用——省市区列表选择实例
  • Nginx 架构与设计
  • python Flask指定IP和端口
  • 多线程 相关面试集锦
  • 【数据结构】—— 线索二叉树
  • uni-app 发布媒介功能(自由选择媒介类型的内容) 设计
  • How to update the content of one column in Mysql
  • URL在线编码解码- 加菲工具