当前位置: 首页 > news >正文

Spring Boot RabbitMQ 常用配置

直接上代码

package com.example.demo;

import org.aopalliance.aop.Advice;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
import org.springframework.retry.interceptor.RetryInterceptorBuilder;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;@Configuration
public class RabbitMqConfig implements RabbitListenerConfigurer {public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, BeforePublishPostProcessor messagePostProcessor){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());rabbitTemplate.setBeforePublishPostProcessors(messagePostProcessor);return rabbitTemplate;}public Jackson2JsonMessageConverter producerJackson2MessageConverter(){return new Jackson2JsonMessageConverter();}@Overridepublic void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());}public MessageHandlerMethodFactory messageHandlerMethodFactory(){DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());return messageHandlerMethodFactory;}public MappingJackson2MessageConverter consumerJackson2MessageConverter(){return new MappingJackson2MessageConverter();}/*** 使用的时候在* @RabbitListener(containerFactory="containerFactory")*/@Bean("containerFactory")public SimpleRabbitListenerContainerFactory containerFactory(ConnectionFactory connectionFactory,AfterReceivePostProcessor messagePostProcessor,RabbitErrorHandler rabbitErrorHandler){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());factory.setAfterReceivePostProcessors(messagePostProcessor);factory.setErrorHandler(rabbitErrorHandler);factory.setDefaultRequeueRejected(false);factory.setAdviceChain(new Advice[]{getRetryOperationsInterceptor()});return factory;}/*** 防止队列出错无限次重试*/private RetryOperationsInterceptor getRetryOperationsInterceptor(){return 
RetryInterceptorBuilder.stateless().maxAttempts(3).backOffOptions(10000,2,30000).build();}/*** 配置了交换机队列以及绑定的配置类上加@DependsOn("rabbitAdmin")* 可以防止交换机队列以及绑定无法创建的问题* rabbitAdmin创建一定要在前*/public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}
}

AfterReceivePostProcessor

package com.example.demo;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class AfterReceivePostProcessor implements MessagePostProcessor {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {log.info("接收消息属性={},body={}",message.getMessageProperties(),new String(message.getBody()));return message;}
}

BeforePublishPostProcessor

package com.example.demo;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Correlation;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class BeforePublishPostProcessor implements MessagePostProcessor {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {return message;}@Overridepublic Message postProcessMessage(Message message, Correlation correlation, String exchange, String routingKey) {log.info("发送的消息MessageProperties={},body={},correlation={},exchange={},routingKey={}",message.getMessageProperties(),message.getBody(),correlation,exchange,routingKey);return MessagePostProcessor.super.postProcessMessage(message, correlation, exchange, routingKey);}
}

RabbitErrorHandler

package com.example.demo;import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.util.ErrorHandler;@Slf4j
@Component
public class RabbitErrorHandler implements ErrorHandler {@Overridepublic void handleError(Throwable t) {log.error(t.getMessage(),t);}
}
http://www.lryc.cn/news/302611.html

相关文章:

  • MySQL学习记录——十三 视图及用户、权限管理
  • PyCharm 自动添加文件头注释
  • 用HTML Canvas和JavaScript创建美丽的花朵动画效果
  • java----js常用的api
  • unity 使用VS Code 开发,VS Code配置注意事项
  • 领域驱动设计(Domain Driven Design)
  • CF778A String Game 题解
  • 【工具插件类教学】Unity运行时监控变量,属性,事件等的值和调用Runtime Monitoring
  • 实际生产中的一次非典型的基于jmeter的接口自动化实践
  • 新能源汽车软件开发设计规范
  • Linux:grep进阶(11)
  • 【实战】二、Jest难点进阶(一) —— 前端要学的测试课 从Jest入门到TDD BDD双实战(五)
  • 8.2 新特性 - 透明的读写分离
  • 关于三维GIS开发成长路线的一些思考
  • git操作---> 使用git push,和使用git push origin HEAD:[分支名]有什么区别呢?
  • 基于Java的大学社团管理平台
  • 1.函数模板基础
  • 22-k8s中pod的调度-亲和性affinity
  • 通俗易懂,Spring Bean生命周期管理的理解
  • 找座位 - 华为OD统一考试(C卷)
  • npm run dev运行出现NODE_OPTIONS=--max_old_space_size=4096 vite --mode dev --host?
  • 钠离子电池技术
  • 第三十六天| 435. 无重叠区间、763.划分字母区间、56. 合并区间
  • React setState同步还是异步
  • Docker安装和使用Redis
  • 四分位距IQR_ interquartile range
  • Vision Transformer - VIT
  • HTTP与HTTPS:网络安全之门户
  • 头歌:共享单车之数据分析
  • MySQL的数据类型和细节