当前位置: 首页 > news >正文

RabbitMQ学习笔记(消息发布确认,死信队列,集群,交换机,持久化,生产者、消费者)

 MQ(message queue):本质上是个队列,遵循FIFO原则,队列中存放的是message,是一种跨进程的通信机制,用于上下游传递消息。MQ提供“逻辑解耦+物理解耦”的消息通信服务。使用了MQ之后消息发送上游只需要依赖MQ,不需要依赖其它服务。

功能1:流量消峰

功能2:应用解耦

功能3:异步处理

MQ的分类:

1.Kafka

2.RabbitMQ

RabbitMQ概念:

四大核心概念:

交换机:

队列: 

 六大核心模式:

1.简单模式。2.工作模式。3.发布订阅模式。4.路由模式。5.主题模式。6.发布确认模式。

RabbitMQ工作原理:

Channer:信道,发消息的通道。

下载:

1. 官网地址:https://www.rabbitmq.com/download.html。参考的下载地址如下:Linux下安装RabbitMQ_rabbitmq下载_零碎de記憶的博客-CSDN博客

2.安装Erlang环境

yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz tcp_wrappers

3.下载Erlang,方式1:找到下面网址,在网址中下载rpm文件:

el/7/erlang-22.3.4.12-1.el7.x86_64.rpm - rabbitmq/erlang · packagecloud

或者直接输入下面指令下载rpm文件: 

wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-22.3.4.12-1.el7.x86_64.rpm/download.rpm

然后输入下面的命令安装已下载的安装包:

yum localinstall erlang-22.3.4.12-1.el7.x86_64.rpm

4.下载RabbitMQ,输入下面的下载

wget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.13-1.el7.noarch.rpm/download.rpm

 输入下面的命令进行本地安装:

yum localinstall rabbitmq-server-3.8.13-1.el7.noarch.rpm

5. 下载socat,检查是否已下载:

yum install socat -y

注意以下的操作都要在 /usr/local/software目录下查看: 

6.添加开机启动RabbitMQ服务:chkconfig rabbitmq-server on。启动rabbitmq /sbin/service rabbitmq-server start。

7.查看服务状态 /sbin/service rabbitmq-server status

8.停止服务 /sbin/service rabbitmq-server stop。重新查看服务状态。

10.开启web管理界面 sudo rabbitmq-plugins enable rabbitmq_management

11.查看防火墙状态:systemctl status firewalld。关闭防火墙:systemctl stop firewalld。关闭rabbit服务器输入:sudo rabbitmqctl stop。开启rabbit服务器输入:sudo rabbitmq-server -detached。

12.在浏览器中输入地址:Linux服务器ip地址:15672,可访问web管理界面。

13.用户名guest,密码默认,但无法登陆,无权限。

14.rabbitmqctl list_users查看用户。创建账号 rabbitmqctl add_user admin 123。设置用户角色为管理员 rabbitmqctl set_user_tags admin administrator。设置用户权限 rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"。

15.再经尝试可以重新登录:

创建Java开发环境

1.创建1个新项目,命名atguigu-rabbitmq,然后创建模块Module。GroupId可以填写:com.atguigu.rabbitmq,ArtifactId可以填rabbitmq-hello,选择quickstart:

导入依赖如下:

  <dependencies><!--rabbitmq依赖客户端--><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency><!--操作文件流的依赖--><dependency><groupId>commons-io</groupId><artifactId>commons-io</artifactId><version>2.6</version></dependency><dependency><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version> <!-- 根据你的需求指定版本号 --></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>8</source><target>8</target></configuration></plugin></plugins></build>

在下图中,P是生产者,C是消费者。中间的框是一个队列-RabbitMQ代表使用者保留的消息缓存区。

生产者代码

public class producer {//队列名称public static final String QUEUE_NAME = "hello";//发消息public static void main( String[] args ) throws IOException, TimeoutException {//第1步:创建一个连接工程ConnectionFactory factory = new ConnectionFactory();//第2步:输入工厂IP,用户名和密码——连接RabbitMQd队列factory.setHost("192.168.182.136");factory.setUsername("admin");factory.setPassword("123");//第3步:创建连接Connection connection = factory.newConnection();//第4步:获取信道Channel channel = connection.createChannel();//第5步:生成一个队列(队列名称,是否持久化,是否排他,自动删除,队列参数)//持久化:是否存储入磁盘,默认是将消息存储在内存中//排他:队列是否只供一个消费者消费,是否进行消息共享,true可以供多个消费者消费//自动删除:最后一个消费者断开连接后,该队列是否自动删除channel.queueDeclare(QUEUE_NAME,false,false,false,null);//第6步:发消息,(交换机,路由key本次是队列名,参数,发送的消息)String message = "hello world";channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("消息发送成功!!!");}
}

消费者代码

public class consumer {public static final String QUEUE_NAME = "hello";public static void main(String [] args) throws IOException, TimeoutException {//第1步:创建一个连接工程ConnectionFactory factory = new ConnectionFactory();//第2步:输入工厂IP,用户名和密码——连接RabbitMQd队列factory.setHost("192.168.182.136");factory.setUsername("admin");factory.setPassword("123");//第3步:创建连接Connection connection = factory.newConnection();//第4步:获取信道Channel channel = connection.createChannel();//第5步:声明,接收消息DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println(new String(message.getBody()));};//第6步:取消消息时的回调CancelCallback cancelCallback = consumerTag->{System.out.println("消息消费被中断");};//第7步:接收,(队列名,自动or手动,接收消息,回调)//1.消费哪个队列;2.消费成功后是否要自动应答true代表自动应答,false表示手动应答//3.消费者未成功消费的回调channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}
}

注意几点:1.确保rabbitmq处于开启的状态(开启方式见前面)2.最好让防火墙处于关闭的状态 3.最好通过方法左侧的开关按钮进行启动,确保启动是选择Current File。

 

工作队列:

工作队列:又称任务队列,主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

情况:生产者大量分发消息给队列,工作线程接收队列的消息,工作线程不止一个,三者关系时竞争关系,你一条我一条他一条,但要注意一个消息只能被处理一次,不能被处理多次。

重复性的代码可以被抽取成为工具类。

在java — com — atguigu — rabbitmq下创建utils包,工具类起名RabbitMqUtils,放入如下代码:

public class RabbitMqUtils {public static Channel getChannel() throws Exception{//第1步:创建一个连接工程ConnectionFactory factory = new ConnectionFactory();//第2步:输入工厂IP,用户名和密码——连接RabbitMQd队列factory.setHost("192.168.182.137");factory.setUsername("admin");factory.setPassword("123");//第3步:创建连接Connection connection = factory.newConnection();//第4步:获取信道Channel channel =  connection.createChannel();return channel;}
}

工作线程的更新后,worker01的代码如下:

public static final String QUEUE_NAME = "hello";public static void main(String [] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);//声明队列,没有会报错//消息接收DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println("接收到的消息:" + new String(message.getBody()));};CancelCallback cancelCallback = (consumerTag)->{System.out.println(consumerTag + "消息被取消消费接口回调逻辑");};System.out.println("c1等待接收消息......");channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);}

重复利用one包下的consumer类,将其更改为c2工作线程:

Task01作为生产者用于生产数据,与前面不同的是,Task01支持从IDEA控制台输入数据:

public class Task01 {public static final String QUEUE_NAME="hello";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.queueDeclare(QUEUE_NAME,false,false,false,null);//从控制台当中接收信息Scanner scanner = new Scanner(System.in); //扫描控制台输入内容while(scanner.hasNext()){String message = scanner.next();channel.basicPublish("",QUEUE_NAME,null,message.getBytes());System.out.println("发送消息完成..");}}
}

  

消息应答:

概念:

自动应答:

手动应答:

手动应答好处,建议不批量应答,选择false:

 消息自动重新入队:

原本正常传输,C1突然失去连接,检测到C1断开连接,于是会让消息重新入队,原本的消息交由C2进行处理。

实验思路:写1个生产者,2个消费者,当关闭掉其中1个工作线程,消息不丢失,还被另一个工作线程接收。消费在手动应答时不丢失、放回队列中重新消费。

消息手动应答(生产者):

public class Task2 {
public static final String task_queue_name = "ack_queue";
public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.queueDeclare(task_queue_name,false,false,false,null);Scanner scanner = new Scanner(System.in);while(scanner.hasNext()){String message = scanner.next();channel.basicPublish("",task_queue_name,null,message.getBytes("UTF-8"));System.out.println("生产者发出消息:"+message);}
}
}

消息手动应答(消费者):

public class Work03 {public static final String task_queue_name = "ack_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();System.out.println("C1等待接收消息处理时间较短");DeliverCallback deliverCallback = (consumerTag,message)->{SleepUtils.sleep(1);System.out.println("接收到的消息:"+new String(message.getBody(),"UTF-8"));//1.消息的标记tag 2.是否批量应答channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//采用手动应答boolean autoAck = false;channel.basicConsume(task_queue_name,autoAck,deliverCallback,(consumerTag->{System.out.println(consumerTag + "消费者取消消费接口回调逻辑");}));}
}
public class Work04 {public static final String task_queue_name = "ack_queue";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();System.out.println("C2等待接收消息处理时间较短");DeliverCallback deliverCallback = (consumerTag,message)->{SleepUtils.sleep(30);System.out.println("接收到的消息:"+new String(message.getBody(),"UTF-8"));//1.消息的标记tag 2.是否批量应答channel.basicAck(message.getEnvelope().getDeliveryTag(),false);};//采用手动应答boolean autoAck = false;channel.basicConsume(task_queue_name,autoAck,deliverCallback,(consumerTag->{System.out.println(consumerTag + "消费者取消消费接口回调逻辑");}));}
}

实现效果:在生产者输入AA BB CC DD EE等消息,消费者1接收速度快,会立即打印AA CC EE等消息,消费者2接收速度慢,会在一段时间后接收到BB,此时如果关闭消费者2,则消费者1输出DD,表明消费在手动应答时不丢失、放回队列中重新消费。

持久化:

如果报错,说明原本的队列就是不持久化,此时无法设定持久化,只能先将队列删除然后再重新设定。

控制队列持久化,需要修改生产者声明函数的第2个参数:

消息持久化:

队列持久化和消息持久化不同,队列是MQ里的一个组件,消息是生产者发送的消息。

如果要让消息持久化,在发消息的时候就要通知队列。

更改的是生产者的信道的basicPublish的第3个参数,添加MessageProperties.PERSISTENT_TEXT_PLAIN

不公平分发: 

消费者处理任务的速度不一致,为了不让速度快的消费者长时间处于空闲状态,因此采用不公平分发。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

预取值:

前面N条数据分别交给谁处理,如下图就是前7条数据中,2条给C1,5条给C2

发布确认原理:

1.设置要求队列必须持久化:就算服务器宕机,队列也不至于消失。

2.设置要求队列中的消息也必须持久化。

3. 发布确认,消息保存到磁盘上之后,队列要告知生产者。

Channel channel = connection.createChannel();
channel.confirmSelect();

public static void main(String[] args){
}

单个发布确认:

是一种同步确认发布的方式,发布消息-确认消息-发布消息...必须要确认后才能继续发布,类似于一手交钱一手交货,缺点是发布速度很慢。

1. 创建com/atguigu/rabbitmq/four文件夹下的ConfirmMessage

public static void publishMessageIndividually() throws Exception{Channel channel = RabbitMqUtils.getChannel(); //获取信道String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);channel.confirmSelect();//开启发布确认long begin = System.currentTimeMillis();for(int i=0;i<MESSAGE_COUNT;i++){String message = i +"";channel.basicPublish("",queueName,null,message.getBytes());boolean flag = channel.waitForConfirms();if(flag){System.out.println("消息发送成功");}}long end = System.currentTimeMillis();System.out.println("发布"+MESSAGE_COUNT+"个单独确认消息,耗时:"+(end-begin)+"ms");}

批量发布确认:

public static void publishMessageBatch() throws Exception{Channel channel = RabbitMqUtils.getChannel(); //获取信道String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);channel.confirmSelect();//开启发布确认long begin = System.currentTimeMillis();int batchSize = 100; //批量确认消息的大小//批量发送消息,批量发布确认for(int i=0;i<MESSAGE_COUNT;i++){String message = i+"";channel.basicPublish("",queueName,null,message.getBytes());//判断达到100条消息的时候,批量确认一次if(i%batchSize==0) channel.waitForConfirms();}long end = System.currentTimeMillis();System.out.println("发布"+MESSAGE_COUNT+"个批量确认消息,耗时:"+(end-begin)+"ms");
}

异步发布确认:

map序列,key是消息序号(deliveryTag是消息的标识,multiple是是否为批量),value是消息内容,将消息每一条都编号,broker会对消息进行应答,分为两种一种是确认应答,另一种是未确认应答。消息生产者不需要等待接收方的消息,只需要负责发送消息即可,消息是否应答最终会以异步的形式回传,也就是说确认的时间可以是稍后的。

addConfirmListener单参数的是只能监听成功的,多参数的是可以监听成功也可以监听失败的,都是接口需要自己来写。

public static void publishMessageAsync() throws Exception{Channel channel = RabbitMqUtils.getChannel(); //获取信道String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);channel.confirmSelect();//开启发布确认long begin = System.currentTimeMillis();//消息确认成功,回调函数ConfirmCallback ackCallback = (deliveryTag, multiple)->{System.out.println("确认的消息:"+deliveryTag);};//消息确认失败回调函数ConfirmCallback nackCallback = (deliveryTag, multiple)->{System.out.println("未确认的消息:"+deliveryTag);};//准备消息的监听器,监听哪些消息成功了,哪些消息失败了channel.addConfirmListener(ackCallback,nackCallback);//批量发送消息for(int i=0;i<MESSAGE_COUNT;i++){String message="消息"+i;channel.basicPublish("",queueName,null,message.getBytes());//发布确认}long end = System.currentTimeMillis();System.out.println("发布"+MESSAGE_COUNT+"个异步确认消息,耗时:"+(end-begin)+"ms");
}

 处理异步未确认消息:

最好的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用ConcurrentLinkedQueue这个队列在confirm callbacks与发布线程之间进行消息的传递。

public static void publishMessageAsync() throws Exception{Channel channel = RabbitMqUtils.getChannel(); //获取信道String queueName = UUID.randomUUID().toString();channel.queueDeclare(queueName,false,false,false,null);channel.confirmSelect();//开启发布确认/*线程安全有序的一个哈希表,适用于高并发的情况下1.轻松地将序号与消息进行关联2.轻松地批量删除条目只要给到序号3.支持高并发(多线程)*/ConcurrentSkipListMap<Long,String> outstandingConfirms = new ConcurrentSkipListMap<>();//消息确认成功,回调函数ConfirmCallback ackCallback = (deliveryTag, multiple)->{if(multiple){//2.删除掉已经确认的消息,剩下的就是未确认的消息ConcurrentNavigableMap<Long, String> confirmd =outstandingConfirms.headMap(deliveryTag);}else{outstandingConfirms.remove(deliveryTag);}System.out.println("确认的消息:"+deliveryTag);};//消息确认失败回调函数ConfirmCallback nackCallback = (deliveryTag, multiple)->{//3.打印一下未确认的消息都有哪些String message = outstandingConfirms.get(deliveryTag);System.out.println("未确认的消息是:"+message+"未确认的消息:"+deliveryTag);};//准备消息的监听器,监听哪些消息成功了,哪些消息失败了channel.addConfirmListener(ackCallback,nackCallback);long begin = System.currentTimeMillis();//批量发送消息for(int i=0;i<MESSAGE_COUNT;i++){String message="消息"+i;channel.basicPublish("",queueName,null,message.getBytes());//1.此处记录下所有发送的消息,消息的总和outstandingConfirms.put(channel.getNextPublishSeqNo(),message);}long end = System.currentTimeMillis();System.out.println("发布"+MESSAGE_COUNT+"个异步确认消息,耗时:"+(end-begin)+"ms");}
}

三种方式对比:

交换机:

一个消息可以被消费多次,需要通过交换机,仍旧遵循队列中的消息只能被消费一次。

 生产者生产的消息从不会直接发送到队列。生产者将消息发送到交换机。交换机负责接收来自生产者的消息,将消息推入队列。

Exchanges的类型:直接(direct),主题(topic),标题(headers),扇出(fanout)

消息能路由发送到队列中其实是由routingKey(bindingkey)绑定key指定的。

创建临时队列:

String queueName = channel.queueDedare().getQueue();

绑定:

根据Routing key来确定消息要发给哪个队列,如果Routing Key相同消息就可以发送给多个队列。

先添加一个队列queue1,再添加一个交换机exchange1,最后点击exchange1交换机,进入绑定菜单,然后输入绑定的队列是queue1,然后Routing key随便设置为123。

广播Fanout:

Fanout(扇出)是将接收到的所有消息广播到它知道的所有队列中。如果Routing Key相同则发送给队列以相同消息。

生产者:

public class EmitLog {public static final String EXCHANGE_NAME="logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME,"fanout");Scanner scanner = new Scanner(System.in);while(scanner.hasNext()){String message = scanner.next();channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes("UTF-8"));System.out.println("生产者发出消息"+message);}}
}

消费者:

public class ReceiveLogs01 {public static final String EXCHANGE_NAME="logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//声明一个交换机//声明一个队列临时队列,队列的名称是随机的,当消费者断开与队列的连接时候,队列就删除了String queueName = channel.queueDeclare().getQueue();//绑定交换机与队列channel.queueBind(queueName,EXCHANGE_NAME,"");System.out.println("等待接收消息,把接收到消息打印在屏幕上......");DeliverCallback deliverCallback = (consumerTag,message)->{System.out.println("ReceiveLogs01控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));};channel.basicConsume(queueName,true,deliverCallback,consumerTag->{});}
}

效果:实现广播的功能

Direct路由交换机:

消费者1:

public class ReceiveLogsDirect01 {public static final String EXCHANGE_NAME="direct_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);channel.queueDeclare("console",false,false,false,null);channel.queueBind("console",EXCHANGE_NAME,"info"); //队列名称,交换机名称,RoutingkeyDeliverCallback deliverCallback =(consumerTag,message)->{System.out.println("ReceiveLogsDirect01控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));};channel.basicConsume("console",true,deliverCallback,consumerTag->{});}
}

消费者2:

public class ReceiveLogsDirect02 {public static final String EXCHANGE_NAME="direct_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);channel.queueDeclare("disk",false,false,false,null);channel.queueBind("disk",EXCHANGE_NAME,"error"); //队列名称,交换机名称,RoutingkeyDeliverCallback deliverCallback =(consumerTag,message)->{System.out.println("ReceiveLogsDirect02控制台打印接收到的消息:"+new String(message.getBody(),"UTF-8"));};channel.basicConsume("disk",true,deliverCallback,consumerTag->{});}
}

生产者: 

public class DirectLogs {public static final String EXCHANGE_NAME="direct_logs";public static void main(String[] args) throws Exception {Channel channel = RabbitMqUtils.getChannel();Scanner scanner = new Scanner(System.in);while(scanner.hasNext()){String message = scanner.next();channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes("UTF-8"));System.out.println("生产者发出消息"+message);}}
}

效果:

如果【channel.basicPublish(EXCHANGE_NAME,"info",null,message.getBytes("UTF-8"));】的第2个参数填info就只会发送消息给消费者1,填写error就只会发送消息给消费者2。

Topics主题交换机:

发布(生产者)订阅(消费者)模式:

http://www.lryc.cn/news/180677.html

相关文章:

  • PyTorch - 模型训练损失 (Loss) NaN 问题的解决方案
  • 8、Nacos服务注册服务端源码分析(七)
  • MySQL使用Xtrabackup在线做主从
  • scala基础入门
  • 【Java-LangChain:面向开发者的提示工程-5】推断
  • 【C++】手撕vector(vector的模拟实现)
  • 智能指针那些事
  • Fiddler抓取手机https包的步骤
  • idea没有maven工具栏解决方法
  • levelDB引擎
  • IM同步服务
  • MySQL 运维常用脚本
  • ABC322刷题记
  • visual studio的安装及scanf报错的解决
  • React生命周期
  • SpringBoot整合RocketMQ笔记
  • 【【萌新的RiscV学习之在写代码之前对于关键路径的分析-11】】
  • A. Sequence with Digits
  • gitlab配置webhook限制提交注释
  • 蓝桥杯Python scratch C++选拔赛stema个人如何报名?
  • Cesium实现动态旋转四棱锥(2023.9.11)
  • 2023最新PS(photoshop)Win+Mac免费下载安装包及教程内置AI绘画-网盘下载
  • 【JAVA】为什么要使用封装以及如何封装
  • 18.示例程序(编码器接口测速)
  • 【超详细】Fastjson 1.2.24 命令执行漏洞复现-JNDI简单实现反弹shell(CVE-2017-18349)
  • 【牛客网】JZ39 数组中出现次数超过一半的数字
  • 【Mysql】Lock wait timeout exceeded; try restarting transaction
  • python生成中金所期权行权价
  • CentOS7.9 安装postgresql
  • qt线程介绍