当前位置: 首页 > news >正文

微服务技术 RabbitMQ SpringAMQP P61-P76

B站学习视频icon-default.png?t=N7T8https://www.bilibili.com/video/BV1LQ4y127n4?p=61&vd_source=8665d6da33d4e2277ca40f03210fe53a

文档资料:

链接:https://pan.baidu.com/s/1P_Ag1BYiPaF52EI19A0YRw?pwd=d03r 
提取码:d03r

一 初始MQ

1. 同步通讯

2. 异步通讯

3. MQ常见架构

二 RabbitMQ 快速入门

1. RabbitMQ概述和安装 --单机部署

我们在Centos7虚拟机中使用Docker来安装

1.1.下载镜像

方式一:在线拉取

docker pull rabbitmq:3-management

方式二:从本地加载

在课前资料已经提供了镜像包:

上传到虚拟机中后,使用命令加载镜像即可:

docker load -i mq.tar

1.2.安装MQ

执行下面的命令来运行MQ容器:

docker run \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=123321 \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management

1.2. RabbitMQ概述和安装 --集群部署

2.常见消息模型

3.快速入门 --Basic Queue 简单队列模型

三 SpringAMQP

1. Basic Queue 简单队列模型

  • 结构

1.1 消息发送 

  • 16572是UI端口  5672消息端口

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class PublisherTest {@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("127.0.0.1");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("admin");factory.setPassword("123");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("发送消息成功:【" + message + "】");// 5.关闭通道和连接channel.close();connection.close();}
}

1.2 消息接收

  • 16572是UI端口  5672消息端口

import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.建立连接ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("127.0.0.1");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("admin");factory.setPassword("123");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.处理消息String message = new String(body);System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息。。。。");}
}

2. SpringAMQP入门案例

2.1 介绍

2.2 案例 -- 发送消息

spring:rabbitmq:host: 127.0.0.1 # rabbitMQ的ip地址port: 5672 # 端口username: adminpassword: 123virtual-host: /
@RunWith(SpringRunner.class) // @Autowired 可以注入
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue() {String queueName = "simple.queue";String message = "hello, spring amqp!";rabbitTemplate.convertAndSend(queueName, message);}
}

2.3 案例 --接收消息

spring:rabbitmq:host: 127.0.0.1 # rabbitMQ的ip地址port: 5672 # 端口username: adminpassword: 123virtual-host: /listener:simple:prefetch: 1
@Component  //声明成 bean
public class SpringRabbitListener {// @RabbitListener(queues = "simple.queue")// public void listenSimpleQueue(String msg) {//     System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");// }@RabbitListener(queues = "simple.queue")public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);}
}

3. Work Queue 工作队列模型

3.1 介绍

  • 接收信息
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalTime;
import java.util.Map;@Component  //声明成 bean
public class SpringRabbitListener {/*  @RabbitListener(queues = "simple.queue")public void listenSimpleQueue(String msg) {System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");}
*/@RabbitListener(queues = "simple.queue")public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20);}@RabbitListener(queues = "simple.queue")public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200);}
}
  • 发送信息

@RunWith(SpringRunner.class) // @Autowired 可以注入
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue() {String queueName = "simple.queue";String message = "hello, spring amqp!";rabbitTemplate.convertAndSend(queueName, message);}@Testpublic void testSendMessage2WorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "hello, message__";for (int i = 1; i <= 50; i++) {rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}}
}
  • application.yml

    logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
    spring:rabbitmq:host: 127.0.0.1 # rabbitMQ的ip地址port: 5672 # 端口username: adminpassword: 123virtual-host: / #虚拟主机listener:simple:prefetch: 1 # 预取 每次只取一条消息,处理完才能获取下一条消息

4. 发布模型介绍

5. 发布、订阅模型-Fanout

5.1 利用SpringAMQP演示FanoutExchange的使用 ( 发布、订阅模型-Direct)

5.2 步骤一 在consumer服务生命Exchange、QUEUE、Binding

6. 发布、订阅模型-Direct

6.1 案例

7. 发布、订阅模型-Topic

8. 消息转换器

推荐使用json

http://www.lryc.cn/news/260091.html

相关文章:

  • BearPi Std 板从入门到放弃 - 先天神魂篇(3)(RT-Thread I2C设备 读取光照强度BH1750)
  • 中文分词演进(查词典,hmm标注,无监督统计)新词发现
  • Docker容器数据卷
  • chatGPT 国内版,嵌入midjourney AI创作工具
  • Yum仓库架构解析与搭建实践
  • ElementPlus中的分页逻辑与实现
  • 实验01:静态路由配置实验
  • C#中简单的继承和多态
  • 15、lambda表达式、右值引用、移动语义
  • spring boot 实现直播聊天室(二)
  • alibaba fastjson GET List传参 和 接收解析
  • API自动化测试是什么?我们该如何做API自动化测试呢?
  • PyTorch 的 10 条内部用法
  • Django、Echarts异步请求、动态更新
  • Mac部署Odoo环境-Odoo本地环境部署
  • 【✅面试编程题:如何用队列实现一个栈】
  • Windows本地的RabbitMQ服务怎么在Docker for Windows的容器中使用
  • YOLOv5改进 | 2023卷积篇 | AKConv轻量级架构下的高效检测(既轻量又提点)
  • 微信小程序:模态框(弹窗)的实现
  • uniapp交互反馈api的使用示例
  • XUbuntu22.04之HDMI显示器设置竖屏(一百九十八)
  • 如何用 Cargo 管理 Rust 工程系列 甲
  • Windows下ping IP+端口的方法
  • 【python】os.getcwd()函数详解和示例
  • Linux(二十一)——virtualenv安装成功之后,依然提示未找到命令(-bash: virtualenv: 未找到命令)
  • RNN介绍及Pytorch源码解析
  • Qt 文字描边(基础篇)
  • .360勒索病毒解密方法|勒索病毒解决|勒索病毒恢复|数据库修复
  • Nginx(四层+七层代理)+Tomcat实现负载均衡、动静分离
  • 【前端】vscode 相关插件