当前位置: 首页 > news >正文

RabbitMq 使用说明

1. 声明交换机和队列,以及交换机和队列绑定

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Slf4j
@Configuration
public class CeshiQueue {//延迟交换机fanoutpublic static final String DELAY_EXCHANGE_NAME = "delay_exchange_name";//延时队列Apublic static final String DELAY_QUEUE_A_NAME = "delay_queue_a_name";//延时队列Bpublic static final String DELAY_QUEUE_B_NAME = "delay_queue_b_name";//延时队列A路由Keypublic static final String DELAY_ROUTING_KEY_A_NAME = "delay_routing_key_a_name";//延时队列B路由Keypublic static final String DELAY_ROUTING_KEY_B_NAME = "delay_routing_key_b_name";//死信交换机public static final String DEAD_LETTER_EXCHANGE_NAME = "dead_letter_exchange_name";//死信队列Apublic static final String DEAD_LETTER_QUEUE_A_NAME = "dead_letter_queue_a_name";//死信队列Bpublic static final String DEAD_LETTER_QUEUE_B_NAME = "dead_letter_queue_b_name";//死信队列A路由Keypublic static final String DEAD_LETTER_ROUTING_KEY_A_NAME = "dead_letter_routing_key_a_name";//死信队列B路由Keypublic static final String DEAD_LETTER_ROUTING_KEY_B_NAME = "dead_letter_routing_key_b_name";/*** 声明延时交换机 fanout** @return*/@Bean(DELAY_EXCHANGE_NAME)public Exchange DELAY_EXCHANGE_NAME() {// return ExchangeBuilder.fanoutExchange(DELAY_EXCHANGE_NAME).durable(true).build();return ExchangeBuilder.directExchange(DELAY_EXCHANGE_NAME).durable(true).build();}/*** 声明死信交换机 direct** @return*/@Bean(DEAD_LETTER_EXCHANGE_NAME)public Exchange DEAD_LETTER_EXCHANGE_NAME() {return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE_NAME).durable(true).build();}/*** 声明延时队列A* 并绑定死信交换机 和 死信队列A  根据路由key** @return*/@Bean(DELAY_QUEUE_A_NAME)public Queue DELAY_QUEUEA_NAME() {Map map = new HashMap();map.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);map.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_A_NAME);//设置此队列延时时间 6秒map.put("x-message-ttl", 6000);return QueueBuilder.durable(DELAY_QUEUE_A_NAME).withArguments(map).build();}/*** 声明延时队列B* 并绑定死信交换机 和 死信队列B  根据路由key** @return*/@Bean(DELAY_QUEUE_B_NAME)public Queue DELAY_QUEUEB_NAME() {Map map = new HashMap();map.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);map.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_B_NAME);//设置此队列延时时间 12秒map.put("x-message-ttl", 12000);return QueueBuilder.durable(DELAY_QUEUE_B_NAME).withArguments(map).build();}/*** 声明死信队列A** @return*/@Bean(DEAD_LETTER_QUEUE_A_NAME)public Queue DEAD_LETTER_QUEUEA_NAME() {return new Queue(DEAD_LETTER_QUEUE_A_NAME);}/*** 声明死信队列B** @return*/@Bean(DEAD_LETTER_QUEUE_B_NAME)public Queue DEAD_LETTER_QUEUEB_NAME() {return new Queue(DEAD_LETTER_QUEUE_B_NAME);}/*** 延时队列A绑定交换机** @param queue* @param exchange* @return*/@Beanpublic Binding delayQueueABinding(@Qualifier(DELAY_QUEUE_A_NAME) Queue queue, @Qualifier(DELAY_EXCHANGE_NAME) Exchange exchange) {
//        return BindingBuilder.bind(queue).to(exchange).with("").noargs();return BindingBuilder.bind(queue).to(exchange).with(DELAY_ROUTING_KEY_A_NAME).noargs();}/*** 延时队列B绑定交换机** @param queue* @param exchange* @return*/@Beanpublic Binding delayQueueBBinding(@Qualifier(DELAY_QUEUE_B_NAME) Queue queue, @Qualifier(DELAY_EXCHANGE_NAME) Exchange exchange) {// return BindingBuilder.bind(queue).to(exchange).with("").noargs();return BindingBuilder.bind(queue).to(exchange).with(DELAY_ROUTING_KEY_B_NAME).noargs();}/*** 死信队列A绑定交换机* @param queue* @param exchange* @return*/@Beanpublic Binding deadLetterQueueABinding(@Qualifier(DEAD_LETTER_QUEUE_A_NAME)Queue queue, @Qualifier(DEAD_LETTER_EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_A_NAME).noargs();}/*** 死信队列B绑定交换机* @param queue* @param exchange* @return*/@Beanpublic Binding deadLetterQueueBBinding(@Qualifier(DEAD_LETTER_QUEUE_B_NAME)Queue queue, @Qualifier(DEAD_LETTER_EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_B_NAME).noargs();}}

2. 生产者发送消息

import com.core.rabbitmq.queue.CeshiQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;
import java.util.HashMap;
import java.util.Map;@Slf4j
@RestController
@RequestMapping("/provider")
public class ProviderController {@AutowiredRabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法@GetMapping("/ceshiDelay1")public void ceshi() {System.out.println("provider ================" + new Date());//发送消息Map map = new HashMap();map.put("name", "老六");map.put("age", "18");//第一个参数发送给哪个交换机   第二个路由key  我们延时交换机是fanout所以路由key为空   第三个发送对象
//        rabbitTemplate.convertAndSend(CeshiQueue.DELAY_EXCHANGE_NAME, "", map);rabbitTemplate.convertAndSend(CeshiQueue.DELAY_EXCHANGE_NAME, CeshiQueue.DELAY_ROUTING_KEY_A_NAME, map
//                ,message -> {
//                    message.getMessageProperties().setExpiration(1000 * 6 + "");
//                    message.getMessageProperties().setDelay(6 * 1000);
//                    return message;
//                }););System.out.println("订单提交成功,请及时付款");}

3. 消费者消费消息

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Date;
import java.util.Map;/*** 小程序订单超时未支付关闭订单** @date 2023/03/03*/
@Component
@Slf4j
//@RequiredArgsConstructor
public class ConsumerListener {@RabbitListener(queues = CeshiQueue.DEAD_LETTER_QUEUE_A_NAME)public void process2(Map order, Message message, @Headers Map<String, Object> headers, Channel channel) {log.info("消費者 订单号消息", order);long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("订单 延时队列,消费者 订单号消息【{}】", order);log.info("订单 延时队列,消费者 订单号消息【{}】", headers);log.info("订单 延时队列,消费者 订单号消息【{}】", message);log.info("订单 延时队列,消费者 订单号消息【{}】", channel);log.info("订单 延时队列,消费者 订单号消息【{}】", deliveryTag);System.out.println(new Date() + "消费者 执行结束...." + message);try {      // 如果没有错误,则执行此步,表示成功签收; 手动签收,第一个参数表示消息的deliveryTag,第二个参数表示是否允许多条消息同时被签收channel.basicAck(deliveryTag, false);} catch (Exception e) {// requeue为true则重新入队列,否则丢弃或者进入死信队列  /*** @param deliveryTag 消息序号* @param multiple 是否批量处理(true:批量拒绝所有小于deliveryTag的消息;false:只处理当前消息)* @param requeue 拒绝是否重新入队列 (true:消息重新入队;false:禁止消息重新入队)*/channel.basicReject(deliveryTag, false);log.error("订单  超时支付异常,系统重新关闭订单【{}】", deliveryTag);}}
}

        注意:生产者发送延迟队列里,用延迟路由key,想要得到延迟效果,需要用死信队列来监听,如果用延迟队列监听就得不到延迟效果,延迟队列就会立即受到消息(初学者会遇到的坑)。

4. BasicReject 

        拒收,是接收端在收到消息的时候响应给RabbitMQ服务的一种命令,告诉服务器不应该由我处理,或者拒绝处理,扔掉。 接收端在发送reject命令的时候可以选择是否要重新放回queue中。如果没有其他接收者监控这个queue的话,要注意一直无限循环发送的危险。

BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
channel.BasicReject(ea.DeliveryTag, false);

        BasicReject方法第一个参数是消息的DeliveryTag,对于每个Channel来说,每个消息都会有一个DeliveryTag,一般用接收消息的顺序来表示:1,2,3,4 等等。第二个参数是是否放回queue中,requeue。

        BasicReject一次只能拒绝接收一个消息,而BasicNack方法可以支持一次0个或多个消息的拒收,并且也可以设置是否requeue。

channel.BasicNack(3, true, false);

        在第一个参数DeliveryTag中如果输入3,则消息DeliveryTag小于等于3的,这个Channel的,都会被拒收。

5. RabbitMQ 消息确认机制ACK

ack 机制保证的是broker和消费者之间的可靠性

ack 表示的是消费端收到消息后的确认方式,有三种确认方式

        自动确认:acknowledge="none"(默认)
        手动确认:acknowledge="manual"
        根据异常情况确认:acknowledge="auto"(这种方式使用麻烦,不作讲解)
自动确认的解释:

        当消息一旦被 Consumer 接收到,则自动确认收到,并将相应消息从 RabbitMQ 的消息缓存中移除
手动确认的解释:

        在实际业务处理中,很可能消费端收到消息后业务处理出现异常,那么该消息就会丢失;
        如果设置了手动确认方式,则需要在业务处理成功后,调用 channel.basicAck() 手动签收;如果出现异常,则调用 channel.basicNack()方法,让 broker 自动重新发送消息给消费端。

rabbitMQ消息中间件的延时队列以及死信队列的使用和应用场景_出一个场景问mq的用法(延迟、死信)

rabbitmq-BasicReject

RabbitMQ ACK消息确认机制 快速入门

http://www.lryc.cn/news/36386.html

相关文章:

  • Vue(10-20)
  • C++-对四个智能指针:shared_ptr,unique_ptr,weak_ptr,auto_ptr的理解
  • uni-app中使用vue3语法详解
  • 三十四、MongoDB PHP
  • 浅拷贝和深拷贝的区别
  • 6个常用Pycharm插件推荐,老手100%都用过
  • TCP的11种状态
  • new 指令简单过程 / 类加载简单过程初始化
  • Asan基本原理及试用
  • 深度学习应用技巧4-模型融合:投票法、加权平均法、集成模型法
  • 【并发编程】深入理解Java内存模型及相关面试题
  • C++编程语言STL之queue介绍
  • ACO优化蚁群算法
  • SwiftUI 常用组件和属性(SwiftUI初学笔记)
  • Centos 中设置代理的两种方法
  • 高速PCB设计指南系列(一)
  • 云端IDE:TitanIDE v2.6.0 正式发布
  • 【Python】tqdm 模块
  • 论文阅读:Adversarial Cross-Modal Retrieval对抗式跨模式检索
  • 计算机网络复习
  • unity动画--动画绑定,转换,用脚本触发
  • 车载汽车充气泵PCBA方案
  • Android 连接 MySQL 数据库教程
  • tmall.item.update.schema.get( 天猫编辑商品规则获取 )
  • Leetcode 2379. 得到 K 个黑块的最少涂色次数
  • [深入理解SSD系列 闪存实战2.1.3] 固态硬盘闪存的物理学原理_NAND Flash 的读、写、擦工作原理
  • 总结:Linux内核相关
  • flutter工程创建过程中遇到一些问题。
  • 记录实现操作系统互斥锁的一次思考
  • 计算机SCI期刊的分值是什么意思? - 易智编译EaseEditing