当前位置: 首页 > news >正文

模拟实现消息队列项目

项目介绍:

在实际的后端开发中, 尤其是分布式系统里, 跨主机之间使用生产者消费者模型, 也是非常普遍的需求。因此, 我们通常会把阻塞队列封装成一个独立的服务器程序, 并且赋予其更丰富的功能。 这样的服务程序称为消息队列 (Message Queue, MQ)。

消息队列有很多:RabbitMQ ,Kafka ,RocketMQ , ActiveMQ等;其中RabbitMQ比较知名,所以仿照 RabbitMQ 模拟实现一个简单的消息队列。

模拟实现的消息队列相当于是消息的发布-订阅的功能。

环境搭建:

protobuf:实现序列化和反序列化的框架,进行序列化和反序列化。

Muduo:用于底层通信的框架。

SQLite3:轻量化数据库。

第三方库的介绍:

Protobuf:(全称Protocol Buffer)是数据结构序列化和反序列化框架。

它具有以下特点:

• 语言无关、平台无关:即 ProtoBuf 支持 Java、C++、Python 等多种语言,支持 多个平台

• 高效:即比 XML 更小、更快、更为简单

• 扩展性、兼容性好:你可以更新数据结构,而不影响和破坏原有的旧程序

Muduo:是一个基于非阻塞IO和事件驱动的C++高并发TCP网络编程库。是一款基于主从Reactor模型的网络库。

reactor模型:基于事件触发的模型(基于epoll进行io时间监控)。

主从reactor:将io事件监控进行进一步的层次划分。

主reactor:只对新建连接进行监控(保证不受io阻塞影响,实现高效的新建连接获取)

从reactor:针对新建的链接进行io时间监控(进行io操作和业务处理)

主从reactor必然是一个多执行流的并发模式。

Muduo库常见接口:

TcpServer 类:搭建服务器

EventLoop类:事件监控,业务处理

SQLite:是一个进程内的轻量级数据库,它实现了自给自足的、无服务器的、零配置的、 事务性的 SQL 数据库引擎。

GTest:是一个跨平台的 C++单元测试框架,它提供了丰富的断言、致命和非致命判断、参数化等。

单元测试框架可以很清楚的看到几个测试案例通过了,几个没有通过。

C++11 异步操作:

std::future是C++11标准库中的一个模板类,它表示一个异步操作的结果。std::future可以帮助我们在需要的时候获取任务的执行 结果。std::future的一个重要特性是能够阻塞当前线程,直到异步操作完成,从而确保 我们在获取结果时不会遇到未完成的操作。

//std::launch::deffered在执行get获取异步结果的时候,才会执行add这个异步任务,表明该函数会被延迟调用,直到在future上调用get()或者wait()才会开始执行任务 

//std::launch::async内部会创建线程,异步的完成任务(表明函数会在自己创建的线程上运行)

C++11实现线程池:

线程池的工作思想: a. 用户传入要执行的函数,以及需要处理的数据(函数的参数),由线程池中的 工作线程来执行函数完成任务

管理的成员 1. 任务池:用vector维护的一个函数任务池子 2.互斥锁 & 条件变量: 实现同步互斥 3. 一定数量的工作线程:用于不断从任务池取出任务执行任务 4.结束运行标志:以便于控制线程池的结束。 

管理的操作: 1.入队任务:入队一个函数和参数 2.停止运行:终止线程池

项目需求分析:

具体的流程是:发布消息之后,消息发送到交换机上,交换机根据某个规则和消息队列进行bind绑定,将消息发送到对应的队列中,最后当消费者订阅了某个队列中的消息,就会将消息推送出去。

消息队列服务器:最核心的部分, 负责消息的存储和转发

虚拟机 (VirtualHost): 类似于 MySQL 的 "database", 是一个逻辑上的集合。一个 消息队列服务器上可以存在多个 VirtualHost。

交换机 (Exchange): 生产者把消息先发送到 Broker 的 Exchange 上,再根据不同 的规则, 把消息转发给不同的 Queue 

队列 (Queue): 真正用来存储消息的部分, 每个消费者决定自己从哪个 Queue 上 读取消息

绑定 (Binding): Exchange 和 Queue 之间的关联关系,Exchange 和 Queue 可以 理解成 "多对多" 关系,使用一个关联表就可以把这两个概念联系起来。

上述数据结构, 既需要在内存中存储, 也需要在硬盘中存储 ;

内存存储: 方便使用 

硬盘存储: 重启数据不丢失

要实现的内容:
1.broker服务器:消息队列服务器。
2.消息发布客户端:向服务器发布消息。
3.消息订阅客户端:从服务器订阅消息。
AMQP协议:交换机、队列、绑定三者组合在一起叫做虚拟机。

模块划分:

 数据管理模块---交换机数据管理:

 数据管理模块---队列数据管理:

 数据管理模块---绑定数据管理: 

 数据管理模块---消息数据管理: 

 虚拟机数据管理模块:

 路由匹配模块:

 消费者管理模块:

 信道管理:

 连接管理:

 服务器模块:

整体流程图:

 项目框架目录:

队列消息管理:

交换机路由管理:

匹配算法讲解视频:第84个

队列消费者/订阅者管理:

信道管理设计之前需要进行网络通信协议的设计;因为Channel是针对Connection连接的一个通信信道,

客户端这边存在两个异步工作线程:

• 一个是muduo库中客户端连接的异步循环线程EventLoopThread。(io事件监控)

• 一个是当收到消息后进行异步处理的工作线程池。(收到的消息异步处理)

项目总结:

仿RabbitMQ实现一个简化版的消息队列组件,其内部 实现了消息队列服务器以及客户端的搭建,并支持不同主机间消息的发布与订阅及消 息推送功能。

项目中所用到的技术:基于muduo库实现底层网络通信服务器和客户端的搭建, 在应用层基于protobuf协议设计应用层协议接口,在数据管理上使用了轻量数据库 sqlite 来进行数据的持久化管理,以及基于AMQP模型的理解(broker服务器包括队列,交换机,绑定等整合到一起),实现整个消息队列项目技术的整合,并在项目的实现过程中使用gtest框架进行单元测试,完成项目的最终实现。

代码出现的问题:

mq_helper写的时候出现很多细节问题。

protoc --cpp_out=./mq_msg.proto中的./后面要有空格。

写函数时返回值类型写错了

标点符号。


需要无参构造,但是类里面没有无参构造,所以需要定义一个无参构造函数。


变量名避免和数据库重名


在删除交换机代码时,数据库写错了命令。


应用层协议采用lv协议解决粘包问题

代码链接:

https://gitee.com/feng-peijie/queue.git

http://www.lryc.cn/news/597149.html

相关文章:

  • 音视频学习(四十三):H264无损压缩
  • 《使用Qt Quick从零构建AI螺丝瑕疵检测系统》——3. QML入门:像搭积木一样构建UI
  • ESP32-S3学习笔记<4>:I2C的应用
  • DeepSeek 助力 Vue3 开发:打造丝滑的日历(Calendar),日历_家庭维护示例(CalendarView01_31)
  • WebGIS 中常用空间数据格式
  • 2025暑期—06神经网络-常见网络3
  • 2025暑期—06神经网络-常见网络2
  • 2026 拼多多秋招内推码(提前批)
  • 为什么设置 git commit签名是公钥而不是私钥?
  • yo easy-ui5生成项目,ui5版本降级处理
  • Tang Prime 20K板I2S输入输出例程
  • Hexo - 免费搭建个人博客01 - 安装软件工具
  • Java应用程序内存占用分析
  • 大致自定义文件I/O库函数的实现详解(了解即可)
  • 软件开发、项目开发基本步骤
  • Java从入门到精通!第十二天(泛型)
  • 搭建 Android 开发环境JAVA+AS
  • 阿里云ODPS十五周年重磅升级发布:为AI而生的数据平台
  • HTTP性能优化终极指南:从协议原理到企业级实践
  • k8s pvc是否可绑定在多个pod上
  • 【Kubernetes】集群启动nginx,观察端口映射,work节点使用kubectl配置
  • 优化 Elasticsearch JVM 参数配置指南
  • 每日一算:华为-批萨分配问题
  • 谷粒商城篇章13--P340-P360--k8s/KubeSphere【高可用集群篇一】
  • 常用的正则表达式
  • 代码随想录算法训练营第五十二天|图论part3
  • 图论的题目整合(Dijkstra)
  • 【图论,拓扑排序】P1347 排序
  • 算法竞赛备赛——【图论】最小生成树
  • Modbus协议详解与c#应用