当前位置: 首页 > news >正文

RabbitMQ手动应答与持久化

1.SleepUtil线程睡眠工具类

package com.hong.utils;/*** @Description: 线程睡眠工具类* @Author: hong* @Date: 2023-12-16 23:10* @Version: 1.0**/
public class SleepUtil {public static void sleep(int second) {try {Thread.sleep(1000*second);} catch (InterruptedException e) {Thread.currentThread().interrupt();}}
}

2.消息生产者

package com.hong.rabbitmq3;import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;import java.util.Scanner;/*** @Description: 消息手动应答时不丢失,放回队列重新消费* @Author: hong* @Date: 2023-12-16 22:33* @Version: 1.0**/
public class Task3 {public static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);Scanner scanner = new Scanner(System.in);System.out.println("请输入:");while (scanner.hasNext()){String message = scanner.next();channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8"));System.out.println("消息发送完成------" + message);}}
}

3.两个消费者

模拟一个处理速度快(Worker3),另一个处理速度慢(Worker4)

3.1.处理时间短

package com.hong.rabbitmq3;import com.hong.utils.RabbitMQUtil;
import com.hong.utils.SleepUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;/*** @Description: 消息手动应答时不丢失,放回队列重新消费* @Author: hong* @Date: 2023-12-16 23:05* @Version: 1.0**/
public class Worker3 {private static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();System.out.println("worker3等待接收消息,处理速度快");DeliverCallback deliverCallback = (comsumerTag, message) -> {SleepUtil.sleep(1);System.out.println("接收到的消息:"+  new String(message.getBody(),"UTF-8"));//手动应答/*** 第一个参数:消息标识* 第二个参数是否批量:true批量*/channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!");//手动应答falsechannel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);}
}

3.2.处理时间长

package com.hong.rabbitmq3;import com.hong.utils.RabbitMQUtil;
import com.hong.utils.SleepUtil;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;/*** @Description: 消息手动应答时不丢失, 放回队列重新消费* @Author: hong* @Date: 2023-12-16 23:05* @Version: 1.0**/
public class Worker4 {private static final String TASK_QUEUE_NAME = "ack_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();System.out.println("worker4等待接收消息,处理速度慢");DeliverCallback deliverCallback = (comsumerTag, message) -> {SleepUtil.sleep(20);System.out.println("接收到的消息:"+  new String(message.getBody(),"UTF-8"));//手动应答/*** 第一个参数:消息标识* 第二个参数是否批量:true批量*/channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};CancelCallback cancelCallback = var -> System.out.println(var + "消息消费被中断!");//手动应答falsechannel.basicConsume(TASK_QUEUE_NAME,false,deliverCallback,cancelCallback);}
}

4.结果

启动生产者后启动2个消费者,等消息bb接收到后,发送cc和dd
在这里插入图片描述
在这里插入图片描述
等Worker4接收到消息bb后将其关闭,发现原本该Worker4消费的消息dd并未丢失,重回队列被Worker3消费
在这里插入图片描述

5.持久化

5.1.队列持久化

package com.hong.rabbitmq4;import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;import java.util.Scanner;/*** @Description: 队列持久化* @Author: hong* @Date: 2023-12-17 22:52* @Version: 1.0**/
public class Task4 {public static final String TASK_QUEUE_NAME = "persist_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();//true持久化channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);Scanner scanner = new Scanner(System.in);System.out.println("请输入:");while (scanner.hasNext()){String message = scanner.next();channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes("UTF-8"));System.out.println("消息发送完成------" + message);}}
}

5.2.消息持久化

package com.hong.rabbitmq4;import com.hong.utils.RabbitMQUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;import java.util.Scanner;/*** @Description: 队列持久化与消息持久化* @Author: hong* @Date: 2023-12-17 22:52* @Version: 1.0**/
public class Task4 {public static final String TASK_QUEUE_NAME = "persist_queue";public static void main(String[] args) throws Exception{Channel channel = RabbitMQUtil.getChannel();//队列持久化   true持久化channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);Scanner scanner = new Scanner(System.in);System.out.println("请输入:");while (scanner.hasNext()){String message = scanner.next();//消息持久化  MessageProperties.PERSISTENT_TEXT_PLAINchannel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));System.out.println("消息发送完成------" + message);}}
}

在这里插入图片描述

http://www.lryc.cn/news/262650.html

相关文章:

  • java使用枚举类型解决if-else大量堆积
  • 【数据结构】八大排序之直接插入排序算法
  • 网络编程『socket套接字 ‖ 简易UDP网络程序』
  • FreeSWITCH rtp endpoint recvonly
  • Hadoop和Spark的区别
  • 英文论文降重修改技巧 papergpt
  • DevOps搭建(十)-安装Harbor镜像仓库详细步骤
  • DDA 算法
  • 天猫数据平台-淘宝天猫数据-天猫销售数据分析:11月天猫平台滑雪运动装备行业销量翻倍!
  • 使用OpenCV和PIL库读取图片的区别
  • Amazon CodeWhisperer:AI 编程助手
  • Linux 使用 Anaconda+Uwsgi 部署 Django项目和前端项目
  • 分析若依的文件上传处理逻辑
  • Note3---初阶二叉树~~
  • ElasticSearch学习篇8_Lucene之数据存储(Stored Field、DocValue、BKD Tree)
  • ROS机器人入门
  • 30. 深度学习进阶 - 池化
  • 工业应用新典范,飞凌嵌入式FET-D9360-C核心板发布!
  • Webrtc 学习交流
  • 华为云之轻松搭建 Nginx 静态网站
  • 【pytorch】图像运行过程中,保证梯度情况下变换
  • 学习Java第70天,过滤器Filter简介
  • Ubuntu Desktop 22.04 设置 ssh 超时时间
  • 【微服务】Spring Aop原理深入解析
  • Spring Boot JSON中文文档
  • Flink系列之:State Time-To-Live (TTL)
  • 数据结构(Chapter Two -01)—线性表及顺序表
  • 【刷题笔记1】
  • 视频数据卡设计方案:120-基于PCIe的视频数据卡
  • Windows使用VNC Viewer远程桌面Ubuntu【内网穿透】