当前位置: 首页 > news >正文

RabbitMQ日常使用小结

一、使用场景

削峰、解耦、异步。
基于AMQP(高级消息队列协议)协议来统一数据交互,通过channel(网络信道)传递信息。erlang语言开发,并发量12000,支持持久化,稳定性好,集群不支持动态扩展。

RabbitMQ的基本概念

二、组成及工作流程

1.主要组成

Broker(消息代理): 消息队列服务进程,一个Broker以开多个虚拟主机(VirtualHost)。
VirtualHost(虚拟主机):虚拟主机,用于进行逻辑隔离,一个虚拟主机可以有若干个Exchange和Queue
Exchange(交换机):消息队列交换机,按一定的规则将消息路由转发到某个队列。
Queue:消息队列。

2.工作流程

RabbitMQ的工作流程

生产者发送消息流程:
1、和Broker建立TCP连接。
2、和Broker建立通道。
3、通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)。消费者接收消息流程:
1、和Broker建立TCP连接
2、和Broker建立通道
3、监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、接收到消息。
6、ack回复。

三、交换机Exchange(默认direct)

交换机,接受消息,根据路由键发送消息到绑定的队列(不具备消息存储的能力)。

1.交换机种类

Direct: 单播直连交换机,Exchange将消息完全匹配路由键(routing key)的方式绑定消息,获取信息时也要匹配Exchange和路由键。
直连交换机
fanout: 广播式交换机(Publish/subscribe),不管消息的路由键(routing key),Exchange都会将消息转发给所有绑定的Queue。
广播/扇形交换机
topic: 主题交换机,工作方式类似于组播,Exchange会将消息转发和路由键(routing key)符合匹配模式的所有队列,如: routing_key为user.stock的Message会转发给绑定匹配模式为 *.stock 、user.stock* 、 #.user.stock.#的队列。(*表是匹配一个任意词组,#表示匹配0个或多个词组)。
主题交换机
headers: 头交换机,无Binding Key;当然也无Routing Key。根据发送的消息内容中的headers属性进行匹配。

2.交换机属性

Name:交换机名称
Durability:持久化标志,表明此交换机是否是持久化的
Auto-delete:删除标志,表明当所有队列在完成使用此exchange时,是否删除
Arguments:依赖代理本身。

3.交换机状态

持久(durable)
暂存(transient)

4.消息确认机制(ACK)

自动ACK:消息一旦被接收,消费者自动发送ACK。
手动ACK:消息接收后,不会发送ACK,需要手动调用。

四、rabbitmq 客户端的使用

1.引入依赖

       <!-- rabbitmq 客户端依赖 --><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.8.0</version></dependency>

2.创建连接工具

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class MyRabbitMQUtils {public static Connection getConnel() throws Exception{//1 创建 ConnectionFactoryConnectionFactory factory = new ConnectionFactory() ;factory.setHost("127.0.0.1");//端口factory.setPort(5672);//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mqfactory.setVirtualHost("/");factory.setUsername("guest");factory.setPassword("guest");factory.setAutomaticRecoveryEnabled(true);factory.setNetworkRecoveryInterval(3000);Connection connection = factory.newConnection();//   Channel channel = connection.createChannel();return connection;}
}

3.生产者

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Producer {// 交换机名称private final static String EXCHANGE_NAME = "simple_exchange";// 队列名称private final static String QUEUE_NAME = "simple_queue";public static void main(String[]args) throws Exception{Connection produceConnection = MyRabbitMQUtils.getConnel();Channel produceChannel = produceConnection.createChannel();// 建立交换机(广播)produceChannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT,true);/** 1、queue 队列名称* 2、durable 是否持久化* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,如:可设置存活时间*/produceChannel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")* 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称* 3、props,消息的属性* 4、body,消息内容*/for(int i=0;i<10;i++){String message = "生产者发布的消息---!";message = message+i;produceChannel.basicPublish(EXCHANGE_NAME, QUEUE_NAME, null, message.getBytes());System.out.println(" Producer 发布'" + message + "'");}//关闭通道和连接(资源关闭最好用try-catch-finally语句处理)produceChannel.close();produceConnection.close();}
}

4.消费者

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;public class Comsumer {// 队列名称private final static String QUEUE_NAME = "simple_queue";public static void main(String[] argv) throws Exception {Connection comsumerConnection = MyRabbitMQUtils.getConnel();Channel comsumerChannel = comsumerConnection.createChannel();/** 参数明细* 1、queue 队列名称* 2、durable 是否持久化* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* 5、arguments 参数,可以设置一个队列的扩展参数,如:可设置存活时间*/comsumerChannel.queueDeclare(QUEUE_NAME, false, false, false, null);//实现消费方法DefaultConsumer consumer = new DefaultConsumer(comsumerChannel){/** 当接收到消息后此方法将被调用* @param consumerTag  消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume* @param envelope 信封,通过envelope* @param properties 消息属性* @param body 消息内容*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {//交换机String exchange = envelope.getExchange();//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收long deliveryTag = envelope.getDeliveryTag();// body 即消息体String msg = new String(body,"utf-8");System.out.println("Comsumer 获得: " + msg + "!");// 手动 ACKcomsumerChannel.basicAck(envelope.getDeliveryTag(),false);}};// 监听队列,第二个参数:是否自动进行消息确认。//参数:String queue, boolean autoAck, Consumer callback/*** 参数明细:* 1、queue 队列名称* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复。* 3、callback,消费方法,当消费者接收到消息要执行的方法。*/comsumerChannel.basicConsume(QUEUE_NAME, false, consumer);}
}

五、Spring中使用RabbitMQ

1.引入依赖

        <!-- AMQP 依赖 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.2.7.RELEASE</version></dependency><!--springboot测试依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><version>2.5.6</version></dependency>

2.更改配置

spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guest

3.把交换机、和队列加入IOC容器中

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// email队列public static final String QUEUE_EMAIL = "queue_email";// sms队列public static final String QUEUE_SMS = "queue_sms";// topics类型交换机public static final String EXCHANGE_NAME="topic.exchange";public static final String ROUTINGKEY_EMAIL="topic.#.email.#";public static final String ROUTINGKEY_SMS="topic.#.sms.#";//声明交换机@Bean(EXCHANGE_NAME)public Exchange exchange(){//durable(true) 持久化,mq重启之后交换机还在return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();}//声明email队列/***   new Queue(QUEUE_EMAIL,true,false,false)*   durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列*   auto-delete 表示消息队列没有在使用时将被自动删除 默认是false*   exclusive  表示该消息队列是否只在当前connection生效,默认是false*/@Bean(QUEUE_EMAIL)public Queue emailQueue(){return new Queue(QUEUE_EMAIL);}//声明sms队列@Bean(QUEUE_SMS)public Queue smsQueue(){return new Queue(QUEUE_SMS);}//ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey@Beanpublic Binding bindingEmail(@Qualifier(QUEUE_EMAIL) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();}//ROUTINGKEY_SMS队列绑定交换机,指定routingKey@Beanpublic Binding bindingSMS(@Qualifier(QUEUE_SMS) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();}}

4.模拟业务发送消息

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
public class Send {@AutowiredRabbitTemplate rabbitTemplate;@Testpublic void sendMsgByTopics(){/*** 参数:* 1、交换机名称* 2、routingKey* 3、消息内容*/for (int i=0;i<5;i++){String message = "恭喜您,注册成功!userid="+i;rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"topic.sms.email",message);System.out.println(" [x] Sent '" + message + "'");}}
}

5.消息的监听及处理

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class Receive {//监听邮件队列@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue_email", durable = "true"),exchange = @Exchange(value = "topic.exchange", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC),key = {"topic.#.email.#","email.*"}))public void rece_email(String msg){System.out.println(" [邮件服务] received : " + msg + "!");}//监听短信队列@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue_sms", durable = "true"),exchange = @Exchange(value = "topic.exchange", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC),key = {"topic.#.sms.#"}))public void rece_sms(String msg){System.out.println(" [短信服务] received : " + msg + "!");}}

http://www.lryc.cn/news/70139.html

相关文章:

  • ​​​​​​​博物馆文物馆藏环境空气质量无线监控系统方案
  • 测试理论----Bug的严重程度(Severity)和优先级(Priority)的分类
  • 斯坦福、Nautilus Chain等联合主办的 Hackathon 活动,现已接受报名
  • 00后卷王,把我们这些老油条卷的辞职信都写好了........
  • JavaEE(系列12) -- 常见锁策略
  • 前端nginx接口跨域
  • 【国产虚拟仪器】基于 ZYNQ 的电能质量系统高速数据采集系统设计
  • Java前缀和算法
  • pico 的两个双核相关函数的延时问题
  • Doxygen源码分析: QCString类依赖的qstr系列C函数浅析
  • 华为OD机试之一种字符串压缩表示的解压(Java源码)
  • Microsoft Project Online部署方案
  • 飞浆AI studio人工智能课程学习(3)-在具体场景下优化Prompt
  • 企业工程行业管理系统源码-专业的工程管理软件-提供一站式服务
  • Ehcache 整合Spring 使用页面、对象缓存
  • Spring Cloud中的服务路由与负载均衡
  • rails routes的使用
  • Linux基础内容(21)—— 进程消息队列和信号量
  • STM32实现基于RS485的简单的Modbus协议
  • springboot服务端接口公网远程调试 - 实现HTTP服务监听【端口映射】
  • zabbix监控之javasnmp自定义监控
  • Inertial Explorer处理pospac数据总结
  • tps和qps的区别是什么?怎么理解
  • 【Java系列】深入解析枚举类型
  • 网络原理(五):IP 协议
  • MySQL---空间索引、验证索引、索引特点、索引原理
  • 选择合适的 MQTT 云服务:一文了解 EMQX Cloud Serverless、Dedicated 与 BYOC 版本
  • uvc驱动ioctl分析下
  • 数据库可视化神器,你在用哪一款呢
  • CMD与DOS脚本编程【第三章】