当前位置: 首页 > news >正文

RabbitMQ学习笔记(下):延迟队列,发布确认高级,备份交换机

十、延迟队列

延迟队列

概念:

延迟队列使用场景:

 

流程图:

延迟队列整合Springboot

导入依赖:

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.9.2</version></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies>

在java/com/atguigu/rabbitmq下创建config创建类SwaggerConfig,写入代码:

@Configuration
@EnableSwagger2
public class SwaggerConfig {@Beanpublic Docket webApiConfig(){return new Docket(DocumentationType.SWAGGER_2).groupName("webApi").apiInfo(webApiInfo()).select().build();}private ApiInfo webApiInfo() {return new ApiInfoBuilder().title("rabbitmq接口文档").description("本文档描述了rabbitmq微服务接口定义").version("1.0").contact(new Contact("enjoy6288","http://atguigu.com","1551388580@qq.com")).build();}
}

队列TTL代码框架图:

队列TTL(配置类代码):

@Configuration
public class TtlQueueConfig {//普通交换机的名称public static final String X_EXCHANGE = "X";//死信交换机的名称public static final String Y_DEAD_LETTER_EXCHANGE = "Y";//普通队列的名称public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB";//死信队列的名称public static final String DEAD_LETTER_QUEUE = "QD";//声明xExchange@Bean("xExchange")public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE);}//声明yExchange@Bean("yExchange")public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);}//声明普通队列A的TTL为10s@Bean("queueA")public Queue queueA(){Map<String,Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKeyarguments.put("x-dead-letter-routing-key","YD");//设置TTLarguments.put("x-message-ttl",10000);return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();}//声明普通队列B的TTL为40s@Bean("queueB")public Queue queueB(){Map<String,Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKeyarguments.put("x-dead-letter-routing-key","YD");//设置TTLarguments.put("x-message-ttl",40000);return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build();}//死信队列@Bean("queueD")public Queue queueD() {return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();}//绑定A-X@Beanpublic Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueA).to(xExchange).with("XA");}//绑定B-x@Beanpublic Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueB).to(xExchange).with("XB");}//绑定D-y@Beanpublic Binding queueDBindingY(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");}
}

队列TTL(生产者):

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendMsg/{message}")public void sendMsg(@PathVariable String message){log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date().toString(),message);rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列:"+message);rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列:"+message);}
}

队列TTL(消费者)

@Slf4j
@Component
public class DeadLetterQueueConsumer {//接收消息@RabbitListener(queues="QD")public void receiveD(Message message, Channel channel) throws Exception{String msg = new String(message.getBody());log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);}
}

延迟队列优化

不能为需求增加队列,

写一个通用队列作为延迟队列:

配置类,在上面配置类中加入如下代码:

public static final String QUEUE_C = "QC";
//和死信交换机连接
@Bean("queueC")
public Queue queue(){Map<String,Object> arguments = new HashMap<>(3);//设置死信交换机arguments.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//设置死信RoutingKeyarguments.put("x-dead-letter-routing-key","YD");return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build();
}
//和普通交换机绑定
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,@Qualifier("xExchange") DirectExchange xExchange){return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}

在前面生产者的基础上写入如下代码: 

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){log.info("当前时间:{},发送一条时长{}毫秒TTL信息给队列QC:{}",new Date().toString(),ttlTime,message);rabbitTemplate.convertAndSend("X","XC",message,msg->{//发送消息的时候延迟时长msg.getMessageProperties().setExpiration(ttlTime);return msg;});}
}

点击启动类重新启动,在网页端输入:localhost:8080/ttl/sendExpirationMsg/你好1/20000和localhost:8080/ttl/sendExpirationMsg/你好2/2000。

问题:延迟队列是排队的,当队列中有多条消息时,延迟队列的消息会根据前面最长时间发送。

安装延迟队列插件:

在https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases下载rabbitmq_delayed_message_exchange插件,解压放置到RabbitMQ的插件目录。

rabbitmq的插件在:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins目录下,上传插件到目录下,如果上传失败用sudo rz先获得权限。

输入:rabbitmq-plugins enable rabbitmq_delayed_message_exchange

重启rabbitmq:systemctl restart rabbitmq-server

进入rabbitmq的交换机界面,查看下面是否出现,如果出现代表安装成功:

基于插件的延迟队列:

配置类

@Configuration
public class DelayedQueueConfig {//队列public static final String DELAYED_QUEUEE_NAME="delayed.queue";//交换机public static final String DELAYED_EXCHANGE_NAME="delayed.exchange";//routingKeypublic static final String DELAYED_ROUTING_KEY="delayed.routingkey";//声明队列@Beanpublic Queue delayedQueue(){return new Queue(DELAYED_QUEUEE_NAME);}//声明交换机,基于插件的@Beanpublic CustomExchange delayedExchange(){Map<String,Object> arguments = new HashMap<>();arguments.put("x-delayed-type","direct");//交换机名称,交换机类型,是否需要持久化,是否需要自动删除,其它参数return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);}//绑定@Beanpublic Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue,@Qualifier("delayedExchange") CustomExchange delayedExchange){return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}
}

生产者

//发消息,基于插件的消息及延迟时间
@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message,@PathVariable  Integer delayTime){log.info("当前时间:{},发送一条时长{}毫秒信息给延迟队列delayed.queue:{}",new Date().toString(),delayTime,message);rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,DelayedQueueConfig.DELAYED_ROUTING_KEY,message,msg->{//发送消息时延迟时间(毫秒)msg.getMessageProperties().setDelay(delayTime);return msg;});
}

消费者,创建DelayQueueConsumer类:

@Slf4j
@Component
public class DelayQueueConsumer {//监听消息@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUEE_NAME)public void receiveDelayQueue(Message message){String msg = new String(message.getBody());log.info("当前时间:{},收到延迟队列的消息:{}",new Date().toString(),msg);}}

测试:localhost:8080/ttl/sendDelayMsg/come on baby1/20000。localhost:8080/ttl/sendDelayMsg/come on baby2/2000

十一、发布确认高级

加入交换机或者队列两者有其中一者宕掉,消息都会丢失。

配置类:

生产者及消费者:

回调接口:

交换机确认

回退消息

十二、备份交换机

配置类:

结果分析:

幂性性:

使用场景:

代码实现:

十三、集群

惰性队列:

集群原理:

搭建集群:

镜像队列:

实现高可用负载均衡

十四、Federation

Exchange原理:

Exchange实现:

Queue实现:

Shovel:

http://www.lryc.cn/news/183459.html

相关文章:

  • Python 无废话-基础知识面向对象编程详解
  • 凉鞋的 Unity 笔记 106. 第二轮循环场景视图Sprite Renderer
  • Vue中如何进行分布式路由配置与管理
  • Solidity 合约漏洞,价值 38BNB 漏洞分析
  • 【C++】:类和对象(2)
  • 【GIT版本控制】--提交更改
  • 解决高分屏DPI缩放PC端百度网盘界面模糊的问题
  • 全能视频工具 VideoProc Converter 4K for mac中文
  • Vue中实现自定义编辑邮件发送到指定邮箱(纯前端实现)
  • 计算机专业毕业设计项目推荐11-博客项目(Go+Vue+Mysql)
  • QT实现TCP
  • PostgreSQL ash —— pgsentinel插件
  • 【刷题笔记10.5】LeetCode:排序链表
  • 三、【色彩模式与颜色填充】
  • karmada v1.7.0安装指导
  • OK3568 forlinx系统编译过程及问题汇总
  • JVM篇---第五篇
  • C/C++ 排序算法总结
  • 机器学习---RBM、KL散度、DBN
  • (c语言)有序序列合并
  • 小谈设计模式(18)—适配器模式
  • Python柱形图
  • 用OpenCV(Python)获取图像的SIFT特征
  • 阿里云ECS和轻量服务器有什么区别?
  • 华为云云耀云服务器L实例评测|安装搭建学生成绩管理系统
  • Audacity 使用教程:轻松录制、编辑音频
  • 深入了解“注意力”和“变形金刚”-第2部分
  • ​“债务飙升!美国一天内增加2750亿美元,金融震荡的前奏已拉开帷幕!”
  • 最新Uniapp软件社区-全新带勋章源码
  • 基于goravel的CMS,企业官网通用golang后台管理系统