RabbitMq 使用说明
1. 声明交换机和队列,以及交换机和队列绑定
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;@Slf4j
@Configuration
public class CeshiQueue {//延迟交换机fanoutpublic static final String DELAY_EXCHANGE_NAME = "delay_exchange_name";//延时队列Apublic static final String DELAY_QUEUE_A_NAME = "delay_queue_a_name";//延时队列Bpublic static final String DELAY_QUEUE_B_NAME = "delay_queue_b_name";//延时队列A路由Keypublic static final String DELAY_ROUTING_KEY_A_NAME = "delay_routing_key_a_name";//延时队列B路由Keypublic static final String DELAY_ROUTING_KEY_B_NAME = "delay_routing_key_b_name";//死信交换机public static final String DEAD_LETTER_EXCHANGE_NAME = "dead_letter_exchange_name";//死信队列Apublic static final String DEAD_LETTER_QUEUE_A_NAME = "dead_letter_queue_a_name";//死信队列Bpublic static final String DEAD_LETTER_QUEUE_B_NAME = "dead_letter_queue_b_name";//死信队列A路由Keypublic static final String DEAD_LETTER_ROUTING_KEY_A_NAME = "dead_letter_routing_key_a_name";//死信队列B路由Keypublic static final String DEAD_LETTER_ROUTING_KEY_B_NAME = "dead_letter_routing_key_b_name";/*** 声明延时交换机 fanout** @return*/@Bean(DELAY_EXCHANGE_NAME)public Exchange DELAY_EXCHANGE_NAME() {// return ExchangeBuilder.fanoutExchange(DELAY_EXCHANGE_NAME).durable(true).build();return ExchangeBuilder.directExchange(DELAY_EXCHANGE_NAME).durable(true).build();}/*** 声明死信交换机 direct** @return*/@Bean(DEAD_LETTER_EXCHANGE_NAME)public Exchange DEAD_LETTER_EXCHANGE_NAME() {return ExchangeBuilder.directExchange(DEAD_LETTER_EXCHANGE_NAME).durable(true).build();}/*** 声明延时队列A* 并绑定死信交换机 和 死信队列A 根据路由key** @return*/@Bean(DELAY_QUEUE_A_NAME)public Queue DELAY_QUEUEA_NAME() {Map map = new HashMap();map.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);map.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_A_NAME);//设置此队列延时时间 6秒map.put("x-message-ttl", 6000);return QueueBuilder.durable(DELAY_QUEUE_A_NAME).withArguments(map).build();}/*** 声明延时队列B* 并绑定死信交换机 和 死信队列B 根据路由key** @return*/@Bean(DELAY_QUEUE_B_NAME)public Queue DELAY_QUEUEB_NAME() {Map map = new HashMap();map.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);map.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_B_NAME);//设置此队列延时时间 12秒map.put("x-message-ttl", 12000);return QueueBuilder.durable(DELAY_QUEUE_B_NAME).withArguments(map).build();}/*** 声明死信队列A** @return*/@Bean(DEAD_LETTER_QUEUE_A_NAME)public Queue DEAD_LETTER_QUEUEA_NAME() {return new Queue(DEAD_LETTER_QUEUE_A_NAME);}/*** 声明死信队列B** @return*/@Bean(DEAD_LETTER_QUEUE_B_NAME)public Queue DEAD_LETTER_QUEUEB_NAME() {return new Queue(DEAD_LETTER_QUEUE_B_NAME);}/*** 延时队列A绑定交换机** @param queue* @param exchange* @return*/@Beanpublic Binding delayQueueABinding(@Qualifier(DELAY_QUEUE_A_NAME) Queue queue, @Qualifier(DELAY_EXCHANGE_NAME) Exchange exchange) {
// return BindingBuilder.bind(queue).to(exchange).with("").noargs();return BindingBuilder.bind(queue).to(exchange).with(DELAY_ROUTING_KEY_A_NAME).noargs();}/*** 延时队列B绑定交换机** @param queue* @param exchange* @return*/@Beanpublic Binding delayQueueBBinding(@Qualifier(DELAY_QUEUE_B_NAME) Queue queue, @Qualifier(DELAY_EXCHANGE_NAME) Exchange exchange) {// return BindingBuilder.bind(queue).to(exchange).with("").noargs();return BindingBuilder.bind(queue).to(exchange).with(DELAY_ROUTING_KEY_B_NAME).noargs();}/*** 死信队列A绑定交换机* @param queue* @param exchange* @return*/@Beanpublic Binding deadLetterQueueABinding(@Qualifier(DEAD_LETTER_QUEUE_A_NAME)Queue queue, @Qualifier(DEAD_LETTER_EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_A_NAME).noargs();}/*** 死信队列B绑定交换机* @param queue* @param exchange* @return*/@Beanpublic Binding deadLetterQueueBBinding(@Qualifier(DEAD_LETTER_QUEUE_B_NAME)Queue queue, @Qualifier(DEAD_LETTER_EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUE_B_NAME).noargs();}}
2. 生产者发送消息
import com.core.rabbitmq.queue.CeshiQueue;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.Date;
import java.util.HashMap;
import java.util.Map;@Slf4j
@RestController
@RequestMapping("/provider")
public class ProviderController {@AutowiredRabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法@GetMapping("/ceshiDelay1")public void ceshi() {System.out.println("provider ================" + new Date());//发送消息Map map = new HashMap();map.put("name", "老六");map.put("age", "18");//第一个参数发送给哪个交换机 第二个路由key 我们延时交换机是fanout所以路由key为空 第三个发送对象
// rabbitTemplate.convertAndSend(CeshiQueue.DELAY_EXCHANGE_NAME, "", map);rabbitTemplate.convertAndSend(CeshiQueue.DELAY_EXCHANGE_NAME, CeshiQueue.DELAY_ROUTING_KEY_A_NAME, map
// ,message -> {
// message.getMessageProperties().setExpiration(1000 * 6 + "");
// message.getMessageProperties().setDelay(6 * 1000);
// return message;
// }););System.out.println("订单提交成功,请及时付款");}
3. 消费者消费消息
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.stereotype.Component;import java.io.IOException;
import java.util.Date;
import java.util.Map;/*** 小程序订单超时未支付关闭订单** @date 2023/03/03*/
@Component
@Slf4j
//@RequiredArgsConstructor
public class ConsumerListener {@RabbitListener(queues = CeshiQueue.DEAD_LETTER_QUEUE_A_NAME)public void process2(Map order, Message message, @Headers Map<String, Object> headers, Channel channel) {log.info("消費者 订单号消息", order);long deliveryTag = message.getMessageProperties().getDeliveryTag();log.info("订单 延时队列,消费者 订单号消息【{}】", order);log.info("订单 延时队列,消费者 订单号消息【{}】", headers);log.info("订单 延时队列,消费者 订单号消息【{}】", message);log.info("订单 延时队列,消费者 订单号消息【{}】", channel);log.info("订单 延时队列,消费者 订单号消息【{}】", deliveryTag);System.out.println(new Date() + "消费者 执行结束...." + message);try { // 如果没有错误,则执行此步,表示成功签收; 手动签收,第一个参数表示消息的deliveryTag,第二个参数表示是否允许多条消息同时被签收channel.basicAck(deliveryTag, false);} catch (Exception e) {// requeue为true则重新入队列,否则丢弃或者进入死信队列 /*** @param deliveryTag 消息序号* @param multiple 是否批量处理(true:批量拒绝所有小于deliveryTag的消息;false:只处理当前消息)* @param requeue 拒绝是否重新入队列 (true:消息重新入队;false:禁止消息重新入队)*/channel.basicReject(deliveryTag, false);log.error("订单 超时支付异常,系统重新关闭订单【{}】", deliveryTag);}}
}
注意:生产者发送延迟队列里,用延迟路由key,想要得到延迟效果,需要用死信队列来监听,如果用延迟队列监听就得不到延迟效果,延迟队列就会立即受到消息(初学者会遇到的坑)。
4. BasicReject
拒收,是接收端在收到消息的时候响应给RabbitMQ服务的一种命令,告诉服务器不应该由我处理,或者拒绝处理,扔掉。 接收端在发送reject命令的时候可以选择是否要重新放回queue中。如果没有其他接收者监控这个queue的话,要注意一直无限循环发送的危险。
BasicDeliverEventArgs ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
channel.BasicReject(ea.DeliveryTag, false);
BasicReject方法第一个参数是消息的DeliveryTag,对于每个Channel来说,每个消息都会有一个DeliveryTag,一般用接收消息的顺序来表示:1,2,3,4 等等。第二个参数是是否放回queue中,requeue。
BasicReject一次只能拒绝接收一个消息,而BasicNack方法可以支持一次0个或多个消息的拒收,并且也可以设置是否requeue。
channel.BasicNack(3, true, false);
在第一个参数DeliveryTag中如果输入3,则消息DeliveryTag小于等于3的,这个Channel的,都会被拒收。
5. RabbitMQ 消息确认机制ACK
ack 机制保证的是broker和消费者之间的可靠性
ack 表示的是消费端收到消息后的确认方式,有三种确认方式
自动确认:acknowledge="none"(默认)
手动确认:acknowledge="manual"
根据异常情况确认:acknowledge="auto"(这种方式使用麻烦,不作讲解)
自动确认的解释:
当消息一旦被 Consumer 接收到,则自动确认收到,并将相应消息从 RabbitMQ 的消息缓存中移除
手动确认的解释:
在实际业务处理中,很可能消费端收到消息后业务处理出现异常,那么该消息就会丢失;
如果设置了手动确认方式,则需要在业务处理成功后,调用 channel.basicAck() 手动签收;如果出现异常,则调用 channel.basicNack()方法,让 broker 自动重新发送消息给消费端。
rabbitMQ消息中间件的延时队列以及死信队列的使用和应用场景_出一个场景问mq的用法(延迟、死信)
rabbitmq-BasicReject