当前位置: 首页 > news >正文

RabbitMQ实现秒杀场景示例

本文章通过MQ队列来实现秒杀场景

整体的设计如下图,整个流程中对于发送发MQ失败和发送到死信队列的数据未做后续处理

1、首先先创建MQ的配置文件

@Configuration
public class RabbitConfig  {public static final String DEAD_LETTER_EXCHANGE = "deadLetterExchange";public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.#";public static final String DEAD_LETTER_QUEUEA_NAME = "deadQueue";@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate ConnectionFactory connectionFactory;@Beanpublic TopicExchange topicExchange(){return new TopicExchange("seckill_topic",true,false);}// 声明死信Exchange@Bean("deadLetterExchange")public DirectExchange deadLetterExchange(){return new DirectExchange(DEAD_LETTER_EXCHANGE);}@Bean("seckillQueue")public Queue seckillQueue(){Map<String,Object> args = new HashMap<>();args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);//  x-dead-letter-routing-key  这里声明当前队列的死信路由keyargs.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);return QueueBuilder.durable("seckillQueue").withArguments(args).build();}@Bean("deadQueue")public Queue binding(){return new Queue(DEAD_LETTER_QUEUEA_NAME);}@Beanpublic Binding bindingExchange(){return BindingBuilder.bind(seckillQueue()).to(topicExchange()).with("seckill.#");}// 声明死信队列绑定关系@Beanpublic Binding deadLetterBinding(@Qualifier("deadQueue") Queue queue,@Qualifier("deadLetterExchange") DirectExchange exchange){return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);}//配置会覆盖yml的重试次数//RabbitMQ监听容器/*@Beanpublic SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);//设置并发factory.setConcurrentConsumers(1);SimpleMessageListenerContainer s=new SimpleMessageListenerContainer();//最大并发factory.setMaxConcurrentConsumers(1);//消息接收——手动确认factory.setAcknowledgeMode(AcknowledgeMode.AUTO);//设置超时factory.setReceiveTimeout(2000L);//设置重试间隔factory.setFailedDeclarationRetryInterval(3000L);//监听自定义格式转换//factory.setMessageConverter(jsonMessageConverter);return factory;}*/
}

2、配置yml文件

spring:redis:database: 0host: xxxport: 6379password: xxxtimeout: 60jedis:pool:max-active: 8max-wait: -1max-idle: 8min-idle: 0rabbitmq:username: adminpassword: adminvirtual-host: /host: xxxxport: 12345publisher-confirms: truepublisher-returns: truetemplate:mandatory: truelistener:simple:concurrency: 1max-concurrency: 3# 消费者预取1条数据到内存,默认为250条prefetch: 1# 确定机制acknowledge-mode: manualretry:enabled: true #是否支持重试max-attempts: 2# 重试间隔(ms)initial-interval: 5000

这里有一点需要注意的是在做死信队列的时候如果Config文件中配置了监听容器,在yml文件中的一些属性要在容器里面进行配置,当时测试重试的时候发现没有在Config文件中配置,只在yml文件中配置了重试次数,结果会无限期的重试,MQ的默认方式就是无限期的重试,所以这点很容易踩坑

3、实现交换机的ACK,实现 RabbitTemplate.ConfirmCallback接口

@Component
public class ConfirmCallBackHandler implements RabbitTemplate.ConfirmCallback {@Autowiredprivate RabbitMessageMapper rabbitMessageMapper;@Autowiredprivate RabbitTemplate rabbitTemplate;//注入//PostConstruct注解会在Component、Autowired注解完成后再执行@PostConstructpublic void init(){rabbitTemplate.setConfirmCallback(this);}@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {if(!ack){RabbitMessage rabbitMessage = new RabbitMessage();rabbitMessage.setUniqueKey(correlationData.getId().toString());rabbitMessage.setSuccessFlag("N");rabbitMessageMapper.updateSuccessFlag(rabbitMessage);System.out.println("失败原因:"+cause);}}
}

4、实现队列的ACK,实现 RabbitTemplate.ReturnCallback

@Component
public class ReturnCallBackHandler implements RabbitTemplate.ReturnCallback {@Autowiredprivate RabbitTemplate rabbitTemplate;//注入//PostConstruct注解会在Component、Autowired注解完成后再执行@PostConstructpublic void init(){rabbitTemplate.setReturnCallback(this);}@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {System.out.println("消息主体 message:"+message);System.out.println("应答码 replyCode: :"+replyCode);System.out.println("原因描述 replyText:"+replyText);System.out.println("交换机 exchange:"+exchange);System.out.println("消息使用的路由键 routingKey:"+routingKey);}
}

5、消费者方面,实现 ChannelAwareMessageListener 接口

@Component
public class AckListener implements ChannelAwareMessageListener {@Autowiredprivate RabbitMqService rabbitMqService;@RabbitListener(queues = "seckillQueue")@Overridepublic void onMessage(Message messagex, Channel channel) throws Exception {try {String result = new String(messagex.getBody(),"utf-8");rabbitMqService.receive(result);channel.basicAck(messagex.getMessageProperties().getDeliveryTag(), false);}catch (Exception exception){channel.basicNack(messagex.getMessageProperties().getDeliveryTag(), false, false);}}
}

http://www.lryc.cn/news/175921.html

相关文章:

  • 如何提升网站排名优化(百度SEO优化,轻松提升排名)
  • CountDownLatch 和 CyclicBarrier 用法以及区别
  • 9.9喝遍“茶、奶、果、酒”,茶饮价格战是因为“无活可整”?
  • echarts 学习网址
  • android源码编译
  • 盘点双电机驱动技术
  • ubuntu下用pycharm专业版连接AI服务器及其docker环境
  • IntentFilter笔记
  • 【二叉树】——链式结构(快速掌握递归与刷题技巧)
  • 项目管理—项目普遍存在的问题
  • Ubuntu Seata开机自启动服务
  • 腾讯mini项目-【指标监控服务重构】2023-08-26
  • 《Essential C++》之(面向过程泛型编程)
  • 机器学习笔记:adaBoost
  • Anchor DETR
  • 适合在家做的副业 整理5个,有电脑就行
  • Android WebSocket
  • Android 按键流程
  • C语言——运算符
  • MySQL数据库入门到精通8--进阶篇( MySQL管理)
  • 硬件基本功--MOS管
  • xdebug3开启profile和trace
  • EfficientFormer:高效低延迟的Vision Transformers
  • 【咕咕送书第二期】| 计算机网络对于考研的重要性?
  • 【力扣】58. 最后一个单词的长度
  • Java编程的精髓:深入理解JVM和性能优化
  • 易云维®智慧工厂数字化管理平台助推工业制造企业数字化转型新动能
  • 0.基本概念——数据结构学习
  • Redis可视化工具-Another Redis Desktop Manager 安装
  • ETLCloud工具让美团数据管理更简单