当前位置: 首页 > news >正文

springboot框架使用RabbitMQ举例代码

以前分享过一个理论有兴趣的小伙伴可以看下
https://blog.csdn.net/Drug_/article/details/138164180

不多说 还是直接上代码

第一步:引入依赖 可以不指定版本

     <!--        amqp --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

第二步 配置文件

  #配置rabbitMq 服务器rabbitmq:host: ${rabbitmq.rabbitmqHost}port: ${rabbitmq.rabbitmqPort}username: ${rabbitmq.rabbitmqUsername}password: ${rabbitmq.rabbitmqPassword}virtual-host: ${rabbitmq.rabbitmqVhost}connection-timeout: 5000#消费者配置listener:simple:
# 不建议使用 自带的重试配置  因为有几种情况会失效  在网上摘抄的 网友的测试
#  重试机制使用场景:
#  1. 如果是业务代码,比如空指针之类的异常那重试机制其实没什么用
#  2. 代码中不能使用try/catch捕获异常,否则重试机制失效
#  我在消费者 使用了 try 发现 确实失效了  所以 我觉得 需要手动在消费者里累计重试次数    自行处理异常
#        retry:
#          enabled: true  #开启消费者retry重试机制
#          max-attempts: 3  # 最大重试次数
#          multiplier: 2.0 # 重试间隔时间倍数
#          initial-interval: 1000 # 初始重试间隔时间(毫秒)
#          max-interval: 10000 # 最大重试间隔时间(毫秒)acknowledge-mode: manual # 手动确认消息,防止消息丢失 auto manual手动确认模式default-requeue-rejected: true #是否将拒绝的消息重新入队。默认是 true,即拒绝的消息会重新入队。 配合手动确认模式concurrency: 1 #: 消费者线程池的并发数。设置同时处理的消费者数量max-concurrency: 1 #最大并发消费者数量prefetch: 1 # 限制每个消费者一次可以获取的消息数量,防止消息在某个消费者身上发生阻塞#生产者配置
#    publisher-returns: true  # 启用发布者返回模式。设置为 true 启用,确保如果消息无法路由到目标队列,则会返回给生产者。# none: 不启用发布者确认。# correlated: 启用发布者确认并使用 CorrelationData 对象,可以在回调中进行处理。#: 启用简单的发布者确认模式,不带 CorrelationData。
#    publisher-confirm-type: none

第三步定义常量 :

package com.testweb.testweb.rabbitmq.web;/*** User:Json* Date: 2024/9/3**/
public class MqConstant {public static final String TestDirectRouting = "rabbitmq.TestDirectRouting";public static final String TestDirectQueue = "rabbitmq.TestDirectQueue";public static final String TestDirectExchange = "rabbitmq.TestDirectExchange";
}

第四步 消费者定义:

package com.testweb.testweb.rabbitmq.web.consumer;import com.testweb.testweb.rabbitmq.web.MqConstant;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.Header;
import com.rabbitmq.client.Channel;import java.util.HashMap;
import java.util.Map;/*** User:Json* Date: 2024/9/3* 消费者**/
@Configuration
public class TestConsumer {//队列@Beanpublic Queue TestDirectQueue() {Map<String, Object> args = new HashMap<>();args.put("x-ha-policy", "all"); //将队列设置为在集群中的所有节点上都可用return new Queue(MqConstant.TestDirectQueue, true, false, false, args);}@Beanpublic DirectExchange TestDirectExchange() {return new DirectExchange(MqConstant.TestDirectExchange, true, false);}@Beanpublic Binding bindingDirect() {return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with(MqConstant.TestDirectRouting);}@RabbitListener(queues = MqConstant.TestDirectQueue)public void process1(Message testMessage, Channel channel) {// 消息的唯一标识idlong deliveryTag = testMessage.getMessageProperties().getDeliveryTag();//重试次数Integer retryCount =(Integer) testMessage.getMessageProperties().getHeaders().getOrDefault("retryCount", 0);try {// 处理消息的业务逻辑System.out.println("Received order message: " + new String(testMessage.getBody()));//假装异常int a=  1/0;// 手动确认消息// deliveryTag 唯一标识// multiple 是否批量拒绝,true 表示拒绝当前及之前的所有消息,false 表示仅拒绝当前消息channel.basicAck(deliveryTag, false);} catch (Exception e) {if (retryCount < 3) { // 设置最大重试次数try {System.out.println("处理失败,拒绝消息并重新入队 :" + testMessage);MessageProperties messageProperties = new MessageProperties();messageProperties.setHeader("retryCount", retryCount + 1);Message newMessage = new Message(testMessage.getBody(), messageProperties);// 重新入队  未写完channel.basicPublish(MqConstant.TestDirectExchange, MqConstant.TestDirectRouting, null, newMessage.getBody());// 手动确认原消息,防止死循环channel.basicAck(deliveryTag, false);// 处理失败,拒绝消息并重新入队 方式1// 消息标识 deliveryTag,// multiple 是否批量拒绝,true 表示拒绝当前及之前的所有消息,false 表示仅拒绝当前消息,,// requeue  是否将消息重新放回队列,true 表示重新放入队列,false 表示丢弃//    channel.basicNack(deliveryTag, false, true);// 处理失败,拒绝消息并重新入队 方式2// 消息标识 deliveryTag// requeue 是否将消息重新放回队列,true 表示重新放入队列,false 表示丢弃。//channel.basicReject(long deliveryTag, boolean requeue);
//                3. 使用场景
//                basicNack:
//                当你需要拒绝一批消息时,使用 basicNack 是更好的选择,尤其是当你想在消费失败时批量拒绝多条消息。
//                适用于更复杂的场景,比如一次性处理多个未确认的消息。
//                basicReject:
//                当你只想拒绝当前消息时,basicReject 是一个简化的选择。它通常用于更简单的场景,只需处理当前消息即可。
//                适合处理单个消息的拒绝。//如果你在消费者 里 只写了 消息确定 没有写 如果异常后 的处理 默认是不会把消息重新放回队列的} catch (Exception nackException) {System.out.println("重新入队失败!!!");// 处理 nack 失败的情况nackException.printStackTrace();}} else {System.out.println("达到最大重试次数 将消息发送到死信队列或进行其他处理!!!");try {channel.basicReject(deliveryTag, false); // 丢弃消息或转发到死信队列} catch (Exception rejectException) {rejectException.printStackTrace();}}}}}

第五步:生产者

package com.testweb.testweb.rabbitmq.web.producer;import com.testweb.testweb.rabbitmq.web.MqConstant;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** User:Json* Date: 2024/9/3* 生产者**/
@Component
public class TestProducer {@ResourceAmqpTemplate amqpTemplate;public <T> void produce(T payload){amqpTemplate.convertAndSend(MqConstant.TestDirectExchange,MqConstant.TestDirectRouting, payload);}
}

第六步 测试:

package com.testweb.testweb.rabbitmq.web.controller;import com.testweb.testweb.rabbitmq.web.MqConstant;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.web.bind.annotation.*;import javax.annotation.Resource;/*** User:Json* Date: 2024/9/3**/
@RestController
@RequestMapping("/testMq")
public class TestMqController {@ResourceAmqpTemplate amqpTemplate;@GetMapping("test")@CrossOrigin(origins = "*")public void test(@RequestParam String msg){amqpTemplate.convertAndSend(MqConstant.TestDirectExchange,MqConstant.TestDirectRouting, msg);}}
http://www.lryc.cn/news/479046.html

相关文章:

  • Java实现一个延时队列
  • 为什么说vue是双向数据流
  • 创造属于你的 Claude Prompt 和个性化 SVG 卡片|对李继刚老师提示词的浅浅解析与总结
  • redis与本地缓存
  • git撤销commit和add
  • 【361】基于springboot的招生宣传管理系统
  • 【一些关于Python的信息和帮助】
  • creo toolkit二次开发学习之程序集(ProAsmcomp)和装配体组件路径对象(ProAsmcomppath)
  • 深入浅出 Spring Boot 与 Shiro:构建安全认证与权限管理框架
  • 外包干了三年,精神严重内耗...
  • ruoyi-vue集成tianai-captcha验证码
  • Django安装
  • Ubuntu 20.04 安装 QGC v4.3 开发环境
  • WPF+MVVM案例实战(二十一)- 制作一个侧边弹窗栏(AB类)
  • linux中怎样登录mysql数据库
  • 深入理解 Linux 内存管理:free 命令详解
  • 指针万字超级最强i解析与总结!!!!!
  • 告别生硬电子音,这款TTS软件让语音转换更自然动听
  • CORS(跨域资源共享)和SOP(同源策略)
  • 【系统设计】数据库压缩技术详解:从基础到实践(附Redis内存优化实战案例)
  • 基于SpringBoot的“乐校园二手书交易管理系统”的设计与实现(源码+数据库+文档+PPT)
  • debian11安装最新rabbitmq
  • 三十三、Python基础语法(面向对象其他语法-下)
  • 简单又便宜的实现电脑远程开机唤醒方法
  • Flutter鸿蒙next 状态管理框架对比分析
  • Vue Router进阶详解
  • 进程的控制
  • 基于C语言实现的图书管理系统
  • 删除 需要来自XXXX的权限才能对此文件夹进行更改 文件的解决办法
  • ARM base instruction -- ccmp (immediate)