当前位置: 首页 > news >正文

RabbitMQ 4.1.1初体验-队列和交换机

接上一篇博文
在这里插入图片描述

启动RabbitMQ

注意:根据版本,要提前安装Erlang的环境
这里用的版本是 RabbitMQ 4.0.9 Erlang 28.0.1

  • Windows
# 启动rabbitmq-server
D:\rabbitmq-server-windows-4.0.9\rabbitmq_server-4.0.9>sbin\rabbitmq-server.bat
  • Linux
    切换到安装目录的sbin目录下:
#启动
./rabbitmq-server  -detached

说明:
-detached 选项为后台启动运行rabbitmq;不加该参数表示前台启动;detach单词是“分离”的意思
rabbitmq的运行日志存放在rabbitmq安装目录的var目录下;
现在的目录是:/usr/local/rabbitmq_server-4.0.9/var/log/rabbitmq
detached单词 :独立的;超然的;单独的

查看RabbitMQ的状态

切换到sbin目录下执行:

./rabbitmqctl -n rabbit status 
# 或者
./rabbitmqctl  status

说明:-n rabbit 是指定节点名称为rabbit,目前只有一个节点,节点名默认为rabbit。 一个节点就是RabbitMQ服务器。
注意:此处-n rabbit 也可以省略

停止RabbitMQ

# 切换到sbin目录下执行
./rabbitmqctl shutdown

RabbitMQ管理命令

rabbitmqctl 是一个管理命令,可以管理rabbitmq的很多操作。
rabbitmqctl help可以查看一下有哪些操作
查看具体子命令 可以使用 rabbitmqctl help 子命令名称

用户管理

RabbitMQ Web控制台 需要身份验证和授权才能访问, 在控制台能够管理消息。创建用户,赋予用户角色(tag), 角色代表对Web控制台的使用权限
用户管理包括增加用户,删除用户,查看用户列表,修改用户密码。
这些操作都是通过rabbitmqctl管理命令来实现完成。

# 查看帮助
./rabbitmqctl help add_user# 查看当前用户列表
./rabbitmqctl list_users# 新增一个用户
语法:rabbitmqctl add_user Username  Password
示例: ./rabbitmqctl add_user admin 123456# 设置用户角色 说明:此处设置admin用户的角色为管理员角色
rabbitmqctl set_user_tags  User  Tag
示例:./rabbitmqctl set_user_tags admin administrator#查看用户权限
./rabbitmqctl list_permissions#设置用户权限
./rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"-p : 要设置虚拟主机
/  : 虚拟主机名称, / 根主机,默认有这个主机
后面的 “.*”是config , writer ,read 权限
# 说明:此操作是设置admin用户拥有操作虚拟主机“/”下的所有权限

在这里插入图片描述

虚拟主机(virtual host)?

RabbitMQ中的虚拟主机(virtual host)是一个逻辑隔离的概念,它能够允许多个用户、交换器、队列和绑定共存。每个虚拟主机在RabbitMQ中都有自己的权限系统,使得不同的团队或项目可以在同一个RabbitMQ服务器上工作,彼此之间不会有太大的干扰。

权限参考页面:https://www.rabbitmq.com/access-control.html,可以后面再看这个列表
在这里插入图片描述

通过web页面新建虚拟主机

新建虚拟主机my-virtual-host(tags:administrator),参见下图
在这里插入图片描述
建完后如下
在这里插入图片描述

Default Queue Type:

  • classic:经典队列,RabbitMQ默认的队列类型是“classic”,也就是直接的、传统的队列。在这种类型的队列中,消息被按照入队的顺序处理,每个消息都会被分发给一个消费者。
  • quorum:仲裁队列,是RabbitMQ从3.8.0版本,引入的一个新的队列类型,整个3.8.X版本,也都是在围绕仲裁队列进行完善和优化。仲裁队列相比Classic经典队列,在分布式环境下对消息的可靠性保障更高。官方文档中表示,未来会使用Quorum仲裁队列代替传统Classic队列。Quorum队列更适合于长期存在的队列,并且在对容错、数据安全方面有更严格要求的场景。相对于追求低延迟、非持久化等高级队列,Quorum队列提供了更可靠的数据复制机制,以满足对数据一致性和高可用性的要求。
  • Stream流式队列: Stream队列是RabbitMQ自3.9.0版本开始引入的一种新的数据队列类型。这种队列类型的消息是持久化到磁盘并且具备分布式备份的,更适合于消费者多,读消息非常频繁的场景。

交换机 (Exchange)

  • 交换器 (Exchange)类型

    • 1、Fanout Exchange(扇形)
    • 2、Direct Exchange(直连)
    • 3、Topic Exchange(主题)
    • 4、Headers Exchange(头部)
  • Fanout Exchange
    Fanout 扇形的,散开的; 扇形交换机
    投递到所有绑定的队列,不需要路由键,不需要进行路由键的匹配,相当于广播、群发;
    在这里插入图片描述

  • 关键代码
    application.yml

#定义要使用的交换机和队列名称
exchange:name: exchange.fanoutqueueA: queue.a
queueB: queue.bspring:application:name: fanout-exchange#配置连接 rabbitmq服务器rabbitmq:#mq服务器的iphost: 127.0.0.1#访问端口号port: 5672#用户名称username: admin#密码password: 123456#虚拟主机virtual-host: my-virtual-host

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 创建Exchange , Queue等对象* 1.创建Exchange* 2.创建Queue* 3.将Exchange和Queue绑定*/
@Configuration
public class RabbitConfig {@Value("${exchange.name}")private String exchangeName;@Value("${queueA}")private String queueA;@Value("${queueB}")private String queueB;//1.创建Exchange( 构造方法和Builder)/*** @Bean: 将方法的返回值对象放入到spring容器。*        这个方法返回值必须是对象*        默认这个bean在容器中的名称是方法名称*   name: 属性,指定bean的名称(id)*/@Beanpublic FanoutExchange fanoutExchange(){//参数: 交换机名称return new FanoutExchange(exchangeName);}//2.创建Queue(构造方法和Builder)@Beanpublic Queue queueA(){//构造方法return new Queue(queueA);}@Beanpublic Queue queueB(){//构造方法return new Queue(queueB);}//将Exchange和Queue绑定, 因为是fanout exchange无需routingkey参数@Beanpublic Binding bindingA(FanoutExchange fanoutExchange,Queue queueA){//绑定队列queueA和交换机fanoutExchangereturn BindingBuilder.bind(queueA).to(fanoutExchange);}@Beanpublic Binding bindingB(FanoutExchange fanoutExchange,Queue queueB){//绑定队列queueA和交换机fanoutExchangereturn BindingBuilder.bind(queueB).to(fanoutExchange);}}
import cn.hutool.core.date.DateUtil;
import jakarta.annotation.Resource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;@Service
public class ProductMessageService {@Value("${exchange.name}")private String exchangeName;/*** RabbitTemplate:发送消息的对象(RedisTemplate)*/@Resourceprivate RabbitTemplate rabbitTemplate;public void sendMessage() {//发送一个文本消息, mq中消息由: 消息体和消息属性两个部分组成String text = DateUtil.now()+ ":欢迎使用扇形交换机FanoutExchange";Message message = new Message(text.getBytes());//发送消息 1:交换机名称 2:routingKey(路由键) 3.消息对象//FanoutExchange不需要routingKeyrabbitTemplate.send(exchangeName,"",message);System.out.println("---------------->发送消息完成----------------");}
}

运行结果:
在这里插入图片描述

在这里插入图片描述

  • MQ的消息包含两部分
    • 消息体body
    • 消息属性MessageProperties

Direct Exchange
根据路由键精确匹配(一模一样)进行路由消息队列;
在这里插入图片描述

实操如下
application.yml

#定义要使用的交换机和队列名称
exchange:name: exchange.directqueueA: queue.direct.a
queueB: queue.direct.bspring:application:name: fanout-exchange#配置连接 rabbitmq服务器rabbitmq:#mq服务器的iphost: 127.0.0.1#访问端口号port: 5672#用户名称username: admin#密码password: 123456#虚拟主机virtual-host: my-virtual-host
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 创建Exchange , Queue等对象* 1.创建Exchange* 2.创建Queue* 3.将Exchange和Queue绑定*/
@Configuration
public class RabbitConfig {@Value("${exchange.name}")private String exchangeName;@Value("${queueA}")private String queueA;@Value("${queueB}")private String queueB;//1.创建Exchange( 构造方法和Builder)/*** @Bean: 将方法的返回值对象放入到spring容器。*        这个方法返回值必须是对象*        默认这个bean在容器中的名称是方法名称*   name: 属性,指定bean的名称(id)*/@Beanpublic DirectExchange directExchange(){//参数: 交换机名称   构建器模式(buidler)return ExchangeBuilder.directExchange(exchangeName).build();}//2.创建Queue(构造方法和Builder)@Beanpublic Queue queueA(){// buidler模式, durable:表示持久化队列return QueueBuilder.durable(queueA).build();}@Beanpublic Queue queueB(){// buidler模式, durable:表示持久化队列return QueueBuilder.durable(queueB).build();}//将Exchange和Queue绑定@Beanpublic Binding bindingA(DirectExchange directExchange,Queue queueA){//绑定队列queueA和交换机directExchange, 需要routingkey( info )return BindingBuilder.bind(queueA).to(directExchange).with("info");}@Beanpublic Binding bindingB(DirectExchange directExchange,Queue queueB){//绑定队列queueA和交换机fanoutExchangereturn BindingBuilder.bind(queueB).to(directExchange).with("error");}}
import cn.hutool.core.date.DateUtil;
import jakarta.annotation.Resource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;@Service
public class ProductMessageService {@Value("${exchange.name}")private String exchangeName;/*** RabbitTemplate:发送消息的对象(RedisTemplate)*/@Resourceprivate RabbitTemplate rabbitTemplate;public void sendMessage() {//发送一个文本消息, mq中消息由: 消息体和消息属性两个部分组成String text = DateUtil.now()+ ":欢迎使用直连交换机DirectExchange====";Message message = new Message(text.getBytes());//发送消息 1:交换机名称 2:routingKey(路由键) 3.消息对象//DirectExchange需要routingKeyrabbitTemplate.send(exchangeName,"info",message);rabbitTemplate.send(exchangeName,"error",message);System.out.println("---------------->发送消息完成----------------");}
}

运行结果
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

  • Topic Exchange
    通配符匹配,相当于模糊匹配;
    • # 匹配多个单词,用来表示任意数量(零个或多个)单词
    • * 匹配一个单词(必须有一个,而且只有一个),用.隔开的为一个单词:
      beijing.# == beijing.queue.abc, beijing.queue.xyz.xxx,beijing.queue.a.b
      beijing.* == beijing.queue, beijing.xyz

发送时指定的路由键:lazy.orange.rabbit
在这里插入图片描述
示例代码
application.yml

#定义要使用的交换机和队列名称
exchange:name: exchange.topic
queueA: queue.topic.a
queueB: queue.topic.b
spring:application:name: topic-exchange#配置连接 rabbitmq服务器rabbitmq:#mq服务器的iphost: 127.0.0.1#访问端口号port: 5672#用户名称username: admin#密码password: 123456#虚拟主机virtual-host: my-virtual-host
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 创建Exchange , Queue等对象* 1.创建Exchange* 2.创建Queue* 3.将Exchange和Queue绑定*/
@Configuration
public class RabbitConfig {@Value("${exchange.name}")private String exchangeName;@Value("${queueA}")private String queueA;@Value("${queueB}")private String queueB;//1.创建Exchange( 构造方法和Builder)/*** @Bean: 将方法的返回值对象放入到spring容器。*        这个方法返回值必须是对象*        默认这个bean在容器中的名称是方法名称*   name: 属性,指定bean的名称(id)*/@Beanpublic TopicExchange topicExchange(){//参数: 交换机名称   构建器模式(buidler)return ExchangeBuilder.topicExchange(exchangeName).build();}//2.创建Queue(构造方法和Builder)@Beanpublic Queue queueA(){// buidler模式, durable:表示持久化队列return QueueBuilder.durable(queueA).build();}@Beanpublic Queue queueB(){// buidler模式, durable:表示持久化队列return QueueBuilder.durable(queueB).build();}//将Exchange和Queue绑定, 因为是fanout exchange无需routingkey参数@Beanpublic Binding bindingA(TopicExchange topicExchange,Queue queueA){//绑定队列queueA和交换机TopicExchange, 需要routingkey( info )return BindingBuilder.bind(queueA).to(topicExchange).with("*.orange.*");}@Beanpublic Binding bindingB(TopicExchange topicExchange,Queue queueB){//绑定队列queueA和交换机fanoutExchangereturn BindingBuilder.bind(queueB).to(topicExchange).with("lazy.#");}}
import cn.hutool.core.date.DateUtil;
import jakarta.annotation.Resource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;@Service
public class ProductMessageService {@Value("${exchange.name}")private String exchangeName;/*** RabbitTemplate:发送消息的对象(RedisTemplate)*/@Resourceprivate RabbitTemplate rabbitTemplate;public void sendMessage() {//发送一个文本消息, mq中消息由: 消息体和消息属性两个部分组成String text = DateUtil.now()+ ":欢迎使用主题交换机TopicExchange====";Message message = new Message(text.getBytes());//发送消息 1:交换机名称 2:routingKey(路由键) 3.消息对象//DirectExchange需要routingKeyrabbitTemplate.send(exchangeName,"lazy.orange.rabbit",message);rabbitTemplate.send(exchangeName,"lazy.orange.rabbit",message);System.out.println("---------------->发送消息完成----------------");}
}

运行结果
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

  • Headers Exchange(用的比较少)
    基于每条消息属性中的headers属性进行匹配;
    在这里插入图片描述
    示例代码
#定义要使用的交换机和队列名称
exchange:name: exchange.headerqueueA: queue.header.a
queueB: queue.header.bspring:application:name: header-exchange#配置连接 rabbitmq服务器rabbitmq:#mq服务器的iphost: 127.0.0.1#访问端口号port: 5672#用户名称username: admin#密码password: 123456#虚拟主机virtual-host: my-virtual-host
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** 创建Exchange , Queue等对象* 1.创建Exchange* 2.创建Queue* 3.将Exchange和Queue绑定*/
@Configuration
public class RabbitConfig {@Value("${exchange.name}")private String exchangeName;@Value("${queueA}")private String queueA;@Value("${queueB}")private String queueB;//1.创建Exchange( 构造方法和Builder)/*** @Bean: 将方法的返回值对象放入到spring容器。*        这个方法返回值必须是对象*        默认这个bean在容器中的名称是方法名称*   name: 属性,指定bean的名称(id)*/@Beanpublic HeadersExchange headersExchange(){//参数: 交换机名称   构建器模式(buidler)return ExchangeBuilder.headersExchange(exchangeName).build();}//2.创建Queue(构造方法和Builder)@Beanpublic Queue queueA(){// buidler模式, durable:表示持久化队列return QueueBuilder.durable(queueA).build();}@Beanpublic Queue queueB(){// buidler模式, durable:表示持久化队列return QueueBuilder.durable(queueB).build();}//将Exchange和Queue绑定, 因为是fanout exchange无需routingkey参数@Beanpublic Binding bindingA(HeadersExchange headersExchange,Queue queueA){//匹配条件Map<String, Object> headerValues = new HashMap<>();headerValues.put("type",1);headerValues.put("status","m");//绑定队列queueA和交换机HeadersExchangereturn BindingBuilder.bind(queueA).to(headersExchange).whereAll(headerValues).match();}@Beanpublic Binding bindingB(HeadersExchange headersExchange,Queue queueB){//匹配条件Map<String, Object> headerValues = new HashMap<>();headerValues.put("type",2);headerValues.put("status","f");//绑定队列queueA和交换机HeadersExchangereturn BindingBuilder.bind(queueB).to(headersExchange).whereAll(headerValues).match();}}
import cn.hutool.core.date.DateUtil;
import jakarta.annotation.Resource;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;import java.util.HashMap;
import java.util.Map;@Service
public class ProductMessageService {@Value("${exchange.name}")private String exchangeName;/*** RabbitTemplate:发送消息的对象(RedisTemplate)*/@Resourceprivate RabbitTemplate rabbitTemplate;public void sendMessage() {//指定匹配条件,需要使用 Message的属性, 在header中增加条件Map<String, Object> headerValues = new HashMap<>();headerValues.put("type",1);headerValues.put("status","m");MessageProperties prop  = new MessageProperties();prop.setHeaders(headerValues);//使用 MessageBuilder创建消息//发送一个文本消息, mq中消息由: 消息体和消息属性两个部分组成String text = DateUtil.now()+ ":欢迎使用Header交换机HeaderExchange====";Message message = MessageBuilder.withBody(text.getBytes()).andProperties(prop).build();//发送消息 1:交换机名称 2:routingKey(路由键) 3.消息对象//HeaderExchange 不需要routingKeyrabbitTemplate.send(exchangeName,"",message);System.out.println("---------------->发送消息完成----------------");}
}

运行结果
在这里插入图片描述
在这里插入图片描述

默认交换机

  • 创建虚拟主机的时候就会创建默认交换机,默认交换机的名字是空字符串,默认交换机是直连交换机(Direct)。
  • 每新建一个队列,都会自动和默认交换机绑定,绑定的路由key是该队列的名字
  • 向默认交换机发送消息,交换机名指定为空字符串,路由key为队列的名字
http://www.lryc.cn/news/581300.html

相关文章:

  • 快速掌握Python编程基础
  • 结构型智能科技的关键可行性——信息型智能向结构型智能的转变(修改提纲)
  • 小架构step系列05:Springboot三种运行模式
  • 黑马点评系列问题之基础篇p7 06初识redis无法在虚拟机查到图形化界面存进去的键
  • 运算方法和运算器补充
  • TCP协议概念和特性
  • AI Agent与Agentic AI原理与应用(下) - 主流Agent平台、框架与项目技术拆解
  • 编程中的英语
  • cocos 打包安卓
  • Rust与PyTorch实战:精选示例
  • 机器学习--实践与分析
  • python优先队列使用
  • NAT、代理服务、内网穿透
  • Ubuntu 22.04 修改默认 Python 版本为 Python3 笔记
  • C#使用开源框架NetronLight绘制流程图
  • C++------模板初阶
  • JS 网页全自动翻译v3.17发布,全面接入 GiteeAI 大模型翻译及自动部署
  • 2025年的前后端一体化CMS框架优选方案
  • 【大模型入门】访问GPT的API
  • 【Halcon】WPF 自定义Halcon显示控件完整流程与 `OnApplyTemplate` 未触发的根本原因解析!
  • day 60 python打卡
  • ffplay6 播放器关键技术点分析 1/2
  • Windows内核并发优化
  • rk3128 emmc显示剩余容量为0
  • 深度学习5(深层神经网络 + 参数和超参数)
  • 力扣网编程55题:跳跃游戏之逆向思维
  • 前端相关性能优化笔记
  • Python数据容器-list和tuple
  • 四、jenkins自动构建和设置邮箱
  • PHP语法基础篇(九):正则表达式