当前位置: 首页 > news >正文

使用RabbitMQ实现延时消息自动取消的简单案例

一、流程图

二、导包

<!--消息队列 AMQP依赖,包含RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

三、配置文件

#消息队列
spring:rabbitmq:host: 192.168.88.130port: 5672virtual-host: my_vhost #使用的虚拟主机username: rootpassword: rootlistener:simple:acknowledge-mode: manual #开启手动应答

四、配置类

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Configuration
public class RabbitMQConfig {/*** 订单交换机*/public static final String ORDER_EXCHANGE = "order_exchange";/*** 订单队列*/public static final String ORDER_QUEUE = "order_queue";/*** 订单路由键*/public static final String ORDER_ROUTING = "order_routing";/*** 死信交换机*/public static final String ORDER_DEAD_EXCHANGE = "order_dead_exchange";/*** 死信队列*/public static final String ORDER_DEAD_QUEUE = "order_dead_queue";/*** 死信路由键*/public static final String ORDER_DEAD_ROUTING = "order_dead_routing";/*** 订单交换机*/@Bean("orderExchange")public Exchange getOrderExchange() {return new DirectExchange(ORDER_EXCHANGE);}/*** 订单队列*/@Bean("orderQueue")public Queue getOrderQueue() {Map<String, Object> map = new HashMap<>(3);map.put("x-dead-letter-exchange", ORDER_DEAD_EXCHANGE);//死信交换机map.put("x-dead-letter-routing-key", ORDER_DEAD_ROUTING);//死信路由键map.put("x-message-ttl", 1000 * 60 * 15);//队列过期时间return QueueBuilder.durable(ORDER_QUEUE).withArguments(map).build();}/*** 将订单交换机与订单队列绑定*/@BeanBinding orderExchangeBindingOrder(@Qualifier("orderExchange") Exchange exchange,@Qualifier("orderQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(ORDER_ROUTING).noargs();}/*** 死信交换机*/@Bean("orderDeadExchange")public Exchange getOrderDeadExchange() {return new DirectExchange(ORDER_DEAD_EXCHANGE);}/*** 死信队列*/@Bean("orderDeadQueue")public Queue getOrderDeadQueue() {return new Queue(ORDER_DEAD_QUEUE,//队列名true,//是否持久化false,//是否具有排他性,只在首次声明时可见,不允许其他用户访问,连接断开时自动删除false,//是否自动删除,经历过至少一次连接后,所有消费者都断开了连接,此队列会自动删除null);}/*** 将死信交换机与死信队列绑定*/@BeanBinding deadExchangeBindingDeadQueue(@Qualifier("orderDeadExchange") Exchange exchange,@Qualifier("orderDeadQueue") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_ROUTING).noargs();}
}

五、发送消息的类

import com.sky.configuration.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;/*** 消息队列发送消息*/
@Component
public class SendRabbitMQ {@Autowiredprivate RabbitTemplate rabbitTemplate;/*** @param orderId 15分钟后要检查的订单编号*/public void sendDelayOrder(Long orderId) {rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE,//订单交换机RabbitMQConfig.ORDER_ROUTING,//订单路由键orderId//要取消的订单编号);}
}

六、接收消息的类

import com.rabbitmq.client.Channel;
import com.sky.configuration.RabbitMQConfig;
import com.sky.mapper.OrderMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;/*** 消息队列接收消息*/
@Component
public class ReceiveRabbitMQ {@Autowiredprivate OrderMapper orderMapper;/*** @param orderId 要取消的订单的编号* @param msg     包含了要回复的队列* @param channel 有回复功能的参数*/@RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_QUEUE)public void ReceiveDeadOrder(Long orderId, Channel channel, Message msg) throws IOException {orderMapper.delCancelOrder(orderId);//查询数据库,订单是否付款,未付款:改为已取消channel.basicAck(msg.getMessageProperties().getDeliveryTag(),//应答的消息false//是否批量应答);}
}

七、在业务代码中注入发送类,并调用发送类的发送方法

@Service
public class OrderServiceImpl implements OrderService {@Autowiredprivate SendRabbitMQ sendRabbitMQ;/*** 用户下单*/public void submitOrder(OrdersSubmitDTO ordersSubmitDTO) {sendRabbitMQ.sendDelayOrder(order.getId());//发送延迟消息到消息队列}
}

http://www.lryc.cn/news/312326.html

相关文章:

  • Docker部署(ruoyi案例接上篇Docker之部署前后端分离项目)实施必会!!!!
  • 电脑中已经有多个模组压缩文件,如何通过小火星露谷管理器批量安装
  • [Linux]如何理解kernel、shell、bash
  • C++:Vector的使用
  • Redis之事务(详细解析)
  • Java项目:39 springboot007大学生租房平台的设计与实现
  • 安卓内存信息查看
  • Positional Encoding 位置编码
  • MySql、Navicat 软件安装 + Navicat简单操作(建数据库,表)
  • 逆向案例五、爬取b站评论,表单MD5加密
  • 010-原型链
  • Electron-builder打包安装包——编译篇
  • Red Hat系统升级内核版本
  • Java集合set之HashSet、LinkedHashSet、TreeSet的区别?
  • 全方位碾压chatGPT4的全球最强模型Claude 3发布!速通指南在此!保姆级教学拿脚都能学会!
  • upload-Labs靶场“11-15”关通关教程
  • linux-rpm命令
  • 如何利用python实现自己的modbus-tcp库
  • linux系统-----------搭建LNMP 架构
  • C++中boost库的安装及使用(Windows)
  • CPP编程-CPP11中的内存管理策略模型与名称空间管理探幽(时隔一年,再谈C++抽象内存模型)
  • springboot项目整合minio实现文件的分布式存储
  • 微信小程序开发学习笔记《19》uni-app框架-配置小程序分包与轮播图跳转
  • Python内置模块
  • WordPress建站入门教程:小皮面板phpstudy如何安装PHP和切换php版本?
  • 用友 NC saveDoc.ajax 任意文件上传漏洞复现
  • 如何使用达摩盘
  • 网络编程的学习
  • 【Mining Data】收集数据(使用 Python 挖掘 Twitter 数据)
  • 2024京津冀光伏展