当前位置: 首页 > news >正文

SpringBoot mq快速上手

1.依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.示例代码

基础信息配置

package com.example.demo.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitConfig {public static final String EXCHANGE = "boot-EXCHANGE";public static final String QUEUE = "boot-queue";public static final String routingKey = "*.black.*";@Beanpublic Queue bootQueue(){return QueueBuilder.durable(QUEUE).build();}@Beanpublic Exchange bootExchange(){return ExchangeBuilder.topicExchange(EXCHANGE).build();}@Beanpublic Binding binding(Queue bootQueue,Exchange bootExchange){return BindingBuilder.bind(bootQueue).to(bootExchange).with(routingKey).noargs();}
}

实例配置代码如上

实操代码如下:

  rabbitTemplate.convertAndSend(EXCHANGE,"white.black.hello","nihao");System.out.println("消息发送成功");

mq确保生产者发送到交换机链路正常:

基本:

  //设置回调(保证消息发到交换机上)channel.confirmSelect();//设置回调函数channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println("消息成功发送到交换机");}@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("消息没有发送到交换机,请重试");}});

SpringBoot

 // publisher-confirm-type: correlated  # 新版本 spring配置rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(ack){System.out.println("消息成功发送到交换机");}else{System.out.println("消息没有发送到交换机");}}});

该方法中的ack为true,或false代表如上,该方法会被异步调用,不会阻塞主方法,效率高,可以在这里记下来,记到数据库中,事后进行补充。。 

交换机路由到队列正常

//保证消息发到Queuechannel.addReturnListener(new ReturnCallback() {@Overridepublic void handle(Return returnMessage) {System.out.println("消息未发送到交换机,请进行相关处理");//111}});

该方法在路由队列失败时进行回调

  // publisher-returns: true # 开启Return机制rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {String msg = new String(returned.getMessage().getBody());System.out.println("msg路由队列失败,请进行补救操作"+msg);}});

 该方法只会在队列路由失败时被调用,属于回调方法

设置队列持久化(不是指durable设置为true,是指mq重启后,队列依然有消息)

  //设置消息持久化AMQP.BasicProperties prop =new AMQP.BasicProperties().builder().deliveryMode(2).build();//6.发送消息channel.basicPublish(Exchange,"",prop,"nihao".getBytes(StandardCharsets.UTF_8));

设置属性,并把这个属性带上,deliveryMode设置为2即可

  rabbitTemplate.convertAndSend(EXCHANGE, "big.black.dog", "nihao", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;}});

springboot中,需要用 MessagePostProcessor里的message设置deleveryMode 并且设置为枚举类型的PERSISTENT类型,并且记得返回消息,不然无法设置队列类型为持久化

保证消息被消费者正常消费

  //4.监听消息DefaultConsumer callback=new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {System.out.println("消费者1好收到消息"+new String(body,"utf-8"));channel.basicAck(envelope.getDeliveryTag(),false);}};channel.basicConsume(QUEUE,false,callback);

mq在原始代码的时候需要在消费者端设置自动ack为false来进行处理,并在方法里进行手动ack。 

  listener:simple:acknowledge-mode: manual #开启手动ACKprefetch: 10 #消费者一次拿走10个消息配置如上@Component
public class Consumer {@RabbitListener(queues = RabbitConfig.QUEUE)public void Consume(String msg, Channel channel, Message message) throws IOException {System.out.println("msg"+msg);System.out.println("标示符"+message.getMessageProperties().getCorrelationId());channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
}

springboot需要在配置文件里设置值为manual,prefetch表示一次可以取走的消息


 

http://www.lryc.cn/news/497251.html

相关文章:

  • 图像处理网络中的模型水印
  • Halcon 瑕疵检测原理及应用
  • JAVA 架构师面试 100套含答案:JVM+spring+ 分布式 + 并发编程》...
  • 多模态学习详解
  • C#应用开发:基于C# WPF界面实现本机网络通讯状态(下载速度)的显示
  • Octo—— 基于80万个机器人轨迹的预训练数据集用于训练通用机器人,可在零次拍摄中解决各种任务
  • 2022高等代数下【南昌大学】
  • UDP编程
  • 论文阅读:Omnidirectional Image Super-resolution via Bi-projection Fusion
  • Web 毕设篇-适合小白、初级入门练手的 Spring Boot Web 毕业设计项目:智行无忧停车场管理系统(前后端源码 + 数据库 sql 脚本)
  • 微服务的负载均衡可以通过哪些组件实现
  • Spring Boot 支持哪些云环境?
  • 第31天:安全开发-JS应用WebPack打包器第三方库JQuery安装使用安全检测
  • word如何快速创建目录?
  • 关于linux 下的中断
  • 两个畸变矩阵相乘后还是一个2*2的矩阵,有四个畸变元素。1、畸变矩阵吸收了法拉第矩阵。2、畸变矩阵也给法拉第旋转角带来模糊(求解有多种可能)
  • MCU利用单总线协议(1-wire)读取DHT11温湿度
  • [保姆式教程]使用目标检测模型YOLO11 OBB进行旋转目标检测:训练自己的数据集(基于卫星和无人机的农业大棚数据集)
  • 【网络安全】网站常见安全漏洞 - 网站基本组成及漏洞定义
  • Redis——个人笔记留存
  • 人工智能_大模型091_大模型工作流001_使用工作流的原因_处理复杂问题_多轮自我反思优化ReAct_COT思维链---人工智能工作笔记0236
  • linux上jdk1.8安装elasticsearch6.8.5踩坑总结
  • Three.js教程_02场景、相机与渲染器全面解析
  • 【数据结构】动态规划-基础篇
  • opencv读取展示图片
  • 网站访问统计A/B测试与数据分析
  • 前端开发 之 15个页面加载特效下【附完整源码】
  • 详解八大排序(六)------(三路划分,自省排序,归并排序外排序)
  • 【Java从入门到放弃 之 从字节码的角度异常处理】
  • Java虚拟机(JVM)中的元空间(Metaspace)一些关键点的总结