RabbitMQ 4.1.1初体验-队列和交换机
接上一篇博文
启动RabbitMQ
注意:根据版本,要提前安装Erlang的环境
这里用的版本是 RabbitMQ 4.0.9
Erlang 28.0.1
- Windows
# 启动rabbitmq-server
D:\rabbitmq-server-windows-4.0.9\rabbitmq_server-4.0.9>sbin\rabbitmq-server.bat
- Linux
切换到安装目录的sbin目录下:
#启动
./rabbitmq-server -detached
说明:
-detached 选项为后台启动运行rabbitmq;不加该参数表示前台启动;detach单词是“分离”的意思
rabbitmq的运行日志存放在rabbitmq安装目录的var目录下;
现在的目录是:/usr/local/rabbitmq_server-4.0.9/var/log/rabbitmq
detached单词 :独立的;超然的;单独的
查看RabbitMQ的状态
切换到sbin目录下执行:
./rabbitmqctl -n rabbit status
# 或者
./rabbitmqctl status
说明:-n rabbit 是指定节点名称为rabbit,目前只有一个节点,节点名默认为rabbit。 一个节点就是RabbitMQ服务器。
注意:此处-n rabbit 也可以省略
停止RabbitMQ
# 切换到sbin目录下执行
./rabbitmqctl shutdown
RabbitMQ管理命令
rabbitmqctl 是一个管理命令,可以管理rabbitmq的很多操作。
rabbitmqctl help可以查看一下有哪些操作
查看具体子命令 可以使用 rabbitmqctl help 子命令名称
用户管理
RabbitMQ Web控制台 需要身份验证和授权才能访问, 在控制台能够管理消息。创建用户,赋予用户角色(tag), 角色代表对Web控制台的使用权限
用户管理包括增加用户,删除用户,查看用户列表,修改用户密码。
这些操作都是通过rabbitmqctl管理命令来实现完成。
# 查看帮助
./rabbitmqctl help add_user# 查看当前用户列表
./rabbitmqctl list_users# 新增一个用户
语法:rabbitmqctl add_user Username Password
示例: ./rabbitmqctl add_user admin 123456# 设置用户角色 说明:此处设置admin用户的角色为管理员角色
rabbitmqctl set_user_tags User Tag
示例:./rabbitmqctl set_user_tags admin administrator#查看用户权限
./rabbitmqctl list_permissions#设置用户权限
./rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"-p : 要设置虚拟主机
/ : 虚拟主机名称, / 根主机,默认有这个主机
后面的 “.*”是config , writer ,read 权限
# 说明:此操作是设置admin用户拥有操作虚拟主机“/”下的所有权限
虚拟主机(virtual host)?
RabbitMQ中的虚拟主机(virtual host)是一个逻辑隔离的概念,它能够允许多个用户、交换器、队列和绑定共存。每个虚拟主机在RabbitMQ中都有自己的权限系统,使得不同的团队或项目可以在同一个RabbitMQ服务器上工作,彼此之间不会有太大的干扰。
权限参考页面:https://www.rabbitmq.com/access-control.html,可以后面再看这个列表
通过web页面新建虚拟主机
新建虚拟主机my-virtual-host(tags:administrator),参见下图
建完后如下
Default Queue Type:
- classic:经典队列,RabbitMQ默认的队列类型是“classic”,也就是直接的、传统的队列。在这种类型的队列中,消息被按照入队的顺序处理,每个消息都会被分发给一个消费者。
- quorum:仲裁队列,是RabbitMQ从3.8.0版本,引入的一个新的队列类型,整个3.8.X版本,也都是在围绕仲裁队列进行完善和优化。仲裁队列相比Classic经典队列,在分布式环境下对消息的可靠性保障更高。官方文档中表示,未来会使用Quorum仲裁队列代替传统Classic队列。Quorum队列更适合于长期存在的队列,并且在对容错、数据安全方面有更严格要求的场景。相对于追求低延迟、非持久化等高级队列,Quorum队列提供了更可靠的数据复制机制,以满足对数据一致性和高可用性的要求。
- Stream流式队列: Stream队列是RabbitMQ自3.9.0版本开始引入的一种新的数据队列类型。这种队列类型的消息是持久化到磁盘并且具备分布式备份的,更适合于消费者多,读消息非常频繁的场景。
交换机 (Exchange)
-
交换器 (Exchange)类型
- 1、Fanout Exchange(扇形)
- 2、Direct Exchange(直连)
- 3、Topic Exchange(主题)
- 4、Headers Exchange(头部)
-
Fanout Exchange
Fanout 扇形的,散开的; 扇形交换机
投递到所有绑定的队列,不需要路由键,不需要进行路由键的匹配,相当于广播、群发;
-
关键代码
application.yml
#定义要使用的交换机和队列名称
exchange:name: exchange.fanoutqueueA: queue.a
queueB: queue.bspring:application:name: fanout-exchange#配置连接 rabbitmq服务器rabbitmq:#mq服务器的iphost: 127.0.0.1#访问端口号port: 5672#用户名称username: admin#密码password: 123456#虚拟主机virtual-host: my-virtual-host
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 创建Exchange , Queue等对象* 1.创建Exchange* 2.创建Queue* 3.将Exchange和Queue绑定*/
@Configuration
public class RabbitConfig {@Value("${exchange.name}")private String exchangeName;@Value("${queueA}")private String queueA;@Value("${queueB}")private String queueB;//1.创建Exchange( 构造方法和Builder)/*** @Bean: 将方法的返回值对象放入到spring容器。* 这个方法返回值必须是对象* 默认这个bean在容器中的名称是方法名称* name: 属性,指定bean的名称(id)*/@Beanpublic FanoutExchange fanoutExchange(){//参数: 交换机名称return new FanoutExchange(exchangeName);}//2.创建Queue(构造方法和Builder)@Beanpublic Queue queueA(){//构造方法return new Queue(queueA);}@Beanpublic Queue queueB(){//构造方法return new Queue(queueB);}//将Exchange和Queue绑定, 因为是fanout exchange无需routingkey参数@Beanpublic Binding bindingA(FanoutExchange fanoutExchange,Queue queueA){//绑定队列queueA和交换机fanoutExchangereturn BindingBuilder.bind(queueA).to(fanoutExchange);}@Beanpublic Binding bindingB(FanoutExchange fanoutExchange,Queue queueB){//绑定队列queueA和交换机fanoutExchangereturn BindingBuilder.bind(queueB).to(fanoutExchange);}}
import cn.hutool.core.date.DateUtil;
import jakarta.annotation.Resource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;@Service
public class ProductMessageService {@Value("${exchange.name}")private String exchangeName;/*** RabbitTemplate:发送消息的对象(RedisTemplate)*/@Resourceprivate RabbitTemplate rabbitTemplate;public void sendMessage() {//发送一个文本消息, mq中消息由: 消息体和消息属性两个部分组成String text = DateUtil.now()+ ":欢迎使用扇形交换机FanoutExchange";Message message = new Message(text.getBytes());//发送消息 1:交换机名称 2:routingKey(路由键) 3.消息对象//FanoutExchange不需要routingKeyrabbitTemplate.send(exchangeName,"",message);System.out.println("---------------->发送消息完成----------------");}
}
运行结果:
- MQ的消息包含两部分
- 消息体body
- 消息属性MessageProperties
Direct Exchange
根据路由键精确匹配(一模一样)进行路由消息队列;
实操如下
application.yml
#定义要使用的交换机和队列名称
exchange:name: exchange.directqueueA: queue.direct.a
queueB: queue.direct.bspring:application:name: fanout-exchange#配置连接 rabbitmq服务器rabbitmq:#mq服务器的iphost: 127.0.0.1#访问端口号port: 5672#用户名称username: admin#密码password: 123456#虚拟主机virtual-host: my-virtual-host
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 创建Exchange , Queue等对象* 1.创建Exchange* 2.创建Queue* 3.将Exchange和Queue绑定*/
@Configuration
public class RabbitConfig {@Value("${exchange.name}")private String exchangeName;@Value("${queueA}")private String queueA;@Value("${queueB}")private String queueB;//1.创建Exchange( 构造方法和Builder)/*** @Bean: 将方法的返回值对象放入到spring容器。* 这个方法返回值必须是对象* 默认这个bean在容器中的名称是方法名称* name: 属性,指定bean的名称(id)*/@Beanpublic DirectExchange directExchange(){//参数: 交换机名称 构建器模式(buidler)return ExchangeBuilder.directExchange(exchangeName).build();}//2.创建Queue(构造方法和Builder)@Beanpublic Queue queueA(){// buidler模式, durable:表示持久化队列return QueueBuilder.durable(queueA).build();}@Beanpublic Queue queueB(){// buidler模式, durable:表示持久化队列return QueueBuilder.durable(queueB).build();}//将Exchange和Queue绑定@Beanpublic Binding bindingA(DirectExchange directExchange,Queue queueA){//绑定队列queueA和交换机directExchange, 需要routingkey( info )return BindingBuilder.bind(queueA).to(directExchange).with("info");}@Beanpublic Binding bindingB(DirectExchange directExchange,Queue queueB){//绑定队列queueA和交换机fanoutExchangereturn BindingBuilder.bind(queueB).to(directExchange).with("error");}}
import cn.hutool.core.date.DateUtil;
import jakarta.annotation.Resource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;@Service
public class ProductMessageService {@Value("${exchange.name}")private String exchangeName;/*** RabbitTemplate:发送消息的对象(RedisTemplate)*/@Resourceprivate RabbitTemplate rabbitTemplate;public void sendMessage() {//发送一个文本消息, mq中消息由: 消息体和消息属性两个部分组成String text = DateUtil.now()+ ":欢迎使用直连交换机DirectExchange====";Message message = new Message(text.getBytes());//发送消息 1:交换机名称 2:routingKey(路由键) 3.消息对象//DirectExchange需要routingKeyrabbitTemplate.send(exchangeName,"info",message);rabbitTemplate.send(exchangeName,"error",message);System.out.println("---------------->发送消息完成----------------");}
}
运行结果
- Topic Exchange
通配符匹配,相当于模糊匹配;- # 匹配多个单词,用来表示任意数量(零个或多个)单词
- * 匹配一个单词(必须有一个,而且只有一个),用.隔开的为一个单词:
beijing.# == beijing.queue.abc, beijing.queue.xyz.xxx,beijing.queue.a.b
beijing.* == beijing.queue, beijing.xyz
发送时指定的路由键:lazy.orange.rabbit
示例代码
application.yml
#定义要使用的交换机和队列名称
exchange:name: exchange.topic
queueA: queue.topic.a
queueB: queue.topic.b
spring:application:name: topic-exchange#配置连接 rabbitmq服务器rabbitmq:#mq服务器的iphost: 127.0.0.1#访问端口号port: 5672#用户名称username: admin#密码password: 123456#虚拟主机virtual-host: my-virtual-host
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 创建Exchange , Queue等对象* 1.创建Exchange* 2.创建Queue* 3.将Exchange和Queue绑定*/
@Configuration
public class RabbitConfig {@Value("${exchange.name}")private String exchangeName;@Value("${queueA}")private String queueA;@Value("${queueB}")private String queueB;//1.创建Exchange( 构造方法和Builder)/*** @Bean: 将方法的返回值对象放入到spring容器。* 这个方法返回值必须是对象* 默认这个bean在容器中的名称是方法名称* name: 属性,指定bean的名称(id)*/@Beanpublic TopicExchange topicExchange(){//参数: 交换机名称 构建器模式(buidler)return ExchangeBuilder.topicExchange(exchangeName).build();}//2.创建Queue(构造方法和Builder)@Beanpublic Queue queueA(){// buidler模式, durable:表示持久化队列return QueueBuilder.durable(queueA).build();}@Beanpublic Queue queueB(){// buidler模式, durable:表示持久化队列return QueueBuilder.durable(queueB).build();}//将Exchange和Queue绑定, 因为是fanout exchange无需routingkey参数@Beanpublic Binding bindingA(TopicExchange topicExchange,Queue queueA){//绑定队列queueA和交换机TopicExchange, 需要routingkey( info )return BindingBuilder.bind(queueA).to(topicExchange).with("*.orange.*");}@Beanpublic Binding bindingB(TopicExchange topicExchange,Queue queueB){//绑定队列queueA和交换机fanoutExchangereturn BindingBuilder.bind(queueB).to(topicExchange).with("lazy.#");}}
import cn.hutool.core.date.DateUtil;
import jakarta.annotation.Resource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;@Service
public class ProductMessageService {@Value("${exchange.name}")private String exchangeName;/*** RabbitTemplate:发送消息的对象(RedisTemplate)*/@Resourceprivate RabbitTemplate rabbitTemplate;public void sendMessage() {//发送一个文本消息, mq中消息由: 消息体和消息属性两个部分组成String text = DateUtil.now()+ ":欢迎使用主题交换机TopicExchange====";Message message = new Message(text.getBytes());//发送消息 1:交换机名称 2:routingKey(路由键) 3.消息对象//DirectExchange需要routingKeyrabbitTemplate.send(exchangeName,"lazy.orange.rabbit",message);rabbitTemplate.send(exchangeName,"lazy.orange.rabbit",message);System.out.println("---------------->发送消息完成----------------");}
}
运行结果
- Headers Exchange(用的比较少)
基于每条消息属性中的headers属性进行匹配;
示例代码
#定义要使用的交换机和队列名称
exchange:name: exchange.headerqueueA: queue.header.a
queueB: queue.header.bspring:application:name: header-exchange#配置连接 rabbitmq服务器rabbitmq:#mq服务器的iphost: 127.0.0.1#访问端口号port: 5672#用户名称username: admin#密码password: 123456#虚拟主机virtual-host: my-virtual-host
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** 创建Exchange , Queue等对象* 1.创建Exchange* 2.创建Queue* 3.将Exchange和Queue绑定*/
@Configuration
public class RabbitConfig {@Value("${exchange.name}")private String exchangeName;@Value("${queueA}")private String queueA;@Value("${queueB}")private String queueB;//1.创建Exchange( 构造方法和Builder)/*** @Bean: 将方法的返回值对象放入到spring容器。* 这个方法返回值必须是对象* 默认这个bean在容器中的名称是方法名称* name: 属性,指定bean的名称(id)*/@Beanpublic HeadersExchange headersExchange(){//参数: 交换机名称 构建器模式(buidler)return ExchangeBuilder.headersExchange(exchangeName).build();}//2.创建Queue(构造方法和Builder)@Beanpublic Queue queueA(){// buidler模式, durable:表示持久化队列return QueueBuilder.durable(queueA).build();}@Beanpublic Queue queueB(){// buidler模式, durable:表示持久化队列return QueueBuilder.durable(queueB).build();}//将Exchange和Queue绑定, 因为是fanout exchange无需routingkey参数@Beanpublic Binding bindingA(HeadersExchange headersExchange,Queue queueA){//匹配条件Map<String, Object> headerValues = new HashMap<>();headerValues.put("type",1);headerValues.put("status","m");//绑定队列queueA和交换机HeadersExchangereturn BindingBuilder.bind(queueA).to(headersExchange).whereAll(headerValues).match();}@Beanpublic Binding bindingB(HeadersExchange headersExchange,Queue queueB){//匹配条件Map<String, Object> headerValues = new HashMap<>();headerValues.put("type",2);headerValues.put("status","f");//绑定队列queueA和交换机HeadersExchangereturn BindingBuilder.bind(queueB).to(headersExchange).whereAll(headerValues).match();}}
import cn.hutool.core.date.DateUtil;
import jakarta.annotation.Resource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;import java.util.HashMap;
import java.util.Map;@Service
public class ProductMessageService {@Value("${exchange.name}")private String exchangeName;/*** RabbitTemplate:发送消息的对象(RedisTemplate)*/@Resourceprivate RabbitTemplate rabbitTemplate;public void sendMessage() {//指定匹配条件,需要使用 Message的属性, 在header中增加条件Map<String, Object> headerValues = new HashMap<>();headerValues.put("type",1);headerValues.put("status","m");MessageProperties prop = new MessageProperties();prop.setHeaders(headerValues);//使用 MessageBuilder创建消息//发送一个文本消息, mq中消息由: 消息体和消息属性两个部分组成String text = DateUtil.now()+ ":欢迎使用Header交换机HeaderExchange====";Message message = MessageBuilder.withBody(text.getBytes()).andProperties(prop).build();//发送消息 1:交换机名称 2:routingKey(路由键) 3.消息对象//HeaderExchange 不需要routingKeyrabbitTemplate.send(exchangeName,"",message);System.out.println("---------------->发送消息完成----------------");}
}
运行结果
默认交换机
- 创建虚拟主机的时候就会创建默认交换机,默认交换机的名字是空字符串,默认交换机是直连交换机(Direct)。
- 每新建一个队列,都会自动和默认交换机绑定,绑定的路由key是该队列的名字
- 向默认交换机发送消息,交换机名指定为空字符串,路由key为队列的名字