当前位置: 首页 > news >正文

模拟实现消息队列项目(完结) -- 基于MQ的生产者消费者模型

目录

前言

1. 生产者

2. 消费者

3. 启动消息队列服务器

4. 运行效果

 结语


前言

        在上一章节,我们完成了消息队列的客户端部分,至此我们整个消息队列项目就构建完成了,那我们做的这个消息队列到底有什么效果,以及如何去使用我们自己的消息队列呢?那么本文,就将我们的MQ进行实战操作,写一个基于MQ的生产者消费者模型.本项目全部代码已上传Gitee,链接放在文章末尾,欢迎大家访问!


1. 生产者

我们的生产者就是一个客户端,需要将自己生产出来的消息发送到消息队列中,供消费者进行使用.

我们创建一个生产者,在服务器端创建交换机(直接),队列,然后往对应的队列进行投递消息.

1. 实例化创建连接的工厂类

2. 设置消息队列服务器的IP地址以及端口号

3. 新建一个连接,创建Channel,交换机,队列

4. 新建一个消息转换成字节文件进行发送,此时给线程一个休眠的时间,确保已经发送到消息队列服务器

5. 关闭通道,关闭连接

package com.example.demo.demo;import com.example.demo.mqclient.Channel;
import com.example.demo.mqclient.Connection;
import com.example.demo.mqclient.ConnectionFactory;
import com.example.demo.mqserver.core.ExchangeType;import java.io.IOException;/*** Created with IntelliJ IDEA.* Description:生产者  通常是一个单独的服务器程序* User: YAO* Date: 2023-08-03* Time: 16:06*/
public class DemoProducer {public static void main(String[] args) throws IOException, InterruptedException {System.out.println("启动生产者");ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 创建交换机和队列channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);channel.queueDeclare("testQueue", true, false, false, null);// 创建一个消息并发送byte[] body = "hello".getBytes();boolean ok = channel.basicPublish("testExchange", "testQueue", null, body);System.out.println("消息投递完成! ok=" + ok);Thread.sleep(500);channel.close();connection.close();}
}

2. 消费者

消费者也是客户端,所做的前期工作是一样的,只不过是发送的请求不同.

1. 消费者需要进行订阅消息,接收到消息之后,执行回调进行消费消息.

2. 消费者需要循环等待消息队列的响应,等待消费.

package com.example.demo.demo;import com.example.demo.common.Consumer;
import com.example.demo.common.MqException;
import com.example.demo.mqclient.Channel;
import com.example.demo.mqclient.Connection;
import com.example.demo.mqclient.ConnectionFactory;
import com.example.demo.mqserver.core.BasicProperties;
import com.example.demo.mqserver.core.ExchangeType;import java.io.IOException;
/*** Created with IntelliJ IDEA.* Description:消费者  通常是一个单独的服务器程序* User: YAO* Date: 2023-08-03* Time: 16:07*/
public class DemoConsumer {public static void main(String[] args) throws MqException, InterruptedException, IOException {System.out.println("启动消费者!");ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(9090);Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);channel.queueDeclare("testQueue", true, false, false, null);channel.basicConsume("testQueue", true, new Consumer() {@Overridepublic void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {System.out.println("[消费数据] 开始!");System.out.println("consumerTag=" + consumerTag);System.out.println("basicProperties=" + basicProperties);String bodyString = new String(body, 0, body.length);System.out.println("body=" + bodyString);System.out.println("[消费数据] 结束!");}});// 由于消费者也不知道生产者要生产多少, 就在这里通过这个循环模拟一直等待消费.while (true) {Thread.sleep(500);}}
}

3. 启动消息队列服务器

在Spring Boot 项目的启动类中,实例化Broker Server,传入端口号,进行启动服务器.

package com.example.demo;import com.example.demo.mqserver.BrokerServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;import java.io.IOException;@SpringBootApplication
public class DemoApplication {public static ConfigurableApplicationContext context;public static void main(String[] args) throws IOException {context = SpringApplication.run(DemoApplication.class, args);BrokerServer brokerServer = new BrokerServer(9090);brokerServer.start();}}

4. 运行效果

1. 服务器启动:

2. 此时如果再重启服务器,会提示数据库已经存在,就会将数据恢复到内存

3. 启动生产者进行投递消息

上述就是按照我们自定义的应用层协议进行发送请求. 

我们再来看服务器这边的日志:

4. 启动消费者进行消费消息 

 我们再来看服务器这边日志


 结语

         以上就是一个简单的Demo,实现了基于MQ的生产者消费者模型.其他的功能,大家可以在做完这个项目之后自行进行测试.至此这个消息队列的项目就全部完结了,内容还是很多的,希望可以通过这个系列能够帮助到大家去了解消息队列的实现原理.也希望大家能够有所收获,那就到这里吧.接下来就要开始新的项目了(实现论坛系统),又是一个挑战,我们一起加油!❤️

        完整的项目代码已上传Gitee,欢迎大家访问.👇👇👇

模拟实现消息队列icon-default.png?t=N6B9https://gitee.com/yao-fa/advanced-java-ee/tree/master/My-mq

http://www.lryc.cn/news/116510.html

相关文章:

  • 专业商城财务一体化-线上商城+进销存管理软件,批发零售全行业免费更新
  • 深度思考mysql面经
  • 2023-08-09力扣每日一题
  • [23] Instruct 3D-to-3D: Text Instruction Guided 3D-to-3D conversion
  • 设计模式行为型——访问者模式
  • vue3官网文档学习、复习笔记(快速上手)
  • 0基础学习VR全景平台篇 第81篇:全景相机-临云镜如何直播推流
  • 分数线划定
  • 考研C语言进阶题库——更新26-30题
  • 用C语言实现定积分计算(包括无穷积分/可自定义精度)
  • 使用Presto、Trino数据库时提示“The datetime zone id ‘GMT+08:00‘ is not recognised”
  • C# BeginInvoke 加 EndInvoke实现异步操作
  • “华为杯”研究生数学建模竞赛2015年-【华为杯】B题:数据的多流形结构分析(续)
  • R语言APSIM模型高级应用及批量模拟
  • 【硬件设计】模拟电子基础三--集成运算放大电路
  • JavaWeb(11)——前端综合案例5(小黑记事本)
  • 在使用TensorFlow的时候内部报错:内部某个方法或属性不存在
  • dubbo之高可用
  • gitee代码扫描js代码,降低复杂度,减少if-else判断的处理方法
  • MySQL及SQL语句(3)
  • MySQL 查询语句大全
  • 【Axure高保真原型】账单列表和详情
  • 嵌入式面试题1
  • base64转二进制流,file文件
  • 各种查找算法的效率分析
  • 微报告下载!市场不确定性周期下的激光雷达前装赛道
  • Java版企业电子招标采购系统源码Spring Cloud + Spring Boot +二次开发+ MybatisPlus + Redis tbms
  • 并网逆变器学习笔记6---三电平SVPWM下的连续和不连续调制
  • TS协议之PES(ES数据包)
  • 银河麒麟V10 SP3 X86 二进制文件部署 mysql-5.7.29 GTID 半同步复制的双主架构