C++ 仿RabbitMQ实现消息队列项目
C++ 仿RabbitMQ实现消息队列项目
目录
- C++ 仿RabbitMQ实现消息队列项目
- 1.引言
- 2. 项目介绍
- 3. 需求分析
- 3.1 核心概念
- 3.2 核心API
- 3.3 交换机类型
- 3.4 持久化
- 3.5 网络通信
- 3.6 消息应答
- 4. 模块划分
- 4.1 服务端模块
- 4.1.1 持久化数据管理中心模块
- 4.1.2 虚拟机管理模块
- 4.1.3 交换路由模块
- 4.1.4 消费者管理模块
- 4.1.5 信道管理模块
- 4.1.6 连接管理模块
- 4.1.7 Broker服务器模块
- 4.2 客户端模块
- 4.2.1 消费者管理模块
- 4.2.2 信道请求模块
- 4.2.3 通信连接模块
- 4.3 项目模块关系图
- 5. 消息队列服务端核心模块实现
- 5.1 项目创建
- 5.2 日志打印工具
- 5.3 实用Helper工具
- 5.3.1 文件基础操作
- 5.3.2 SQLite基础操作类
- 5.3.3 字符串操作类
- 5.3.4 UUID生成器类
- 5.4 消息分类定义&交换机类型定义
- 5.5 交换机数据管理
- 5.6 队列数据管理
- 5.7 绑定信息(交换机-队列)管理
- 5.8 队列消息管理
- 5.9 虚拟机管理
- 5.10 交换机路由管理
- 5.11 队列消费者/订阅者管理
- 5.12 信道管理
- 5.13 连接管理
- 6. 网络通信协议设计
- 6.1 需求确认
- 6.2 设计应用层协议
- 6.3 定义请求/响应参数
- 7. 服务器模块实现
- 8. 客户端模块实现
- 8.1 订阅者模块
- 8.2 信道管理模块
- 8.3 异步工作线程实现
- 8.4 连接管理模块
- 9. 案例:基于MQ的生产者消费者模型
- 9.1 生产者客户端的实现
- 9.2 消费者客户端的实现
- 10.项目总结
1.引言
在后端开发中,尤其是分布式系统中,跨主机间的生产者-消费者模型是非常普遍的需求。消息队列(Message Queue, MQ)作为一种高效的异步通信机制,能够很好地解决这类问题。其中RabbitMQ是一个非常知名的、功能强大且广泛使用的消息队列中间件。
本项目使用C++语言仿照RabbitMQ实现了一个简化版的消息队列系统,主要功能包括:
核心消息队列功能:实现了生产者、消费者、中间人(Broker)等核心概念,支持消息的发布(Publish)和订阅(Subscribe)模式
多种交换机类型:支持Direct、Fanout和Topic三种交换机类型,满足不同路由需求
持久化机制:消息、队列、交换机和绑定关系都支持持久化存储,确保服务器重启后数据不丢失
高并发网络通信:基于muduo网络库实现高性能服务器,采用自定义应用层协议和Protobuf序列化
模块化设计:将系统划分为服务端和客户端多个模块,结构清晰,易于扩展
2. 项目介绍
-
在后端开发中,尤其是分布式系统里,跨主机之间使用生产者消费者模型,是非常普遍的需求。因此,我们通常会把阻塞队列封装成一个独立的服务器程序,并且赋予其丰富的功能。这样的服务程序我们就称为消息队列(Message Queue,MQ)。其中RabbitMQ是一个非常知名的、功能强大且广泛使用的消息队列。本项目就仿照RabbitMQ模拟实现一个简单的消息队列。
-
开发环境:Linux(Ubuntu-22.04)、VSCode/Vim、g++/gdb、Makefile
-
技术选型:
- 开发主语言:C++
- 序列化框架:
Protobuf
二进制序列化 - 网络通信:自定义应用层协议+
muduo
库:对TCP长连接的封装、并且使用epoll
的事件驱动模式,实现高并发服务器与客户端 - 源数据信息数据库:
SQLite3
- 单元测试框架:
Gtest
3. 需求分析
3.1 核心概念
-
生产者(Producer)、消费者(Consumer)、中间人(Broker)、发布(Publish)、订阅(Subscribe)
-
生产者 : 消费者 = 1:1
-
生产者 : 消费者 = n:n
-
Broker Server 是最核心的部分,负责消息的存储和转发。
-
而在 AMQP(Advanced Message Queuing Protocol,高级消息队列协议,一个提供统一消息服务的应用层标准高级消息队列协议,为面向消息的中间件设计,使得遵从该规范的客户端应用和消息中间件服务器的全功能互操作成为可能)模型中,也就是消息中间件服务器 Broker 中,又存在以下概念:
- 虚拟机(VirtualHost)类似于 MySQL 的 “database”,是一个逻辑上的集合,一个 BrokerServer 上可以存在多个 VirtualHost;
- 交换机(Exchange)是生产者把消息先发送到 Broker 的 Exchange 上,再根据不同的规则把消息转发给不同的 Queue;
- 队列(Queue)是真正用来存储消息的部分,每个消费者决定自己从哪个 Queue 上读取消息;
- 绑定(Binding)是 Exchange 和 Queue 之间的关联关系,Exchange 和 Queue 可以理解成 “多对多” 关系,使用一个关联表就可以把这两个概念联系起来;
- 消息(Message)是传递的内容。
- 所谓的 Exchange 和 Queue 可以理解成 “多对多” 关系,和数据库中的 “多对多” 一样,意思是:一个Exchange 可以绑定多个 Queue(可以向多个 Queue 中转发消息),一个 Queue 也可以被多个 Exchange 绑定(一个 Queue 中的消息可以来自于多个 Exchange)。
-
上述数据结构,既需要在内存中存储,也需要在硬盘中存储。
3.2 核心API
-
对于 Broker 来说,要实现以下核心 API 来实现消息队列的基本功能:
-
创建交换机 (exchangeDeclare);
-
销毁交换机 (exchangeDelete);
-
创建队列 (queueDeclare);
-
销毁队列 (queueDelete);
-
创建绑定 (queueBind);
-
解除绑定 (queueUnbind);
-
发布消息 (basicPublish);
-
订阅消息 (basicConsume);
-
确认消息 (basicAck);
-
取消订阅 (basicCancel)。
-
-
另一方面,Producer 和 Consumer 则通过网络的方式远程调用这些 API,实现生产者消费者模型。
-
关于 VirtualHost:对于 RabbitMQ 来说,VirtualHost 也是可以随意创建删除的,此处暂时不做这部分功能。
3.3 交换机类型
-
RabbitMQ主要支持四种交换机类型:
- Direct
- Fanout
- Topic
- Header
- 其中Header方式比较复杂且比较少见,常用的是前三种交换机类型,项目中也主要实现这三种。
- Direct方式是生产者发送消息时直接指定被该交换机绑定的队列名;
- Fanout方式是生产者发送的消息会被复制到该交换机的所有队列中;
- Topic方式是绑定队列到交换机上时指定一个字符串为bindingKey,发送消息指定一个字符串为routingKey,当routingKey和bindingKey满足一定的匹配条件时则把消息投递到指定队列。
3.4 持久化
Exchange、Queue、Binding、Message等数据都有持久化需求,当程序重启 / 主机重启,保证上述内容不丢失。
3.5 网络通信
-
生产者和消费者都是客户端程序,Broker 则是作为服务器,通过网络进行通信。
-
在网络通信的过程中,客户端部分要提供对应的 API 来实现对服务器的操作,包括:
- 创建 Connection;
- 关闭 Connection;
- 创建 Channel;
- 关闭 Channel;
- 创建队列 (queueDeclare);
- 销毁队列 (queueDelete);
- 创建交换机 (exchangeDeclare);
- 销毁交换机 (exchangeDelete);
- 创建绑定 (queueBind);
- 解除绑定 (queueUnbind);
- 发布消息 (basicPublish);
- 订阅消息 (basicConsume);
- 确认消息 (basicAck);
- 取消订阅 (basicCancel)。
-
可以看到,在 Broker 的基础上,客户端还要增加 Connection 操作和 Channel 操作。
-
Connection 对应一个 TCP 连接,Channel 则是 Connection 中的逻辑通道。
-
一个 Connection 中可以包含多个 Channel,Channel 和 Channel 之间的数据是独立的,不会相互干扰。这样做主要是为了能够更好地复用 TCP 连接,达到长连接的效果,避免频繁的创建关闭 TCP 连接。
-
Connection 可以理解成一根网线,Channel 则是网线里具体的线缆。
3.6 消息应答
-
被消费的消息需要进行应答。应答模式分成两种:
- 自动应答(消费者只要消费了消息,就算应答完毕了,Broker直接删除这个消息)
- 手动应答(消费者手动调用应答接口,Broker收到应答请求之后,才真正删除这个消息)。手动应答的目的是为了保证消息确实被消费者处理成功了,在一些对于数据可靠性要求高的场景比较常见。
4. 模块划分
4.1 服务端模块
4.1.1 持久化数据管理中心模块
-
在数据管理模块中管理交换机、队列、队列绑定、消息等部分数据。
-
交换机管理:
a. 管理信息:名称,类型,是否持久化标志,是否(无人使用时)自动删除标志,其他参数;
b. 管理操作:恢复历史信息,声明,删除,获取,判断是否存在。
-
队列管理:
a. 管理信息:名称,是否持久化标志,是否独有标志,是否(无人使用时)自动删除标志,其他参数;
b. 管理操作:恢复历史信息,声明,删除,获取,判断是否存在。
-
绑定管理:
a. 管理信息:交换机名称,队列名称,绑定主题;
b. 管理操作:恢复历史信息,绑定,解绑,解除交换机关联绑定信息,解除队列关联绑定信息,获取交换机关联绑定信息。
-
消息管理:
a. 管理信息:
i. 属性:消息ID,路由主题,持久化模式标志;
ii. 消息内容;
iii. 有效标志(持久化需要);
iv. 持久化位置(内存中);
v. 持久化消息长度(内存中);
b. 管理操作:恢复历史信息,向指定队列新增消息,获取指定队列队首消息,确认移除消息。
-
-
这几个核心概念数据都需要在内存和硬盘中存储,以内存存储为主保证快速查找信息进行处理,以硬盘存储为辅保证服务器重启后之前信息可正常保持。
4.1.2 虚拟机管理模块
-
因为交换机/队列/绑定都是基于虚拟机为单元整体进行操作的,因此虚拟机是对以上数据管理模块的整合模块。
-
虚拟机管理信息:
a. 交换机数据管理模块句柄;
b. 队列数据管理模块句柄;
c. 绑定数据管理模块句柄;
d. 消息数据管理模块句柄。
-
虚拟机对外操作:
a. 提供虚拟机内交换机声明,交换机删除操作;
b. 提供虚拟机内队列声明,队列删除操作;
c. 提供虚拟机内交换机-队列绑定,解除绑定操作(交换机和队列必须存在);
d. 获取交换机相关绑定信息。(一条信息要发布给指定交换机的时候,交换机获取所有的绑定信息,来确定消息要发布到哪个队列)
e.获取指定队列的消息,以及对指定队列的指定消息进行确认
-
虚拟机管理操作:
a. 创建虚拟机;
b. 查询虚拟机;
c. 删除虚拟机。
-
4.1.3 交换路由模块
-
当客户端发布一条消息到交换机后,这条消息应该被入队到该交换机绑定的哪些队列中?交换路由模块就是决定这件事情的。
-
在绑定信息中有一个binding_key,而每条发布的消息中有一个routing_key,能否入队取决于两个要素:交换机类型和key。
- 广播:将消息入队到该交换机的所有绑定队列中;
- 直接:将消息入队到绑定信息中binding_key与消息routing_key一致的队列中;
- 主题:将消息入队到绑定信息中binding_key与routing_key是匹配成功的队列中。
-
binding_key:队列发布的匹配规则
- 是由数字字母下划线构成的,并且使用
.
分成若干部分 - 例如:
news.music.#
,这用于表示交换机绑定的当前队列是一个用于发布音乐新闻的队列。 - 支持
*
和#
两种通配符,但是*
,#
只能作为.
切分出来的独立部分,不能和其他数字字母混用 - 比如
a.*.b
是合法的,a.*a.b
是不合法的 *
可以匹配任意一个单词(注意是单词不是字母)#
可以匹配任意零个或者多个单词(注意是单词不是字母)。
- 是由数字字母下划线构成的,并且使用
-
routing_key:消息的发布规则
- 是由数据、字母和下划线构成,并且可以使用
.
划分成若干部分 - 例如:
news.music.pop
,这用于表示当前发布的消息是一个流行音乐的新闻。
- 是由数据、字母和下划线构成,并且可以使用
4.1.4 消费者管理模块
-
消费者管理是以队列为单元的,因为每个消费者都会在开始的时候订阅一个队列的消息,当队列中有消息后,会将队列消息轮询推送给订阅了该队列的消费者。
-
因此操作流程通常是,从队列关联的消息管理中取出消息,从队列关联的消费者中取出一个消费者,然后将消息推送给消费者(这就是发布订阅中负载均衡的用法)。
-
消费者信息:
a. 标识;
b. 订阅队列名称;
c. 自动应答标志(决定了一条消息推送给消费者后,是否需要等待收到确认后再删除消息);
d. 消息处理回调函数指针(一个消息发布后调用回调,选择消费者进行推送…),其函数签名为:void(const std::string& tag, const BasicProperties& p, const std::string& body)。
-
消费者管理功能包括:添加、删除、轮询获取指定队列的消费者,以及移除队列所有消费者等操作。
-
4.1.5 信道管理模块
-
本质上,在AMQP模型中,除了通信连接Connection概念外,还有一个Channel的概念,Channel是针对Connection连接的一个更细粒度的通信信道,多个Channel可以使用同一个通信连接Connection进行通信,但是同一个Connection的Channel之间相互独立。
-
一旦某个客户端要关闭连接,关闭的不是连接,而是自己对应的通信通道,关闭信道我们就需要将客户端的订阅给取消。
-
而信道模块就是再次将上述模块进行整合提供服务的模块,主要功能包括:
-
管理信息:
a. 信道ID;
b. 信道关联的消费者句柄;
c. 信道关联的连接句柄;
d. 信道关联的虚拟机句柄;
e. 工作线程池句柄(一条消息被发布到队列后,需要将消息推送给订阅了对应队列的消费者,过程由线程池完成。整个服务器共用一个线程池,所有信道都是通过同一个线程池进行异步操作的);
-
管理操作:
a. 提供声明&删除交换机操作(删除交换机的同时删除交换机关联的绑定信息);
b. 提供声明&删除队列操作(删除队列的同时,删除队列关联的绑定信息,消息,消费者信息);
c. 提供绑定&解绑队列操作;
d. 提供订阅&取消订阅队列消息操作;
e. 提供发布&确认消息操作。
-
4.1.6 连接管理模块
-
本质上,我们仿照实现的服务器是通过 muduo 库来实现底层通信的,而这里的连接管理,更多的是对 muduo 库中的 Connection 进行二次封装管理,并额外提供项目所需操作。
-
当一个连接要关闭的时候,就应该把连接关联的信道全部关闭,因此也有数据管理至少要管理关联的信道。
-
管理信息包括:
a. 连接关联的信道;
b. 连接关联的 muduo 库 Connection。
-
管理操作包括:新增连接,删除连接,获取连接,打开信道,关闭信道。
4.1.7 Broker服务器模块
-
整合以上所有模块,并搭建网络通信服务器,实现与客户端网络通信,能够识别客户端请求,并提供客户端请求的处理服务。
-
管理信息:
a. 虚拟机管理模块句柄;
b. 消费者管理模块句柄;
c. 连接管理模块句柄;
d. 工作线程池句柄;
e. muduo库通信所需元素。
4.2 客户端模块
4.2.1 消费者管理模块
-
消费者在客户端的存在感比较低,因为在用户的使用角度中,只要创建一个信道后,就可以通过信道完成所有的操作,因此对于消费者的感官更多是在订阅的时候传入了一个消费者标识,且当前的简单实现也仅仅是一个信道只能创建订阅一个队列,也就是只能创建一个消费者,它们一一对应,因此更是弱化了消费者的存在。
-
消费者信息:
a. 标识;
b. 订阅队列名称;
c. 自动应答标志(决定了一条消息推送给消费者后,是否需要等待收到确认后再删除消息);
d. 消息处理回调函数指针(当消费者订阅了某个队列的消息,这个队列有了消息后,就会将消息推送给这个客户端,这时候收到消息则使用回调函数进行处理)。
-
消费者管理:添加,删除,轮询获取指定队列的消费者,移除队列所有消费者等操作。
-
4.2.2 信道请求模块
-
与服务端的信道类似,客户端这边在 AMQP 模型中,也是除了通信连接 Connection 概念外,还有一个 Channel 的概念,Channel 是针对 Connection 连接的一个更细粒度的通信信道,多个 Channel 可以使用同一个通信连接 Connection 进行通信,但是同一个 Connection 的 Channel 之间相互独立。
-
客户端的信道与服务端的信道是一一对应的,服务端信道提供的服务,客户端都有。相当于,服务端为客户端提供服务,客户端为用户提供服务。
-
-
信道管理信息:
a. 信道 ID;
b. 信道关联的通信连接;
c. 信道关联的消费者;
d. 请求对应的响应信息队列(这里队列使用 hash 表,以便于查找指定的响应);
e. 互斥锁&条件变量(大部分的请求都是阻塞操作,发送请求后需要等到响应才能继续,但是 muduo 库的通信是异步的,因此需要我们自己在收到响应后,通过判断是否是等待的指定响应来进行同步)。
f.线程池句柄(对推送过来的消息进行回调处理,处理过程通过工作线程来进行)
-
信道管理操作:
a. 提供创建信道操作;
b. 提供删除信道操作;
c. 提供声明交换机操作(强断言-有则 OK,没有则创建);
d. 提供删除交换机;
e. 提供创建队列操作(强断言-有则 OK,没有则创建);
f. 提供删除队列操作;
g. 提供交换机-队列绑定操作;
h. 提供交换机-队列解除绑定操作;
i. 提供添加订阅操作;
j. 提供取消订阅操作;
k. 提供发布消息操作。
-
4.2.3 通信连接模块
-
向用户提供一个用于实现网络通信的 Connection 对象,从其内部可创建出粒度更轻的 Channel 对象,用于与服务端进行网络通信。
-
对于用户来说,所有的服务都是通过信道完成的,信道在用户的角度就是一个通信信道,因此所有的请求都是通过信道来完成的。
-
连接的管理就包含了客户端资源的整合
-
管理信息:
a. 连接关联的实际用于通信的 muduo::net::Connection 连接;
b. 连接关联的信管理句柄(实现信道的增删查);
c. 连接关联的 EventLoop 异步循环工作线程;
d. 异步工作线程池(用于对收到服务器推送过来的消息进行处理的线程池)。
-
管理操作:
a. 提供创建 Channel 信道的操作;
b. 提供删除 Channel 信道的操作。
-
4.3 项目模块关系图
5. 消息队列服务端核心模块实现
5.1 项目创建
在Linux上创建mq项目并规划开发目录,使用Makefile组织项目。
[]$ tree bitmq/
bitmq/
|-- demo
|-- mqclient
|-- mqcommon
|-- mqserver
|-- mqtest
|-- third
demo
:编写一些功能用例时所在的目录mqcommon
: 公共模块代码(线程池,数据库访问,文件访问,日志打印,pb 相关,以及其他的一些琐碎功能模块代码)mqclient
:客户端模块代码mqserver
:服务器模块代码mqtest
: 单元测试third
:用到的第三方库存放目录
5.2 日志打印工具
为了便于编写项目中能够快速定位程序的错误位置,因此编写一个日志打印类,进行简单的日志打印。
#ifndef __M_LOG_H__
#define __M_LOG_H__
#include <iostream>
#include <ctime>#define DBG_LEVEL 0
#define INF_LEVEL 1
#define ERR_LEVEL 2
#define DEFAULT_LEVEL DBG_LEVEL
#define LOG(lev_str, level, format, ...) {\if (level >= DEFAULT_LEVEL) {\time_t t = time(nullptr);\struct tm* ptm = localtime(&t);\char time_str[32];\strftime(time_str, 31, "%H:%M:%S", ptm);\printf("[%s][%s][%s:%d]\t" format "\n", lev_str, time_str, __FILE__, __LINE__, ##__VA_ARGS__);\}\
}//##__VA_ARGS__,中的##用来避免宏定义函数的参数列表中由于没有不定参导致多余的逗号报错#define DLOG(format, ...) LOG("DBG", DBG_LEVEL, format, ##__VA_ARGS__)
#define ILOG(format, ...) LOG("INF", INF_LEVEL, format, ##__VA_ARGS__)
#define ELOG(format, ...) LOG("ERR", ERR_LEVEL, format, ##__VA_ARGS__)
#endif
5.3 实用Helper工具
Helper
工具类中要完成的是项目中需要的一些辅助零碎的功能代码实现,其中包括文件的基础操作,字符串的额外操作等在项目中用到的零碎功能。
5.3.1 文件基础操作
- 文件是否存在的判断
- 文件大小的获取
- 文件读/写
- 文件创建/删除
- 文件重命名
- 目录创建/删除
- 获取文件的父目录路径
class FileHelper {public:FileHelper(const std::string &filename):_filename(filename){}bool exists() {struct stat st;return (stat(_filename.c_str(), &st) == 0);}size_t size() {struct stat st;int ret = stat(_filename.c_str(), &st);if (ret < 0) {return 0;}return st.st_size;}bool read(char *body, size_t offset, size_t len) {//1. 打开文件std::ifstream ifs(_filename, std::ios::binary | std::ios::in); if (ifs.is_open() == false) {ELOG("%s 文件打开失败!", _filename.c_str());return false;}//2. 跳转文件读写位置ifs.seekg(offset, std::ios::beg);//3. 读取文件数据ifs.read(body, len);if (ifs.good() == false) {ELOG("%s 文件读取数据失败!!", _filename.c_str());ifs.close();return false;}//4. 关闭文件ifs.close();return true;}bool read(std::string &body) {//获取文件大小,根据文件大小调整body的空间size_t fsize = this->size();body.resize(fsize);return read(&body[0], 0, fsize);}bool write(const char *body, size_t offset, size_t len) {//1. 打开文件std::fstream fs(_filename, std::ios::binary | std::ios::in | std::ios::out); if (fs.is_open() == false) {ELOG("%s 文件打开失败!", _filename.c_str());return false;}//2. 跳转到文件指定位置fs.seekp(offset, std::ios::beg);//3. 写入数据fs.write(body, len);if (fs.good() == false) {ELOG("%s 文件写入数据失败!!", _filename.c_str());fs.close();return false;}//4. 关闭文件fs.close();return true;}bool write(const std::string &body) {return write(body.c_str(), 0, body.size());}bool rename(const std::string &nname) {return (::rename(_filename.c_str(), nname.c_str()) == 0);}static std::string parentDirectory(const std::string &filename) {// /aaa/bb/ccc/ddd/test.txtsize_t pos = filename.find_last_of("/");if (pos == std::string::npos) {// test.txtreturn "./";}std::string path = filename.substr(0, pos);return path;}static bool createFile(const std::string &filename) {std::fstream ofs(filename, std::ios::binary | std::ios::out); if (ofs.is_open() == false) {ELOG("%s 文件打开失败!", filename.c_str());return false;}ofs.close();return true;}static bool removeFile(const std::string &filename) {return (::remove(filename.c_str()) == 0);}static bool createDirectory(const std::string &path) {// aaa/bbb/ccc cccc// 在多级路径创建中,我们需要从第一个父级目录开始创建size_t pos, idx = 0;while(idx < path.size()) {pos = path.find("/", idx);if (pos == std::string::npos) {return (mkdir(path.c_str(), 0775) == 0);}std::string subpath = path.substr(0, pos);int ret = mkdir(subpath.c_str(), 0775);if (ret != 0 && errno != EEXIST) {ELOG("创建目录 %s 失败: %s", subpath.c_str(), strerror(errno));return false;}idx = pos + 1;}return true;}static bool removeDirectory(const std::string &path) {// rm -rf path// system()std::string cmd = "rm -rf " + path;return (system(cmd.c_str()) != -1);}private:std::string _filename;
};
5.3.2 SQLite基础操作类
- 判断库是否存在
- 创建并打开库 / 关闭库 / 删除库
- 启动 / 提交 / 回滚事务
- 执行语句
/*** @class SqliteHelper* @brief SQLite数据库操作辅助类,封装了SQLite数据库的基本操作*/
class SqliteHelper {public:/*** @typedef SqliteCallback* @brief SQLite回调函数类型定义* @param void* 用户自定义参数* @param int 结果集中的列数* @param char** 行数据数组* @param char** 列名数组* @return int 回调函数执行状态*/typedef int(*SqliteCallback)(void*, int, char**, char**);/*** @brief 构造函数* @param dbfile 数据库文件路径* @note 初始化时数据库连接句柄为nullptr*/SqliteHelper(const std::string &dbfile) : _dbfile(dbfile), _handler(nullptr) {}/*** @brief 打开数据库连接* @param safe_leve 安全级别标志,默认为SQLITE_OPEN_FULLMUTEX(完全互斥模式)* @return bool 打开成功返回true,失败返回false* @note 打开模式为读写模式,如果数据库不存在则创建*/bool open(int safe_leve = SQLITE_OPEN_FULLMUTEX) {// SQLite3打开函数参数说明:// filename: 数据库文件名// ppDb: 返回的数据库句柄指针// flags: 打开标志组合// zVfs: 使用的VFS模块名(nullptr表示默认)int ret = sqlite3_open_v2(_dbfile.c_str(), // 数据库文件名&_handler, // 返回的数据库句柄SQLITE_OPEN_READWRITE | // 读写模式SQLITE_OPEN_CREATE | // 不存在时创建safe_leve, // 线程安全级别nullptr // 不使用自定义VFS);if (ret != SQLITE_OK) {// 输出错误日志,包含SQLite返回的错误信息ELOG("创建/打开sqlite数据库失败: %s", sqlite3_errmsg(_handler));return false;}return true;}/*** @brief 执行SQL语句* @param sql 要执行的SQL语句* @param cb 回调函数指针,用于处理查询结果* @param arg 传递给回调函数的用户参数* @return bool 执行成功返回true,失败返回false*/bool exec(const std::string &sql, SqliteCallback cb, void *arg) {// SQLite3执行函数参数说明:// sqlite3*: 数据库句柄// sql: 要执行的SQL语句// callback: 回调函数// arg: 传递给回调函数的参数// errmsg: 错误信息指针(此处设为nullptr)int ret = sqlite3_exec(_handler, // 数据库句柄sql.c_str(),// SQL语句cb, // 回调函数arg, // 回调函数参数nullptr // 不接收错误信息);if (ret != SQLITE_OK) {// 输出错误日志,包含失败的SQL语句和错误信息ELOG("%s \n语句执行失败: %s", sql.c_str(), sqlite3_errmsg(_handler));return false;}return true;}/*** @brief 关闭数据库连接* @note 使用sqlite3_close_v2确保资源正确释放*/void close() {if (_handler) {sqlite3_close_v2(_handler); // 关闭数据库连接_handler = nullptr; // 重置句柄指针}}private:std::string _dbfile; // 数据库文件路径sqlite3 *_handler; // SQLite数据库连接句柄
};
5.3.3 字符串操作类
- 提供字符串分割功能
class StrHelper{public:static size_t split(const std::string &str, const std::string &sep, std::vector<std::string> &result) {// news....music.#.pop// 分割的思想:// 1. 从0位置开始查找指定字符的位置, 找到之后进行分割// 2. 从上次查找的位置继续向后查找指定字符size_t pos, idx = 0;while(idx < str.size()) {pos = str.find(sep, idx);if (pos == std::string::npos) {//没有找到,则从查找位置截取到末尾result.push_back(str.substr(idx));return result.size();}//pos == idx 代表两个分隔符之间没有数据,或者说查找起始位置就是分隔符if (pos == idx) {idx = pos + sep.size();continue;}result.push_back(str.substr(idx, pos - idx));idx = pos + sep.size();}return result.size();}
};
5.3.4 UUID生成器类
- UUID(Universally Unique Identifier), 也叫通用唯一识别码,通常由 32 位 16 进制数字字符组成。
- UUID 的标准型式包含 32 个 16 进制数字字符,以连字号分为五段,形式为
8-4-4-4-12
的 32 个字符,如:550e8400-e29b-41d4-a716-446655440000
。 - 在这里,UUID 生成,我们采用生成 8 个随机数字,加上 8 字节序号,共 16 字节数组生成 32 位 16 进制字符的组合形式来确保全局唯一的同时能够根据序号来分辨数据。
class UUIDHelper {public:static std::string uuid() {std::random_device rd;// 生成一个机器随机数,效率较低//因此解决方案,就是通过一个机器随机数作为生成伪随机数的种子std::mt19937_64 gernator(rd());// 通过梅森旋转算法,生成一个伪随机数//我们要生成的是8个0~255之间的数字,所以要限定数字区间std::uniform_int_distribution<int> distribution(0, 255);//然后将生成的数字转换为16进制数字字符std::stringstream ss;for (int i = 0; i < 8; i++) {ss << std::setw(2) << std::setfill('0') << std::hex << distribution(gernator) ;if (i == 3 || i == 5 || i == 7) {ss << "-";}}static std::atomic<size_t> seq(1);// 定义一个原子类型整数,初始化为1size_t num = seq.fetch_add(1);for (int i = 7; i >= 0; i--) {ss << std::setw(2) << std::setfill('0') << std::hex << ((num>>(i*8)) & 0xff);if (i == 6) ss << "-";}return ss.str();}
};
5.4 消息分类定义&交换机类型定义
在开始正式项目功能模块代码编写之前,我们需要先提前做一件事情,就是将消息类型定义出来。而消息最终是需要进行持久化存储的,因此涉及到数据的序列化和反序列化,因此消息的类型定义我们使用 protobuf 来进行生成。因此定义消息类型,其实就是定义一个消息类型的 proto 文件,并生成相关代码。
消息所需要素包括:
- 消息本身要素:
a. 消息属性:
i. 消息 ID
ii. 消息投递模式(非持久化模式 / 持久化模式)
iii. 消息的routing_key
b. 消息有效载荷内容 - 消息额外存储所需要素:
a. 消息的存储位置
b. 消息的长度
c. 消息是否有效(注意:这里不使用 bool 类型,而是使用字符的 0/1,因为 bool 类型在持久化时所占长度不同,可能导致修改文件中消息有效位后消息长度发生变化)
由于客户端与服务端都会用到交换机相关信息(如交换机类型、消息投递模式),因此我们将以下枚举也定义到 proto 文件中:
- 交换机类型:
a.DIRECT
b.FANOUT
c.TOPIC
- 消息投递模式:
a.UNDURABLE
(在 RabbitMQ 中此模式的值为 1,我们保持一致)
b.DURABLE
(值为 2)
syntax = "proto3";
package bitmq;enum ExchangeType {UNKNOWTYPE = 0;DIRECT = 1;//直接交换FANOUT = 2;//广播交换TOPIC = 3;//主题交换
};enum DeliveryMode {UNKNOWMODE = 0;UNDURABLE = 1;DURABLE = 2;
};message BasicProperties{string id = 1;//消息 IDDeliveryMode delivery_mode = 2;////持久化模式 1-非持久化; 2-持久化string routing_key = 3;//与binding_key做匹配
};message Message {message Payload {BasicProperties properties = 1;//消息属性string body = 2;//有效载荷数据string valid = 3;//消息是否有效位};Payload payload = 1;//真正持久化的只有这一个字段uint32 offset = 2;//这两个字段用于记录消息在持久化文件中的位置和长度uint32 length = 3;//以便于在加载时可以在指定位置读取指定长度的数据获取到消息
};
5.5 交换机数据管理
-
定义交换机数据类:
- 交换机名称;
- 交换机类型;
- 是否持久化标志;
- 是否自动删除标志;
- 其他参数。
-
定义交换机内部数据持久化类(数据持久化的sqlite3数据库中):
- 创建/删除交换机数据表;
- 新增交换机数据;
- 移除交换机数据;
- 查询所有交换机数据;
- 查询指定交换机数据(根据名称)。
-
定义交换机外部数据管理类:
- 声明交换机,并添加管理(存在则OK,不存在则创建);
- 删除交换机;
- 获取指定交换机;
- 销毁所有交换机数据。
#ifndef __M_EXCHANGE_H__
#define __M_EXCHANGE_H__#include "../mqcommon/mq_logger.hpp"
#include "../mqcommon/mq_helper.hpp"
#include "../mqcommon/mq_msg.pb.h"
#include <google/protobuf/map.h>
#include <iostream>
#include <unordered_map>
#include <mutex>
#include <memory>namespace bitmq {// 1. 定义交换机类
struct Exchange {using ptr = std::shared_ptr<Exchange>;// 1. 交换机名称std::string name;// 2. 交换机类型ExchangeType type;// 3. 交换机持久化标志bool durable;// 4. 是否自动删除标志bool auto_delete;// 5. 其他参数google::protobuf::Map<std::string, std::string> args;Exchange() {}// 构造函数,初始化交换机属性Exchange(const std::string &ename, ExchangeType etype, bool edurable,bool eauto_delete,const google::protobuf::Map<std::string, std::string> &eargs):name(ename), type(etype), durable(edurable), auto_delete(eauto_delete), args(eargs) {}// args 存储键值对,在存储数据库的时候,会组织一个格式字符串进行存储 key=val&key=val....// 内部解析 str_args 字符串,将内容存储到成员中void setArgs(const std::string &str_args) {// key=val&key=val&std::vector<std::string> sub_args;StrHelper::split(str_args, "&", sub_args);for (auto &str : sub_args) {size_t pos = str.find("=");std::string key = str.substr(0, pos);std::string val = str.substr(pos + 1);args[key] = val;} } // 将 args 中的内容进行序列化后,返回一个字符串std::string getArgs() {std::string result;for (auto start = args.begin(); start != args.end(); ++start) {result += start->first + "=" + start->second + "&";} return result;}
};// 定义交换机映射类型
using ExchangeMap = std::unordered_map<std::string, Exchange::ptr>;// 2. 定义交换机数据持久化管理类--数据存储在 sqlite 数据库中
class ExchangeMapper {
public:// 构造函数,初始化数据库文件路径并创建表ExchangeMapper(const std::string &dbfile):_sql_helper(dbfile) {std::string path = FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(path);assert(_sql_helper.open());createTable();} // 创建交换机表void createTable() {#define CREATE_TABLE "create table if not exists exchange_table(\name varchar(32) primary key, \type int, \durable int, \auto_delete int, \args varchar(128));"bool ret = _sql_helper.exec(CREATE_TABLE, nullptr, nullptr);if (ret == false) {DLOG("创建交换机数据库表失败!!");abort(); // 直接异常退出程序} } // 删除交换机表void removeTable() {#define DROP_TABLE "drop table if exists exchange_table;"bool ret = _sql_helper.exec(DROP_TABLE, nullptr, nullptr);if (ret == false) {DLOG("删除交换机数据库表失败!!");abort(); // 直接异常退出程序} } // 插入交换机数据到数据库bool insert(Exchange::ptr &exp) {std::stringstream ss;ss << "insert into exchange_table values(";ss << "'" << exp->name << "', ";ss << exp->type << ", ";ss << exp->durable << ", ";ss << exp->auto_delete << ", ";ss << "'" << exp->getArgs() << "');";return _sql_helper.exec(ss.str(), nullptr, nullptr);} // 从数据库删除指定交换机void remove(const std::string &name) {std::stringstream ss;ss << "delete from exchange_table where name=";ss << "'" << name << "';";_sql_helper.exec(ss.str(), nullptr, nullptr);} // 从数据库恢复所有交换机数据到内存ExchangeMap recovery() {ExchangeMap result;std::string sql = "select name, type, durable, auto_delete, args from exchange_table";_sql_helper.exec(sql, selectCallback, &result);return result;} private:/*** SQLite 查询回调函数 - 用于处理数据库查询结果并构建 Exchange 对象映射* * @param arg 用户传入的参数指针,此处应为 ExchangeMap* 类型* @param numcol 结果集中的列数* @param row 当前行的各列值数组(字符串形式)* @param fields 结果集的列名数组(本函数未使用)* @return 始终返回0,表示处理成功*/static int selectCallback(void* arg, int numcol, char** row, char** fields) {// 1. 转换用户参数:将void*转换为ExchangeMap指针类型// ExchangeMap 是存储Exchange对象的映射容器,键为exchange名称,值为shared_ptr<Exchange>ExchangeMap *result = (ExchangeMap*)arg;// 2. 创建Exchange智能指针对象// 使用std::make_shared自动管理内存,避免裸指针auto exp = std::make_shared<Exchange>();// 3. 填充Exchange对象属性(从SQL查询结果行数据)// 第0列:exchange名称(字符串)exp->name = row[0]; // 第1列:exchange类型(需将字符串转换为枚举值)// 先转成int,再强制转换为bitmq::ExchangeType枚举exp->type = (bitmq::ExchangeType)std::stoi(row[1]);// 第2列:是否持久化(将字符串"0/1"转换为bool)exp->durable = (bool)std::stoi(row[2]);// 第3列:是否自动删除(将字符串"0/1"转换为bool)exp->auto_delete = (bool)std::stoi(row[3]);// 4. 处理可选参数(第4列可能为NULL)if (row[4]) exp->setArgs(row[4]);// 5. 将构建好的Exchange对象插入结果映射// 使用make_pair创建键值对,键是exchange名称,值是智能指针result->insert(std::make_pair(exp->name, exp));// 6. 返回0表示成功(SQLite回调函数约定)return 0;} private:SqliteHelper _sql_helper; // SQLite 数据库操作助手
};// 3. 定义交换机数据内存管理类
class ExchangeManager {
public:using ptr = std::shared_ptr<ExchangeManager>;// 构造函数,初始化数据库文件路径并从数据库恢复数据ExchangeManager(const std::string &dbfile) : _mapper(dbfile) {_exchanges = _mapper.recovery();} // 声明(创建)交换机bool declareExchange(const std::string &name,ExchangeType type, bool durable, bool auto_delete,const google::protobuf::Map<std::string, std::string> &args) {std::unique_lock<std::mutex> lock(_mutex);auto it = _exchanges.find(name);if (it != _exchanges.end()) {// 如果交换机已经存在,那就直接返回,不需要重复新增return true;} auto exp = std::make_shared<Exchange>(name, type, durable, auto_delete, args);if (durable == true) {bool ret = _mapper.insert(exp);if (ret == false) return false;} _exchanges.insert(std::make_pair(name, exp));} // 删除交换机void deleteExchange(const std::string &name) {std::unique_lock<std::mutex> lock(_mutex);auto it = _exchanges.find(name);if (it == _exchanges.end()) {return; } if(it->second->durable == true) {_mapper.remove(name);}_exchanges.erase(name);} // 获取指定交换机对象Exchange::ptr selectExchange(const std::string &name) { std::unique_lock<std::mutex> lock(_mutex);auto it = _exchanges.find(name);if (it == _exchanges.end()) {return Exchange::ptr();} return it->second;} // 判断交换机是否存在bool exists(const std::string &name) {std::unique_lock<std::mutex> lock(_mutex);auto it = _exchanges.find(name);if (it == _exchanges.end()) {return false;} return true;} // 获取当前交换机数量size_t size() {std::unique_lock<std::mutex> lock(_mutex);return _exchanges.size();} // 清理所有交换机数据void clear() {std::unique_lock<std::mutex> lock(_mutex);_mapper.removeTable();_exchanges.clear();} private:std::mutex _mutex; // 互斥锁,保证线程安全ExchangeMapper _mapper; // 交换机持久化管理器ExchangeMap _exchanges; // 交换机内存映射表
};} // namespace bitmq#endif
5.6 队列数据管理
- 当前队列数据的管理本质上是队列描述信息的管理,描述当前服务器上有哪些队列。具体包括:
- 定义队列描述数据类:
- 队列名称;
- 是否持久化标志;
- 定义队列数据持久化类(数据持久化到sqlite3数据库中):
- 创建/删除队列数据表
- 新增队列数据
- 移除队列数据
- 查询所有队列数据
- 定义队列数据管理类:
- 创建队列(存在则OK,不存在则创建)
- 删除队列
- 获取指定队列
- 获取所有队列
- 判断指定队列是否存在
- 获取队列数量
- 销毁所有队列数据
#ifndef __M_QUEUE_H__
#define __M_QUEUE_H__#include "../mqcommon/mq_logger.hpp"
#include "../mqcommon/mq_helper.hpp"
#include "../mqcommon/mq_msg.pb.h"
#include <iostream>
#include <unordered_map>
#include <mutex>
#include <memory>namespace bitmq {// 消息队列结构定义
struct MsgQueue { using ptr = std::shared_ptr<MsgQueue>;std::string name; // 队列名称bool durable; // 是否持久化(服务器重启后是否保留)bool exclusive; // 是否排他队列(仅限一个连接使用)bool auto_delete; // 是否自动删除(当最后一个消费者取消订阅时)google::protobuf::Map<std::string, std::string> args; // 队列附加参数MsgQueue() {}// 带参数的构造函数MsgQueue(const std::string &qname, bool qdurable, bool qexclusive,bool qauto_delete,const google::protobuf::Map<std::string, std::string> &qargs):name(qname), durable(qdurable), exclusive(qexclusive),auto_delete(qauto_delete), args(qargs) {}// 从字符串格式解析参数(key=value&key2=value2)void setArgs(const std::string &str_args) {std::vector<std::string> sub_args;StrHelper::split(str_args, "&", sub_args);for (auto &str : sub_args) {size_t pos = str.find("=");std::string key = str.substr(0, pos);std::string val = str.substr(pos + 1);args[key] = val;} } // 将参数转换为字符串格式(key=value&key2=value2)std::string getArgs() {std::string result;for (auto start = args.begin(); start != args.end(); ++start) {result += start->first + "=" + start->second + "&";} return result;}
};// 消息队列映射表,以队列名称为键
using QueueMap = std::unordered_map<std::string, MsgQueue::ptr>;// 用于管理队列持久化到SQLite数据库的类
class MsgQueueMapper {
public:MsgQueueMapper(const std::string &dbfile):_sql_helper(dbfile) {std::string path = FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(path);_sql_helper.open();createTable();} // 创建队列表(如果不存在)void createTable() {std::stringstream sql;sql << "create table if not exists queue_table(";sql << "name varchar(32) primary key, ";sql << "durable int, ";sql << "exclusive int, ";sql << "auto_delete int, ";sql << "args varchar(128));";assert(_sql_helper.exec(sql.str(), nullptr, nullptr));} // 删除队列表void removeTable() {std::string sql = "drop table if exists queue_table;";_sql_helper.exec(sql, nullptr, nullptr);} // 插入新队列到数据库bool insert(MsgQueue::ptr &queue) {// insert into queue_table values('queue1', true, false, false, "k1=v1&k2=v2&");std::stringstream sql;sql << "insert into queue_table values(";sql << "'" << queue->name << "', ";sql << queue->durable << ", ";sql << queue->exclusive << ", ";sql << queue->auto_delete << ", ";sql << "'" << queue->getArgs() << "');";return _sql_helper.exec(sql.str(), nullptr, nullptr);} // 从数据库删除队列void remove(const std::string &name) {// delete from queue_table where name='queue1';std::stringstream sql;sql << "delete from queue_table where name=";sql << "'" << name << "';";_sql_helper.exec(sql.str(), nullptr, nullptr);} // 从数据库恢复所有队列到内存QueueMap recovery() {QueueMap result;std::string sql = "select name, durable, exclusive, \auto_delete, args from queue_table;";_sql_helper.exec(sql, selectCallback, &result);return result;} private:// SQLite查询结果回调函数static int selectCallback(void* arg, int numcol, char** row, char** fields) {QueueMap *result = (QueueMap*)arg;MsgQueue::ptr mqp = std::make_shared<MsgQueue>();mqp->name = row[0];mqp->durable = (bool)std::stoi(row[1]);mqp->exclusive = (bool)std::stoi(row[2]);mqp->auto_delete = (bool)std::stoi(row[3]);if (row[4]) mqp->setArgs(row[4]);result->insert(std::make_pair(mqp->name, mqp));return 0;} private:SqliteHelper _sql_helper; // SQLite操作辅助类
};// 主队列管理类
class MsgQueueManager {
public:using ptr = std::shared_ptr<MsgQueueManager>;MsgQueueManager(const std::string &dbfile):_mapper(dbfile) {_msg_queues = _mapper.recovery(); // 启动时从数据库恢复队列} // 声明一个新队列bool declareQueue(const std::string &qname, bool qdurable, bool qexclusive,bool qauto_delete,const google::protobuf::Map<std::string, std::string> &qargs) {std::unique_lock<std::mutex> lock(_mutex);auto it = _msg_queues.find(qname);if (it != _msg_queues.end()) {return true;} MsgQueue::ptr mqp = std::make_shared<MsgQueue>();mqp->name = qname;mqp->durable = qdurable;mqp->exclusive = qexclusive;mqp->auto_delete = qauto_delete;mqp->args = qargs;if (qdurable == true) {bool ret = _mapper.insert(mqp);if (ret == false) return false;} _msg_queues.insert(std::make_pair(qname, mqp));return true;} // 删除指定队列void deleteQueue(const std::string &name) {std::unique_lock<std::mutex> lock(_mutex);auto it = _msg_queues.find(name);if (it == _msg_queues.end()) {return ; } if (it->second->durable == true) _mapper.remove(name);_msg_queues.erase(name);} // 查询指定队列MsgQueue::ptr selectQueue(const std::string &name) {std::unique_lock<std::mutex> lock(_mutex);auto it = _msg_queues.find(name);if (it == _msg_queues.end()) {return MsgQueue::ptr();} return it->second;} // 获取所有队列QueueMap allQueues() {std::unique_lock<std::mutex> lock(_mutex);return _msg_queues;} // 检查队列是否存在bool exists(const std::string &name) {std::unique_lock<std::mutex> lock(_mutex);auto it = _msg_queues.find(name);if (it == _msg_queues.end()) {return false;} return true;} // 获取队列数量size_t size() {std::unique_lock<std::mutex> lock(_mutex);return _msg_queues.size();} // 清空所有队列void clear() {_mapper.removeTable();_msg_queues.clear();} private:std::mutex _mutex; // 线程安全锁MsgQueueMapper _mapper; // 队列持久化映射器QueueMap _msg_queues; // 内存中的队列映射表
};
} #endif
5.7 绑定信息(交换机-队列)管理
绑定信息,本质上就是一个交换机关联了哪些队列的描述。
-
定义绑定信息类
a. 交换机名称
b. 队列名称
c.
binding_key
(分发匹配规则-决定了哪些数据能被交换机放入队列) -
定义绑定信息数据持久化类(数据持久化的 sqlite3 数据库中)
a. 创建/删除绑定信息数据表
b. 新增绑定信息数据
c. 移除指定绑定信息数据
d. 移除指定交换机相关绑定信息数据:移除交换机的时候会被调用
e. 移除指定队列相关绑定信息数据:移除队列的时候会被调用
f. 查询所有绑定信息数据:用于重启服务器时进行历史数据恢复
-
定义绑定信息数据管理类
a. 创建绑定信息,并添加管理(存在则 OK,不存在则创建)
b. 解除指定的绑定信息
c. 删除指定队列的所有绑定信息
d. 删除交换机相关的所有绑定信息
e. 获取交换机相关的所有绑定信息:交换机收到消息后,需要分发给自己关联的队列
f. 判断指定绑定信息是否存在
g. 获取当前绑定信息数量
h. 销毁所有绑定信息数据
#ifndef __M_BINDING_H__
#define __M_BINDING_H__#include "../mqcommon/mq_logger.hpp"
#include "../mqcommon/mq_helper.hpp"
#include "../mqcommon/mq_msg.pb.h"
#include <iostream>
#include <unordered_map>
#include <mutex>
#include <memory>namespace bitmq {// 绑定关系结构体,表示交换机与队列之间的绑定关系struct Binding { using ptr = std::shared_ptr<Binding>; // 智能指针别名std::string exchange_name; // 交换机名称std::string msgqueue_name; // 消息队列名称std::string binding_key; // 绑定键(路由规则)// 默认构造函数Binding() {}// 带参构造函数Binding(const std::string &ename, const std::string &qname, const std::string &key):exchange_name(ename), msgqueue_name(qname), binding_key(key) {}};// 队列名与绑定信息的映射关系(方便通过队列名查找绑定信息)using MsgQueueBindingMap = std::unordered_map<std::string, Binding::ptr>;// 交换机名称与绑定信息的映射关系(包含所有绑定信息,按交换机分组)using BindingMap = std::unordered_map<std::string, MsgQueueBindingMap>;// 绑定关系管理类(负责绑定关系的持久化存储和内存管理)class BindingMapper {public:// 构造函数,初始化SQLite数据库连接BindingMapper(const std::string &dbfile) : _sql_helper(dbfile) {std::string path = FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(path); // 创建数据库目录_sql_helper.open(); // 打开数据库连接createTable(); // 创建绑定关系表} // 创建绑定关系表void createTable() {std::stringstream sql;sql << "create table if not exists binding_table(";sql << "exchange_name varchar(32), "; // 交换机名字段sql << "msgqueue_name varchar(32), "; // 队列名字段sql << "binding_key varchar(128));"; // 绑定键字段assert(_sql_helper.exec(sql.str(), nullptr, nullptr));} // 删除绑定关系表void removeTable() {std::string sql = "drop table if exists binding_table;";_sql_helper.exec(sql, nullptr, nullptr);} // 插入新的绑定关系bool insert(Binding::ptr &binding) {std::stringstream sql;sql << "insert into binding_table values(";sql << "'" << binding->exchange_name << "', ";sql << "'" << binding->msgqueue_name << "', ";sql << "'" << binding->binding_key << "');";return _sql_helper.exec(sql.str(), nullptr, nullptr);} // 删除指定交换机和队列的绑定关系void remove(const std::string &ename, const std::string &qname) {std::stringstream sql;sql << "delete from binding_table where ";sql << "exchange_name='" << ename << "' and ";sql << "msgqueue_name='" << qname << "';";_sql_helper.exec(sql.str(), nullptr, nullptr);} // 删除指定交换机的所有绑定关系void removeExchangeBindings(const std::string &ename) { std::stringstream sql;sql << "delete from binding_table where ";sql << "exchange_name='" << ename << "';";_sql_helper.exec(sql.str(), nullptr, nullptr);} // 删除指定队列的所有绑定关系void removeMsgQueueBindings(const std::string &qname) { std::stringstream sql;sql << "delete from binding_table where ";sql << "msgqueue_name='" << qname << "';";_sql_helper.exec(sql.str(), nullptr, nullptr);} // 从数据库恢复所有绑定关系BindingMap recovery() {BindingMap result;std::string sql = "select exchange_name, msgqueue_name, binding_key from binding_table;";_sql_helper.exec(sql, selectCallback, &result); // 执行查询并回调处理结果return result;} private:/*** @brief SQL查询结果回调函数 - 将数据库查询结果转换为内存中的数据结构* * @param arg 用户传入的参数指针,此处应指向BindingMap类型的结果容器* @param numcol 返回的列数(字段数)* @param row 当前行的各列值数组,每个元素是一个字符串表示的字段值* @param fields 字段名数组(未使用)* @return int 固定返回0表示成功(SQLite回调函数约定)* * @note 此函数由SQLite在每次获取查询结果行时自动调用*/static int selectCallback(void* arg, int numcol, char** row, char** fields) {// 将void*类型的参数转换为BindingMap指针类型// BindingMap是一个存储交换机到队列绑定关系的映射容器BindingMap* result = (BindingMap*)arg;// 使用查询结果创建新的Binding对象:// row[0] - 交换机名称 (exchange_name)// row[1] - 队列名称 (msgqueue_name)// row[2] - 路由键 (routing_key)// 使用std::make_shared创建智能指针管理的Binding对象Binding::ptr bp = std::make_shared<Binding>(row[0], row[1], row[2]);// 获取或创建该交换机对应的队列绑定映射:// 1. 通过exchange_name在result映射中查找// 2. 如果不存在会自动创建一个空的MsgQueueBindingMap// MsgQueueBindingMap是队列名称到Binding对象的映射MsgQueueBindingMap& qmap = (*result)[bp->exchange_name];// 将新建的绑定关系插入到对应交换机的映射中:// 使用队列名称(msgqueue_name)作为key,Binding智能指针作为valueqmap.insert(std::make_pair(bp->msgqueue_name, bp));// 返回0表示处理成功(SQLite回调函数约定)return 0;}private:SqliteHelper _sql_helper; // SQLite数据库操作助手};// 绑定关系管理器(提供线程安全的绑定关系操作接口)class BindingManager { public:using ptr = std::shared_ptr<BindingManager>; // 智能指针别名// 构造函数,从数据库恢复绑定关系BindingManager(const std::string &dbfile) : _mapper(dbfile) {_bindings = _mapper.recovery(); // 从数据库加载绑定关系} // 绑定交换机和队列bool bind(const std::string &ename, const std::string &qname, const std::string &key, bool durable) {std::unique_lock<std::mutex> lock(_mutex); // 加锁保证线程安全// 检查绑定关系是否已存在auto it = _bindings.find(ename);if (it != _bindings.end() && it->second.find(qname) != it->second.end()) {return true; // 绑定关系已存在,直接返回成功} // 创建新的绑定关系对象Binding::ptr bp = std::make_shared<Binding>(ename, qname, key);// 如果需要持久化,则写入数据库if (durable) {bool ret = _mapper.insert(bp);if (ret == false) return false; // 数据库操作失败} // 更新内存中的绑定关系auto &qbmap = _bindings[ename];qbmap.insert(std::make_pair(qname, bp));return true;} // 解除交换机和队列的绑定void unBind(const std::string &ename, const std::string &qname) {std::unique_lock<std::mutex> lock(_mutex);// 查找交换机绑定信息auto eit = _bindings.find(ename);if (eit == _bindings.end()) { return; } // 没有该交换机的绑定信息// 查找队列绑定信息auto qit = eit->second.find(qname);if (qit == eit->second.end()) { return; } // 没有该队列的绑定信息// 从数据库和内存中删除绑定关系_mapper.remove(ename, qname);_bindings[ename].erase(qname);} // 删除指定交换机的所有绑定关系void removeExchangeBindings(const std::string &ename) { std::unique_lock<std::mutex> lock(_mutex);_mapper.removeExchangeBindings(ename); // 从数据库删除_bindings.erase(ename); // 从内存删除} // 删除指定队列的所有绑定关系void removeMsgQueueBindings(const std::string &qname) { std::unique_lock<std::mutex> lock(_mutex);_mapper.removeMsgQueueBindings(qname); // 从数据库删除// 遍历所有交换机,从内存中删除该队列的绑定信息for (auto start = _bindings.begin(); start != _bindings.end(); ++start) {start->second.erase(qname);} } // 获取指定交换机的所有绑定关系MsgQueueBindingMap getExchangeBindings(const std::string &ename) {std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit == _bindings.end()) { return MsgQueueBindingMap(); // 返回空映射} return eit->second;} // 获取指定交换机和队列的绑定关系Binding::ptr getBinding(const std::string &ename, const std::string &qname) {std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit == _bindings.end()) { return Binding::ptr(); // 返回空指针} auto qit = eit->second.find(qname);if (qit == eit->second.end()) { return Binding::ptr(); // 返回空指针}return qit->second;} // 检查绑定关系是否存在bool exists(const std::string &ename, const std::string &qname) {std::unique_lock<std::mutex> lock(_mutex);auto eit = _bindings.find(ename);if (eit == _bindings.end()) { return false; } auto qit = eit->second.find(qname);if (qit == eit->second.end()) { return false; } return true;} // 获取绑定关系总数size_t size() {size_t total_size = 0;std::unique_lock<std::mutex> lock(_mutex);// 遍历所有交换机的绑定关系并计数for (auto start = _bindings.begin(); start != _bindings.end(); ++start) {total_size += start->second.size();} return total_size;} // 清空所有绑定关系void clear() {std::unique_lock<std::mutex> lock(_mutex);_mapper.removeTable(); // 删除数据库表_bindings.clear(); // 清空内存数据} private:std::mutex _mutex; // 互斥锁(保证线程安全)BindingMapper _mapper; // 绑定关系持久化管理器BindingMap _bindings; // 内存中的绑定关系映射};
} #endif
5.8 队列消息管理
- 因为消息数据需要在网络中进行传输,因此消息的类型定义使用
protobuf
进行,因为protobuf
中自带了序列化和反序列化功能,操作起来会简便一些。 - 需要特别说明的是,消息的存储并没有使用数据库,因为消息长度通常不定,且有些消息可能会非常庞大,因此并不适合存储在数据库中,因此我们的处理方式(包括RabbitMQ)是直接将消息存储在文件中进行管理,而内存中管理的消息只需要记录好自己在文件中的所在位置和长度即可。
- 为了便于管理,消息管理以队列为单元进行管理,因此每个队列都会有自己独立的数据存储文件。
-
消息类型的proto定义在本章的5.4部分
-
消息的持久化管理
a. 管理数据
i. 队列消息文件存储的路径
ii. 队列消息的存储文件名
iii. 队列消息的临时交换文件名
b. 管理操作
i. 日志消息存储在文件中(4B 长度+(属性+内容+有效位)序列化消息,连续存储即可)
ii. 提供队列消息文件创建/删除功能
iii. 提供队列消息的新增持久化/删除持久化
iv. 提供持久化内容的垃圾回收(其实就是重新加载出所有有效消息返回,并重新生成新的消息存储文件)
-
消息的管理(以队列为单位进行管理)
a. 队列消息管理数据
i. 队列名称
ii. 待推送消息链表
iii. 持久化消息 hash
iv. 待确认消息 hash
v. 有效消息数量
vi. 已经持久化消息总量
vii. 持久化管理句柄
b. 队列管理操作
i. 新增消息
ii. 获取队首消息(获取的同时将消息加入待确认队列)
iii. 移除指定待确认消息
iv. 获取队列待消费&待确认消息数量
v. 恢复队列历史消息。
vi. 销毁队列所有消息
vii. 判断队列消息是否为空
c. 消息的总体对外管理
i. 初始化新建队列的消息管理结构,并创建消息存储文件
ii. 删除队列的消息管理结构,以及消息存储文件
iii. 向指定队列新增消息
iv. 获取指定队列队首消息
v. 确认指定队列待确认消息(删除)
vi. 判断指定队列消息是否为空
#ifndef __M_MSG_H__
#define __M_MSG_H__// 包含必要的头文件
#include "../mqcommon/mq_logger.hpp" // 日志记录功能
#include "../mqcommon/mq_helper.hpp" // 文件操作辅助类
#include "../mqcommon/mq_msg.pb.h" // Protobuf生成的消息定义
#include <iostream> // 标准输入输出
#include <unordered_map> // 哈希表容器
#include <mutex> // 互斥锁
#include <memory> // 智能指针
#include <list> // 链表容器namespace bitmq {// 定义常量字符串#define DATAFILE_SUBFIX ".mqd" // 数据文件后缀#define TMPFILE_SUBFIX ".mqd.tmp" // 临时文件后缀// 使用智能指针管理消息对象using MessagePtr = std::shared_ptr<bitmq::Message>;/*** @class MessageMapper* @brief 消息持久化管理类,负责消息的磁盘存储和加载*/class MessageMapper {public:/*** @brief 构造函数,初始化消息存储路径* @param basedir 基础目录路径* @param qname 队列名称*/MessageMapper(std::string &basedir, const std::string &qname):_qname(qname) {// 确保目录路径以'/'结尾if (basedir.back() != '/') basedir.push_back('/');// 构造数据文件和临时文件路径_datafile = basedir + qname + DATAFILE_SUBFIX;_tmpfile = basedir + qname + TMPFILE_SUBFIX;// 如果基础目录不存在则创建if (FileHelper(basedir).exists() == false) {assert(FileHelper::createDirectory(basedir));}createMsgFile();}/*** @brief 创建消息数据文件* @return 创建成功返回true,失败返回false*/bool createMsgFile() {// 如果文件已存在则直接返回成功if (FileHelper(_datafile).exists() == true) {return true;}// 创建新文件bool ret = FileHelper::createFile(_datafile);if (ret == false) {DLOG("创建队列数据文件 %s 失败!", _datafile.c_str());return false;}return true;}/*** @brief 删除消息数据文件*/void removeMsgFile() {FileHelper::removeFile(_datafile);FileHelper::removeFile(_tmpfile);}/*** @brief 插入消息到数据文件* @param msg 消息指针* @return 插入成功返回true,失败返回false*/bool insert(MessagePtr &msg) {return insert(_datafile, msg);}/*** @brief 从数据文件移除消息(逻辑删除)* @param msg 消息指针* @return 删除成功返回true,失败返回false*/bool remove(MessagePtr &msg) {// 1. 将msg中的有效标志位修改为'0'(逻辑删除)msg->mutable_payload()->set_valid("0");// 2. 对msg进行序列化std::string body = msg->payload().SerializeAsString();if (body.size() != msg->length()) {DLOG("不能修改文件中的数据信息,因为新生成的数据与原数据长度不一致!");return false;}// 3. 将序列化后的消息写入文件原位置(覆盖)FileHelper helper(_datafile);bool ret = helper.write(body.c_str(), msg->offset(), body.size());if (ret == false) {DLOG("向队列数据文件写入数据失败!");return false;}return true;}/*** @brief 执行垃圾回收,整理磁盘空间* @return 返回整理后仍然有效的消息列表*/std::list<MessagePtr> gc() {bool ret;std::list<MessagePtr> result;// 1. 加载所有有效数据ret = load(result);if (ret == false) {DLOG("加载有效数据失败!\n");return result;}// 2. 将有效数据写入临时文件FileHelper::createFile(_tmpfile);for (auto &msg : result) {DLOG("向临时文件写入数据: %s", msg->payload().body().c_str());ret = insert(_tmpfile, msg);if (ret == false) {DLOG("向临时文件写入消息数据失败!!");return result;}}// 3. 删除原数据文件ret = FileHelper::removeFile(_datafile);if (ret == false) {DLOG("删除源文件失败!");return result;}// 4. 将临时文件重命名为原数据文件名ret = FileHelper(_tmpfile).rename(_datafile);if (ret == false) {DLOG("修改临时文件名称失败!");return result;}// 5. 返回新的有效数据return result;}private:/*** @brief 从文件加载所有有效消息* @param result 输出参数,存储加载的有效消息* @return 加载成功返回true,失败返回false*/bool load(std::list<MessagePtr> &result) {// 文件存储格式:4字节长度|数据|4字节长度|数据...FileHelper data_file_helper(_datafile);size_t offset = 0, msg_size;size_t fsize = data_file_helper.size();bool ret;while(offset < fsize) {// 读取消息长度ret = data_file_helper.read((char*)&msg_size, offset, sizeof(size_t));if (ret == false) {DLOG("读取消息长度失败!");return false;}offset += sizeof(size_t);// 读取消息内容std::string msg_body(msg_size, '\0');data_file_helper.read(&msg_body[0], offset, msg_size);if (ret == false) {DLOG("读取消息数据失败!");return false;}offset += msg_size;// 反序列化消息MessagePtr msgp = std::make_shared<Message>();msgp->mutable_payload()->ParseFromString(msg_body);// 跳过无效消息if (msgp->payload().valid() == "0") {DLOG("加载到无效消息:%s", msgp->payload().body().c_str());continue;}// 保存有效消息result.push_back(msgp);}return true;}/*** @brief 向指定文件插入消息* @param filename 目标文件名* @param msg 消息指针* @return 插入成功返回true,失败返回false*/bool insert(const std::string &filename, MessagePtr &msg) {// 1. 序列化消息std::string body = msg->payload().SerializeAsString();// 2. 获取文件当前大小FileHelper helper(filename);size_t fsize = helper.size();size_t msg_size = body.size();// 写入格式:4字节长度 + 实际数据bool ret = helper.write((char*)&msg_size, fsize, sizeof(size_t));if (ret == false) {DLOG("向队列数据文件写入数据长度失败!");return false;}// 3. 写入实际数据ret = helper.write(body.c_str(), fsize + sizeof(size_t), body.size());if (ret == false) {DLOG("向队列数据文件写入数据失败!");return false;}// 4. 更新消息的存储位置信息msg->set_offset(fsize + sizeof(size_t));msg->set_length(body.size());return true;}private:std::string _qname; // 队列名称std::string _datafile; // 数据文件路径std::string _tmpfile; // 临时文件路径};/*** @class QueueMessage* @brief 队列消息管理类,管理内存中的消息和持久化消息*/class QueueMessage {public:using ptr = std::shared_ptr<QueueMessage>;/*** @brief 构造函数* @param basedir 基础目录路径* @param qname 队列名称*/QueueMessage(std::string &basedir, const std::string &qname):_mapper(basedir, qname), _qname(qname), _valid_count(0), _total_count(0) {}/*** @brief 从磁盘恢复消息到内存* @return 恢复成功返回true*/bool recovery() {std::unique_lock<std::mutex> lock(_mutex);// 执行垃圾回收并获取有效消息_msgs = _mapper.gc();// 将消息加入持久化消息哈希表for (auto &msg : _msgs) {_durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));}// 更新计数器_valid_count = _total_count = _msgs.size();return true;}/*** @brief 插入新消息* @param bp 消息属性* @param body 消息体* @param queue_is_durable 队列是否持久化* @return 插入成功返回true,失败返回false*/bool insert(const BasicProperties *bp, const std::string &body, bool queue_is_durable) {// 1. 构造消息对象MessagePtr msg = std::make_shared<Message>();msg->mutable_payload()->set_body(body);// 设置消息属性if (bp != nullptr) {DeliveryMode mode = queue_is_durable ? bp->delivery_mode() : DeliveryMode::UNDURABLE;msg->mutable_payload()->mutable_properties()->set_id(bp->id());msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);msg->mutable_payload()->mutable_properties()->set_routing_key(bp->routing_key());} else {DeliveryMode mode = queue_is_durable ? DeliveryMode::DURABLE : DeliveryMode::UNDURABLE;msg->mutable_payload()->mutable_properties()->set_id(UUIDHelper::uuid());msg->mutable_payload()->mutable_properties()->set_delivery_mode(mode);msg->mutable_payload()->mutable_properties()->set_routing_key("");}std::unique_lock<std::mutex> lock(_mutex);// 2. 处理持久化消息if (msg->payload().properties().delivery_mode() == DeliveryMode::DURABLE) {msg->mutable_payload()->set_valid("1"); // 标记为有效// 持久化存储bool ret = _mapper.insert(msg);if (ret == false) {DLOG("持久化存储消息:%s 失败了!", body.c_str());return false;}// 更新计数器_valid_count += 1;_total_count += 1;// 加入持久化消息表_durable_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));}// 3. 加入内存消息队列_msgs.push_back(msg);return true;}/*** @brief 获取队首消息* @return 返回消息指针,队列为空返回空指针*/MessagePtr front() {std::unique_lock<std::mutex> lock(_mutex);if (_msgs.size() == 0) {return MessagePtr();}// 从队列取出消息MessagePtr msg = _msgs.front();_msgs.pop_front();// 加入待确认表_waitack_msgs.insert(std::make_pair(msg->payload().properties().id(), msg));return msg;}/*** @brief 删除消息(确认后调用)* @param msg_id 消息ID* @return 删除成功返回true*/bool remove(const std::string &msg_id) {std::unique_lock<std::mutex> lock(_mutex);// 1. 查找待确认消息auto it = _waitack_msgs.find(msg_id);if (it == _waitack_msgs.end()) {DLOG("没有找到要删除的消息:%s!", msg_id.c_str());return true;}// 2. 处理持久化消息if (it->second->payload().properties().delivery_mode() == DeliveryMode::DURABLE) {// 从磁盘删除_mapper.remove(it->second);_durable_msgs.erase(msg_id);_valid_count -= 1;// 检查是否需要垃圾回收gc();}// 3. 从内存删除_waitack_msgs.erase(msg_id);return true;}// 以下为各种计数器的获取方法size_t getable_count() {std::unique_lock<std::mutex> lock(_mutex);return _msgs.size(); // 可获取的消息数量}size_t total_count() {std::unique_lock<std::mutex> lock(_mutex);return _total_count; // 总消息数量}size_t durable_count() {std::unique_lock<std::mutex> lock(_mutex);return _durable_msgs.size(); // 持久化消息数量}size_t waitack_count() {std::unique_lock<std::mutex> lock(_mutex);return _waitack_msgs.size(); // 待确认消息数量}/*** @brief 清空队列*/void clear() {std::unique_lock<std::mutex> lock(_mutex);_mapper.removeMsgFile(); // 删除磁盘文件_msgs.clear(); // 清空内存队列_durable_msgs.clear(); // 清空持久化表_waitack_msgs.clear(); // 清空待确认表_valid_count = 0; // 重置计数器_total_count = 0;}private:/*** @brief 检查是否需要执行垃圾回收(Garbage Collection)* @return bool - 如果需要垃圾回收返回true,否则返回false* * 垃圾回收触发条件:* 1. 总消息数(_total_count)超过2000条* 2. 有效消息比例(_valid_count/_total_count)低于50%*/bool GCCheck() {// 检查总消息数是否超过阈值2000// 同时检查有效消息占比是否低于50%(通过整数运算避免浮点运算)// 注意:这里用_valid_count*10/_total_count<5来等价于_valid_count/_total_count<0.5if (_total_count > 2000 && _valid_count * 10 / _total_count < 5) {return true; // 满足GC条件}return false; // 不满足GC条件}/*** @brief 执行垃圾回收操作* * 垃圾回收流程:* 1. 检查是否满足GC条件* 2. 从持久化存储(_mapper)获取有效消息列表* 3. 重建内存中的消息索引* 4. 更新计数器*/void gc() {// 第一步:检查GC条件,不满足则直接返回if (GCCheck() == false) {return; // 当前不需要执行垃圾回收}// 第二步:从持久化存储执行GC并获取有效消息列表std::list<MessagePtr> msgs = _mapper.gc();// 遍历所有从持久化存储返回的有效消息for (auto &msg : msgs) {// 获取消息ID用于查找const auto& msg_id = msg->payload().properties().id();// 在内存持久化消息映射表(_durable_msgs)中查找该消息auto it = _durable_msgs.find(msg_id);// 情况1:内存中不存在该消息的记录if (it == _durable_msgs.end()) {// 记录警告日志(DEBUG级别)DLOG("垃圾回收后,有一条持久化消息,在内存中没有进行管理!");// 将该消息重新添加到内存队列_msgs.push_back(msg);// 在内存映射表中建立新记录_durable_msgs.insert(std::make_pair(msg_id, msg));continue;}// 情况2:内存中存在该消息记录// 更新消息的物理存储位置信息(offset和length)it->second->set_offset(msg->offset()); // 更新偏移量it->second->set_length(msg->length()); // 更新消息长度}// 第三步:更新计数器// GC后,有效消息数和总消息数都等于从持久层返回的消息数_valid_count = _total_count = msgs.size();}private:std::mutex _mutex; // 互斥锁std::string _qname; // 队列名称size_t _valid_count; // 有效消息计数size_t _total_count; // 总消息计数MessageMapper _mapper; // 消息持久化管理器std::list<MessagePtr> _msgs; // 待推送消息队列std::unordered_map<std::string, MessagePtr> _durable_msgs; // 持久化消息表std::unordered_map<std::string, MessagePtr> _waitack_msgs; // 待确认消息表};/*** @class MessageManager* @brief 消息管理器,管理所有队列的消息*/class MessageManager {public:using ptr = std::shared_ptr<MessageManager>;/*** @brief 构造函数* @param basedir 基础目录路径*/MessageManager(const std::string &basedir): _basedir(basedir){}/*** @brief 清空所有队列消息*/void clear() {std::unique_lock<std::mutex> lock(_mutex);for (auto &qmsg : _queue_msgs) {qmsg.second->clear();}}/*** @brief 初始化队列消息管理* @param qname 队列名称*/void initQueueMessage(const std::string &qname) {QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);// 检查是否已存在auto it = _queue_msgs.find(qname);if (it != _queue_msgs.end()) {return;}// 创建新的队列管理对象qmp = std::make_shared<QueueMessage>(_basedir, qname);_queue_msgs.insert(std::make_pair(qname, qmp));}// 恢复持久化消息qmp->recovery();}/*** @brief 销毁队列消息管理* @param qname 队列名称*/void destroyQueueMessage(const std::string &qname) {QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {return;}qmp = it->second;_queue_msgs.erase(it);}// 清空队列qmp->clear();}// 以下为各种队列操作的包装方法bool insert(const std::string &qname, BasicProperties *bp, const std::string &body, bool queue_is_durable) {QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {DLOG("向队列%s新增消息失败:没有找到消息管理句柄!", qname.c_str());return false;}qmp = it->second;}return qmp->insert(bp, body, queue_is_durable);}MessagePtr front(const std::string &qname) {QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {DLOG("获取队列%s队首消息失败:没有找到消息管理句柄!", qname.c_str());return MessagePtr();}qmp = it->second;}return qmp->front();}void ack(const std::string &qname, const std::string &msg_id) {QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {DLOG("确认队列%s消息%s失败:没有找到消息管理句柄!", qname.c_str(), msg_id.c_str());return;}qmp = it->second;}qmp->remove(msg_id);return;}size_t getable_count(const std::string &qname) {QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {DLOG("获取队列%s待推送消息数量失败:没有找到消息管理句柄!", qname.c_str());return 0;}qmp = it->second;}return qmp->getable_count();}size_t total_count(const std::string &qname) {QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {DLOG("获取队列%s总持久化消息数量失败:没有找到消息管理句柄!", qname.c_str());return 0;}qmp = it->second;}return qmp->total_count();}size_t durable_count(const std::string &qname) {QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {DLOG("获取队列%s有效持久化消息数量失败:没有找到消息管理句柄!", qname.c_str());return 0;}qmp = it->second;}return qmp->durable_count();}size_t waitack_count(const std::string &qname) {QueueMessage::ptr qmp;{std::unique_lock<std::mutex> lock(_mutex);auto it = _queue_msgs.find(qname);if (it == _queue_msgs.end()) {DLOG("获取队列%s待确认消息数量失败:没有找到消息管理句柄!", qname.c_str());return 0;}qmp = it->second;}return qmp->waitack_count();}private:std::mutex _mutex; // 互斥锁std::string _basedir; // 基础目录路径std::unordered_map<std::string, QueueMessage::ptr> _queue_msgs; // 队列消息管理表};
}#endif
5.9 虚拟机管理
虚拟机模块是对上述三个数据管理模块的整合,并基于数据之间的关联关系进行联合操作。
-
定义虚拟机类包含以下成员:
a. 交换机数据管理模块句柄
b. 队列数据管理模块句柄
c. 绑定数据管理模块句柄
d. 消息数据管理模块句柄
-
虚拟机包含操作:
a. 提供声明交换机的功能(存在则 OK,不存在则创建)
b. 提供删除交换机的功能(删除交换机的同时删除关联绑定信息)
c. 提供声明队列的功能(存在则 OK,不存在则创建,创建的同时创建队列关联消息管理对象)
d. 提供删除队列的功能(删除队列的同时删除关联绑定信息,删除关联消息管理对象及队列所有消息)
e. 提供交换机-队列绑定的功能
f. 提供交换机-队列解绑的功能
g. 提供获取交换机相关的所有绑定信息功能
h. 提供新增消息的功能
i. 提供获取指定队列队首消息的功能
j. 提供消息确认删除的功能
#ifndef __M_HOST_H__
#define __M_HOST_H__
#include "mq_exchange.hpp"
#include "mq_queue.hpp"
#include "mq_binding.hpp"
#include "mq_message.hpp"namespace bitmq {/*** @class VirtualHost* @brief 虚拟主机类,管理交换机、队列、绑定关系和消息*/class VirtualHost {public:using ptr = std::shared_ptr<VirtualHost>; // 智能指针类型定义/*** @brief 构造函数* @param hname 虚拟主机名称* @param basedir 消息存储基础目录* @param dbfile 数据库文件路径*/VirtualHost(const std::string &hname, const std::string &basedir, const std::string &dbfile):_host_name(hname), // 初始化主机名_emp(std::make_shared<ExchangeManager>(dbfile)), // 创建交换机管理器_mqmp(std::make_shared<MsgQueueManager>(dbfile)), // 创建队列管理器_bmp(std::make_shared<BindingManager>(dbfile)), // 创建绑定关系管理器_mmp(std::make_shared<MessageManager>(basedir)) { // 创建消息管理器// 获取到所有的队列信息,通过队列名称恢复历史消息数据QueueMap qm = _mqmp->allQueues();for (auto &q : qm) {_mmp->initQueueMessage(q.first); // 初始化每个队列的消息存储}}/*** @brief 声明交换机* @param name 交换机名称* @param type 交换机类型* @param durable 是否持久化* @param auto_delete 是否自动删除* @param args 额外参数* @return 是否声明成功*/bool declareExchange(const std::string &name,ExchangeType type, bool durable, bool auto_delete,const google::protobuf::Map<std::string, std::string> &args) {return _emp->declareExchange(name, type, durable, auto_delete, args);}/*** @brief 删除交换机* @param name 交换机名称* @note 删除交换机时会同时删除相关的绑定关系*/void deleteExchange(const std::string &name) {// 删除交换机的时候,需要将交换机相关的绑定信息也删除掉。_bmp->removeExchangeBindings(name); // 先删除绑定关系return _emp->deleteExchange(name); // 再删除交换机}/*** @brief 检查交换机是否存在* @param name 交换机名称* @return 是否存在*/bool existsExchange(const std::string &name) {return _emp->exists(name);}/*** @brief 选择指定名称的交换机* @param ename 交换机名称* @return 交换机智能指针*/Exchange::ptr selectExchange(const std::string &ename) {return _emp->selectExchange(ename);}/*** @brief 声明队列* @param qname 队列名称* @param qdurable 是否持久化* @param qexclusive 是否排他* @param qauto_delete 是否自动删除* @param qargs 额外参数* @return 是否声明成功*/bool declareQueue(const std::string &qname, bool qdurable, bool qexclusive,bool qauto_delete,const google::protobuf::Map<std::string, std::string> &qargs) {// 初始化队列的消息句柄(消息的存储管理)// 队列的创建_mmp->initQueueMessage(qname); // 初始化消息存储return _mqmp->declareQueue(qname, qdurable, qexclusive, qauto_delete, qargs);}/*** @brief 删除队列* @param name 队列名称* @note 会同时删除队列的消息和绑定关系*/void deleteQueue(const std::string &name) {// 删除的时候队列相关的数据有两个:队列的消息,队列的绑定信息_mmp->destroyQueueMessage(name); // 删除消息数据_bmp->removeMsgQueueBindings(name); // 删除绑定关系return _mqmp->deleteQueue(name); // 删除队列}/*** @brief 检查队列是否存在* @param name 队列名称* @return 是否存在*/bool existsQueue(const std::string &name) {return _mqmp->exists(name);}/*** @brief 获取所有队列* @return 队列映射表*/QueueMap allQueues() {return _mqmp->allQueues();}/*** @brief 绑定队列到交换机* @param ename 交换机名称* @param qname 队列名称* @param key 路由键* @return 是否绑定成功*/bool bind(const std::string &ename, const std::string &qname, const std::string &key) {Exchange::ptr ep = _emp->selectExchange(ename);if (ep.get() == nullptr) {DLOG("进行队列绑定失败,交换机%s不存在!", ename.c_str());return false;}MsgQueue::ptr mqp = _mqmp->selectQueue(qname);if (mqp.get() == nullptr) {DLOG("进行队列绑定失败,队列%s不存在!", qname.c_str());return false;}return _bmp->bind(ename, qname, key, ep->durable && mqp->durable);}/*** @brief 解绑队列和交换机* @param ename 交换机名称* @param qname 队列名称*/void unBind(const std::string &ename, const std::string &qname) {return _bmp->unBind(ename, qname);}/*** @brief 获取交换机的所有绑定关系* @param ename 交换机名称* @return 绑定关系映射表*/MsgQueueBindingMap exchangeBindings(const std::string &ename) {return _bmp->getExchangeBindings(ename);}/*** @brief 检查绑定关系是否存在* @param ename 交换机名称* @param qname 队列名称* @return 是否存在*/bool existsBinding(const std::string &ename, const std::string &qname) {return _bmp->exists(ename, qname);}/*** @brief 发布消息到队列* @param qname 队列名称* @param bp 消息属性* @param body 消息体* @return 是否发布成功*/bool basicPublish(const std::string &qname, BasicProperties *bp, const std::string &body) {MsgQueue::ptr mqp = _mqmp->selectQueue(qname);if (mqp.get() == nullptr) {DLOG("发布消息失败,队列%s不存在!", qname.c_str());return false;}return _mmp->insert(qname, bp, body, mqp->durable);}/*** @brief 消费队列中的消息* @param qname 队列名称* @return 消息指针*/MessagePtr basicConsume(const std::string &qname) {return _mmp->front(qname);}/*** @brief 确认消息已被消费* @param qname 队列名称* @param msgid 消息ID*/void basicAck(const std::string &qname, const std::string &msgid) {return _mmp->ack(qname, msgid);} /*** @brief 清空所有数据*/void clear() {_emp->clear(); // 清空交换机_mqmp->clear(); // 清空队列_bmp->clear(); // 清空绑定关系_mmp->clear(); // 清空消息} private:std::string _host_name; // 虚拟主机名称ExchangeManager::ptr _emp; // 交换机管理器指针MsgQueueManager::ptr _mqmp; // 队列管理器指针BindingManager::ptr _bmp; // 绑定关系管理器指针MessageManager::ptr _mmp; // 消息管理器指针};
}#endif
5.10 交换机路由管理
-
客户端将消息发布到指定的交换机,交换机这时候要考虑这条数据该放入到哪些与自己绑定的队列中,而这个考量是通过交换机类型以及匹配规则来决定的:
-
广播交换:直接将消息交给所有绑定的队列,无需匹配
-
直接交换:队列绑定信息中的 binding_key 与消息中的 routing_key 一致则匹配成功,否则失败。
-
主题交换:只有匹配队列主题的消息才会被放入队列中
-
-
其中广播交换和直接交换,都非常简单,唯一较为难以理解的是主题交换。在这里我们需要先对
binding_key
和routing_key
作以了解:-
binding_key
是由数字字母下划线构成的,并且使用
.
分成若干部分,并支持*
和#
通配符。例如:
news.music.#
,这用于表示交换机绑定的当前队列是一个用于发布音乐新闻的队列。-
支持
*
和#
两种通配符, 但是* #
只能作为.
切分出来的独立部分,不能和其他数字字母混用- 比如
a.*.b
是合法的,a.*a.b
是不合法的 *
可以匹配任意一个单词(注意是单词不是字母)#
可以匹配零个或者多个任意单词(注意是单词不是字母)
- 比如
-
注意事项:
a.#.b
- 一个单词中不能既出现
*
又出现#
, 也就是,一个单词中只能有一个通配符,且必须独立存在 #
通配符两边不能出现其他通配符,因为#
可以匹配任意多个任意单词,因此连续出现是没有意义的。
- 一个单词中不能既出现
-
-
routing_key
是由数据、字母和下划线构成,并且可以使用
.
划分成若干部分。例如:
news.music.pop
,这用于表示当前发布的消息是一个流行音乐的新闻。比如,在进行队列绑定时,某队列的
binding_key
约定为:news.music.#
表示这个队列用于发布音乐新闻。而这时候客户端发布了一条消息,其中routing_key
为:news.music.pop
则可以匹配成功,而,如果发布消息的routing_key
为:news.sport.football
,这时候就会匹配失败。
-
-
匹配算法
-
定义一个二维数组来标记每次匹配的结果,通过最终数组末尾位置的结果来查看是否整体匹配成功。
-
使用
routing_key
中的每个单词,与binding_key
中的单词进行逐个匹配,根据匹配结果来标记数组内容,最终以数组中的末尾标记来确定是否匹配成功。 -
该动态规划的核心主要在推导递推公式, 下面我们通过几个示例来推导递推公式。
-
示例一:
binding_key = "bbb.ddd"
;routing_key = "aaa.ddd"
定义二维数组大小:
dp[2][2]
aaa ddd bbb 0 0 ddd 0 1 binding_key = "aaa.ddd"
;routing_key = "aaa.ddd"
定义二维数组大小:
dp[2][2]
aaa ddd aaa 1 0 ddd 0 1 从上述例子中理解,两个单词匹配成功,并不是将位置无脑标记为 1,而是需要考虑父级单词是否匹配成功,只有父级是匹配成功的,本次匹配成功才有意义。
所以理解一个关键点:当一个
routing_key
单词,与binding_key
单词匹配成功,则应该继承上一个单词(上一行和上一列)的匹配结果单词匹配成功:
dp[i][j] = dp[i - 1][j - 1]
但是,在将思想转换为代码时,我们考虑当
aaa
匹配成功时,从左上继承结果,但是这时候是没有左上位置的,因此对于代码的逻辑就出现了一个例外的点(代码处理额外增加了难度)。因此,为了便于将思想转换为代码,因此我们的数组大小定义行列分别额外多申请一行一列,并将
dp[0][0]
位置置 1dp aaa ddd 1 0 0 aaa 0 1 0 ddd 0 1 这样初始将
dp[0][0]
位置置 1, 其他数组位置全部置 0; 这样只要单词匹配成功,则从左上位置继承结果。 -
示例二:#通配符的特殊
binding_key = "#"
;routing_key = "aaa.bbb"
aaa bbb 1 0 0 # 0 1 0 从这个例子中,能看出,当出现
#
通配符的时候是比较特殊的,如果bbb
与#
匹配成功的时候,从左上继承结果,得到的结果是 0,匹配失败,但是实际结果应该是成功的。因此,得出结论:当遇到通配符 # 时,不仅从左上继承结果,还可以从上一个单词与#的匹配结果处(左边)继承。即:
dp[i][j] = dp[i - 1][j - 1] | dp[i][j - 1] ;
aaa bbb 1 0 0 # 0 1 1 -
示例三:#通配符的特殊
binding_key = "aaa.#"
;routing_key = "aaa"
aaa 1 0 aaa 0 1 # 0 0 从上例中,看出,当
aaa
与#
匹配成功时,从左边和左上继承的结果这时候都是 0,这也是不合理的。结论,因此当遇到 # 通配符匹配成功时,不仅从 左上,左边继承结果,也可以从上方继承结果。既:dp[i][j] = dp[i - 1][j - 1] | dp[i][j - 1] | dp[i - 1][j];
-
示例四:#通配符的特殊
binding_key = "#.aaa"
;routing_key = "aaa";
aaa 1 0 # 0 1 aaa 0 0 观察上述例子,当
aaa
匹配成功时,从左上继承匹配结果,这时候继承到的是 0 ,这是有问题的。因此,当
binding_key
中以起始行以#
开始时,应该将起始行的第 0 列置为 1,以便于后边的匹配结果继承aaa 1 0 # 1 1 aaa 0 1
-
#ifndef __M_ROUTE_H__
#define __M_ROUTE_H__
#include <iostream>
#include "../mqcommon/mq_logger.hpp"
#include "../mqcommon/mq_helper.hpp"
#include "../mqcommon/mq_msg.pb.h"namespace bitmq {class Router {public:/*** 检查路由键是否合法* @param routing_key 要检查的路由键字符串* @return bool 如果合法返回true,否则返回false* @note 合法字符包括:a~z, A~Z, 0~9, ., _*/static bool isLegalRoutingKey(const std::string &routing_key) {// 遍历路由键中的每个字符for (auto &ch : routing_key) {// 检查字符是否在合法范围内if ((ch >= 'a' && ch <= 'z') ||(ch >= 'A' && ch <= 'Z') ||(ch >= '0' && ch <= '9') ||(ch == '_' || ch == '.')) {continue; // 字符合法,继续检查下一个}return false; // 发现非法字符,立即返回false}return true; // 所有字符都合法,返回true}/*** 检查绑定键是否合法* @param binding_key 要检查的绑定键字符串* @return bool 如果合法返回true,否则返回false* @note 需要满足三个条件:* 1. 只包含合法字符(a~z, A~Z, 0~9, ., _, *, #)* 2. *和#必须独立存在(不能与其他字符组合)* 3. *和#不能连续出现*/static bool isLegalBindingKey(const std::string &binding_key) {// 1. 检查是否包含非法字符for (auto &ch : binding_key) {if ((ch >= 'a' && ch <= 'z') ||(ch >= 'A' && ch <= 'Z') ||(ch >= '0' && ch <= '9') ||(ch == '_' || ch == '.') ||(ch == '*' || ch == '#')) {continue; // 字符合法,继续检查}return false; // 发现非法字符}// 2. 检查*和#是否独立存在std::vector<std::string> sub_words;StrHelper::split(binding_key, ".", sub_words); // 按点号分割绑定键for (std::string &word : sub_words) {// 如果单词长度大于1且包含*或#,则非法if (word.size() > 1 && (word.find("*") != std::string::npos ||word.find("#") != std::string::npos)) {return false;}}// 3. 检查*和#是否连续出现for (int i = 1; i < sub_words.size(); i++) {// 检查各种非法组合情况if (sub_words[i] == "#" && sub_words[i - 1] == "*") {return false;}if (sub_words[i] == "#" && sub_words[i - 1] == "#") {return false;}if (sub_words[i] == "*" && sub_words[i - 1] == "#") {return false;}}return true; // 所有检查都通过,返回true}/*** 路由匹配函数* @param type 交换机类型(DIRECT/FANOUT/TOPIC)* @param routing_key 路由键* @param binding_key 绑定键* @return bool 是否匹配成功* @note 使用动态规划算法实现主题交换机的模式匹配*/static bool route(ExchangeType type, const std::string &routing_key, const std::string &binding_key) {// 直接交换:要求完全匹配if (type == ExchangeType::DIRECT) {return (routing_key == binding_key);}// 扇出交换:无条件匹配所有else if (type == ExchangeType::FANOUT) {return true;}// 主题交换:需要进行模式匹配// 1. 分割绑定键和路由键std::vector<std::string> bkeys, rkeys;int n_bkey = StrHelper::split(binding_key, ".", bkeys);int n_rkey = StrHelper::split(routing_key, ".", rkeys);// 2. 初始化动态规划数组(dp[i][j]表示bkeys前i个和rkeys前j个是否匹配)std::vector<std::vector<bool>> dp(n_bkey + 1, std::vector<bool>(n_rkey + 1, false));dp[0][0] = true; // 空字符串匹配空字符串// 3. 处理binding_key以#开头的情况(匹配0个或多个单词)for(int i = 1; i <= bkeys.size(); i++) {if (bkeys[i - 1] == "#") {dp[i][0] = true;continue;}break;}// 4. 动态规划填充匹配表for (int i = 1; i <= n_bkey; i++) {for (int j = 1; j <= n_rkey; j++) {// 情况1:当前单词匹配(*或相同单词)if (bkeys[i - 1] == rkeys[j - 1] || bkeys[i - 1] == "*") {dp[i][j] = dp[i - 1][j - 1]; // 继承左上角的结果}// 情况2:当前是#通配符(匹配0个、1个或多个单词)else if (bkeys[i - 1] == "#") {// 从三个方向继承结果(左上、左、上)dp[i][j] = dp[i - 1][j - 1] | dp[i][j - 1] | dp[i - 1][j];}}}return dp[n_bkey][n_rkey]; // 返回最终匹配结果}};
}#endif
5.11 队列消费者/订阅者管理
-
客户端这边每当发起一个订阅请求,意味着服务器这边就多了一个订阅者(处理消息的客户端描述),而这个消费者或者说订阅者它是和队列直接关联的,因为订阅请求中会描述当前用户想要订阅哪一个队列的消息。
-
而一个信道关闭的时候,或者队列被删除的时候,那么这个信道或队列关联的消费者也就没有存在的意义了,因此也需要将相关的消费者信息给删除掉
-
-
基于以上需求,因此需要对订阅者信息进行管理。
-
定义消费者信息结构
a. 消费者标识
b. 订阅的队列名称
c. 一个消息的处理回调函数(实现的是当发布一条消息到队列,则选择消费者进行消费,如何消费?对于服务端来说就是调用这个个回调函数进行处理,其内部逻辑就是找到消费者对应的连接,然后将数据发送给消费者对应的客户端)
void(const std::string&, const BasicProperties&, const std::string&)
d. 是否自动应答标志。(一个消息被消费者消费后,若自动应答,则直接移除待确认消息,否则等待客户端确认)
-
消费者管理–以队列为单元进行管理-队列消费者管理结构
a. 操作:
i. 新增消费者:信道提供的服务是订阅队列消息的时候创建
ii. 删除消费者:取消订阅 / 信道关闭 / 连接关闭 的时候删除
iii. 获取消费者:从队列所有的消费者中按序取出一个消费者进行消息的推送
iv. 判断队列消费者是否为空
v. 判断指定消费者是否存在
vi. 清理队列所有消费者
b. 元素
i. 消费者管理结构:vector
ii. 轮转序号:一个队列可能会有多个消费者,但是一条消息只需要被一个消费者消费即可,因此采用 RR 轮 转
iii. 互斥锁:保证线程安全
iv. 队列名称
-
对消费者进行统一管理结构
a. 初始化/删除队列的消费者信息结构(创建/删除队列的时候初始化)
b. 向指定队列新增消费者(客户端订阅指定队列消息的时候):新增完成的时候返回消费者对象
c. 从指定队列移除消费者(客户端取消订阅的时候)
d. 移除指定队列的所有消费者(队列被删除时销毁):删除消费者的队列管理单元对象
e. 从指定队列获取一个消费者(轮询获取-消费者轮换消费起到负载均衡的作用)
f. 判断队列中消费者是否为空
g. 判断队列中指定消费者是否存在
h. 清理所有消费者
#ifndef __M_CONSUMER_H__
#define __M_CONSUMER_H__// 引入必要的头文件
#include "../mqcommon/mq_logger.hpp" // 消息队列日志模块
#include "../mqcommon/mq_helper.hpp" // 消息队列辅助工具
#include "../mqcommon/mq_msg.pb.h" // Protobuf消息定义
#include <iostream> // 标准输入输出
#include <unordered_map> // 哈希表容器
#include <mutex> // 互斥锁
#include <memory> // 智能指针
#include <vector> // 动态数组容器
#include <functional> // 函数对象namespace bitmq {// 定义消费者回调函数类型// 参数:消息主题,消息属性指针,消息内容using ConsumerCallback = std::function<void(const std::string, const BasicProperties *bp, const std::string)>;// 消费者结构体struct Consumer {using ptr = std::shared_ptr<Consumer>; // 智能指针别名std::string tag; // 消费者唯一标识std::string qname; // 消费者订阅的队列名称bool auto_ack; // 自动确认标志(true表示自动确认消息)ConsumerCallback callback; // 消息处理回调函数// 默认构造函数Consumer(){DLOG("new Consumer: %p", this); // 调试日志,记录消费者创建}// 带参构造函数Consumer(const std::string &ctag, const std::string &queue_name, bool ack_flag, const ConsumerCallback &cb):tag(ctag), qname(queue_name), auto_ack(ack_flag), callback(std::move(cb)) { // 使用move转移回调函数所有权DLOG("new Consumer: %p", this); // 调试日志}// 析构函数~Consumer() {DLOG("del Consumer: %p", this); // 调试日志,记录消费者销毁}};// 以队列为单位的消费者管理类class QueueConsumer {public:using ptr = std::shared_ptr<QueueConsumer>; // 智能指针别名// 构造函数,初始化队列名称和轮转序号QueueConsumer(const std::string &qname) : _qname(qname), _rr_seq(0){}// 队列新增消费者Consumer::ptr create(const std::string &ctag, const std::string &queue_name, bool ack_flag, const ConsumerCallback &cb) {// 1. 加锁保证线程安全std::unique_lock<std::mutex> lock(_mutex);// 2. 判断消费者是否已存在(防止重复添加)for (auto &consumer : _consumers) {if (consumer->tag == ctag) {return Consumer::ptr(); // 已存在则返回空指针}}// 3. 创建新的消费者对象auto consumer = std::make_shared<Consumer>(ctag, queue_name, ack_flag, cb);// 4. 添加到消费者列表并返回_consumers.push_back(consumer);return consumer;}// 队列移除指定消费者void remove(const std::string &ctag) {// 1. 加锁保证线程安全std::unique_lock<std::mutex> lock(_mutex);// 2. 遍历查找并删除指定消费者for (auto it = _consumers.begin(); it != _consumers.end(); ++it) {if ((*it)->tag == ctag) {_consumers.erase(it);return ; // 找到并删除后直接返回}}return; // 未找到直接返回}// 轮转方式获取消费者(Round-Robin算法)Consumer::ptr choose() {// 1. 加锁保证线程安全std::unique_lock<std::mutex> lock(_mutex);// 如果当前没有消费者,返回空指针if (_consumers.size() == 0) {return Consumer::ptr();}// 2. 计算轮转下标并更新序号int idx = _rr_seq % _consumers.size();_rr_seq++; // 自增轮转序号// 3. 返回对应消费者return _consumers[idx];}// 判断当前队列是否没有消费者bool empty() {std::unique_lock<std::mutex> lock(_mutex);return _consumers.size() == 0;}// 检查指定消费者是否存在bool exists(const std::string &ctag) {std::unique_lock<std::mutex> lock(_mutex);// 遍历查找指定消费者for (auto it = _consumers.begin(); it != _consumers.end(); ++it) {if ((*it)->tag == ctag) {return true; // 找到返回true}}return false; // 未找到返回false}// 清空当前队列的所有消费者void clear() {std::unique_lock<std::mutex> lock(_mutex);_consumers.clear(); // 清空消费者列表_rr_seq = 0; // 重置轮转序号}private:std::string _qname; // 队列名称std::mutex _mutex; // 互斥锁,保证线程安全uint64_t _rr_seq; // 轮转序号(用于Round-Robin算法)std::vector<Consumer::ptr> _consumers; // 消费者指针列表};// 消费者管理器类(管理所有队列的消费者)class ConsumerManager {public:using ptr = std::shared_ptr<ConsumerManager>; // 智能指针别名ConsumerManager(){} // 默认构造函数// 初始化指定队列的消费者管理单元void initQueueConsumer(const std::string &qname) {// 1. 加锁保证线程安全std::unique_lock<std::mutex> lock(_mutex);// 2. 检查是否已存在该队列的管理单元auto it = _qconsumers.find(qname);if (it != _qconsumers.end()) {return ; // 已存在则直接返回}// 3. 创建新的队列消费者管理单元并添加到mapauto qconsumers = std::make_shared<QueueConsumer>(qname);_qconsumers.insert(std::make_pair(qname, qconsumers));}// 销毁指定队列的消费者管理单元void destroyQueueConsumer(const std::string &qname) {std::unique_lock<std::mutex> lock(_mutex);_qconsumers.erase(qname); // 从map中移除}// 创建消费者(委托给对应队列的QueueConsumer处理)Consumer::ptr create(const std::string &ctag, const std::string &queue_name, bool ack_flag, const ConsumerCallback &cb) {QueueConsumer::ptr qcp;{// 1. 加锁查找对应的队列消费者管理单元std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(queue_name);if (it == _qconsumers.end()) {DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());return Consumer::ptr(); // 未找到返回空指针}qcp = it->second;}// 2. 调用QueueConsumer的create方法return qcp->create(ctag, queue_name, ack_flag, cb);}// 移除指定消费者(委托给对应队列的QueueConsumer处理)void remove(const std::string &ctag, const std::string &queue_name) {QueueConsumer::ptr qcp;{// 1. 加锁查找对应的队列消费者管理单元std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(queue_name);if (it == _qconsumers.end()) {DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());return ; // 未找到直接返回}qcp = it->second;}// 2. 调用QueueConsumer的remove方法return qcp->remove(ctag);}// 轮转选择消费者(委托给对应队列的QueueConsumer处理)Consumer::ptr choose(const std::string &queue_name) {QueueConsumer::ptr qcp;{// 1. 加锁查找对应的队列消费者管理单元std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(queue_name);if (it == _qconsumers.end()) {DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());return Consumer::ptr(); // 未找到返回空指针}qcp = it->second;}// 2. 调用QueueConsumer的choose方法return qcp->choose();}// 检查指定队列是否为空(没有消费者)bool empty(const std::string &queue_name) {QueueConsumer::ptr qcp;{// 1. 加锁查找对应的队列消费者管理单元std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(queue_name);if (it == _qconsumers.end()) {DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());return false; // 未找到认为非空}qcp = it->second;}// 2. 调用QueueConsumer的empty方法return qcp->empty();}// 检查指定消费者是否存在(委托给对应队列的QueueConsumer处理)bool exists(const std::string &ctag, const std::string &queue_name) {QueueConsumer::ptr qcp;{// 1. 加锁查找对应的队列消费者管理单元std::unique_lock<std::mutex> lock(_mutex);auto it = _qconsumers.find(queue_name);if (it == _qconsumers.end()) {DLOG("没有找到队列 %s 的消费者管理句柄!", queue_name.c_str());return false; // 未找到认为不存在}qcp = it->second;}// 2. 调用QueueConsumer的exists方法return qcp->exists(ctag);}// 清空所有队列的消费者管理单元void clear() {std::unique_lock<std::mutex> lock(_mutex);_qconsumers.clear(); // 清空整个map}private:std::mutex _mutex; // 互斥锁,保证线程安全// 队列名称到队列消费者管理单元的映射std::unordered_map<std::string, QueueConsumer::ptr> _qconsumers;};
}#endif
5.12 信道管理
- 在 AMQP 模型中,除了通信连接 Connection 概念外,还有一个 Channel 的概念,Channel 是针对 Connection 连接的一个更细粒度的通信信道,多个 Channel 可以使用同一个通信连接 Connection 进行通信,但是同一个 Connection 的 Channel 之间相互独立。
- 而信道模块就是再次将上述所讲模块进行整合提供服务的模块,其通信协议引用下面的网络通信协议设计
-
管理信息:
a. 信道 ID:信道的唯一标识
b. 信道关联的消费者:用于消费者信道在关闭的时候取消订阅,删除订阅者信息
c. 信道关联的连接:用于向客户端发送数据(响应,推送的消息)
d. protobuf 协议处理句柄:网络通信前的协议处理
e. 消费者管理句柄:信道关闭/取消订阅的时候,通过句柄删除订阅者信息
f. 虚拟机句柄:交换机/队列/绑定/消息数据管理
g. 工作线程池句柄(一条消息被发布到队列后,需要将消息推送给订阅了对应队列的消费者,过程由线程池完成)
-
管理操作:
a. 提供声明&删除交换机操作(删除交换机的同时删除交换机关联的绑定信息)
b. 提供声明&删除队列操作(删除队列的同时,删除队列关联的绑定信息,消息,消费者信息)
c. 提供绑定&解绑队列操作
d. 提供订阅&取消订阅队列消息操作
e. 提供发布&确认消息操作
-
信道管理:
a. 信道的增删查
#ifndef __M_CHANNEL_H__
#define __M_CHANNEL_H__// 包含必要的头文件
#include "muduo/net/TcpConnection.h" // Muduo网络库的TCP连接类
#include "muduo/proto/codec.h" // Protobuf编解码器
#include "muduo/proto/dispatcher.h" // Protobuf消息分发器
#include "../mqcommon/mq_logger.hpp" // MQ日志工具
#include "../mqcommon/mq_helper.hpp" // MQ辅助工具
#include "../mqcommon/mq_msg.pb.h" // MQ消息Protobuf定义
#include "../mqcommon/mq_proto.pb.h" // MQ协议Protobuf定义
#include "../mqcommon/mq_threadpool.hpp" // MQ线程池
#include "mq_consumer.hpp" // MQ消费者相关
#include "mq_host.hpp" // MQ虚拟主机
#include "mq_route.hpp" // MQ路由namespace bitmq {// 定义各种请求的智能指针类型别名using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;using openChannelRequestPtr = std::shared_ptr<openChannelRequest>;using closeChannelRequestPtr = std::shared_ptr<closeChannelRequest>;using declareExchangeRequestPtr = std::shared_ptr<declareExchangeRequest>;using deleteExchangeRequestPtr = std::shared_ptr<deleteExchangeRequest>;using declareQueueRequestPtr = std::shared_ptr<declareQueueRequest>;using deleteQueueRequestPtr = std::shared_ptr<deleteQueueRequest>;using queueBindRequestPtr = std::shared_ptr<queueBindRequest>;using queueUnBindRequestPtr = std::shared_ptr<queueUnBindRequest>;using basicPublishRequestPtr = std::shared_ptr<basicPublishRequest>;using basicAckRequestPtr = std::shared_ptr<basicAckRequest>;using basicConsumeRequestPtr = std::shared_ptr<basicConsumeRequest>;using basicCancelRequestPtr = std::shared_ptr<basicCancelRequest>;// Channel类,表示MQ中的一个信道class Channel {public:using ptr = std::shared_ptr<Channel>; // Channel的智能指针类型别名// 构造函数Channel(const std::string &id, // 信道IDconst VirtualHost::ptr &host, // 所属虚拟主机const ConsumerManager::ptr &cmp, // 消费者管理器const ProtobufCodecPtr &codec, // Protobuf编解码器const muduo::net::TcpConnectionPtr &conn, // TCP连接const threadpool::ptr &pool): // 线程池_cid(id),_conn(conn),_codec(codec),_cmp(cmp),_host(host),_pool(pool){DLOG("new Channel: %p", this); // 调试日志,记录新创建的Channel}// 析构函数~Channel() {// 如果存在消费者,从消费者管理器中移除if (_consumer.get() != nullptr) {_cmp->remove(_consumer->tag, _consumer->qname);}DLOG("del Channel: %p", this); // 调试日志,记录销毁的Channel}// 声明交换机void declareExchange(const declareExchangeRequestPtr &req) {// 调用虚拟主机的declareExchange方法声明交换机bool ret = _host->declareExchange(req->exchange_name(), req->exchange_type(), req->durable(), req->auto_delete(), req->args());// 返回基本响应return basicResponse(ret, req->rid(), req->cid());}// 删除交换机void deleteExchange(const deleteExchangeRequestPtr &req) {// 调用虚拟主机的deleteExchange方法删除交换机_host->deleteExchange(req->exchange_name());// 返回基本响应return basicResponse(true, req->rid(), req->cid());}// 声明队列void declareQueue(const declareQueueRequestPtr &req) {// 调用虚拟主机的declareQueue方法声明队列bool ret = _host->declareQueue(req->queue_name(),req->durable(), req->exclusive(),req->auto_delete(), req->args());if (ret == false) {return basicResponse(false, req->rid(), req->cid());}// 初始化该队列的消费者管理器_cmp->initQueueConsumer(req->queue_name());// 返回基本响应return basicResponse(true, req->rid(), req->cid());}// 删除队列void deleteQueue(const deleteQueueRequestPtr &req) {// 销毁该队列的消费者管理器_cmp->destroyQueueConsumer(req->queue_name());// 调用虚拟主机的deleteQueue方法删除队列_host->deleteQueue(req->queue_name());// 返回基本响应return basicResponse(true, req->rid(), req->cid());}// 队列绑定void queueBind(const queueBindRequestPtr &req) {// 调用虚拟主机的bind方法绑定队列到交换机bool ret = _host->bind(req->exchange_name(), req->queue_name(), req->binding_key());// 返回基本响应return basicResponse(ret, req->rid(), req->cid());}// 队列解绑void queueUnBind(const queueUnBindRequestPtr &req) {// 调用虚拟主机的unBind方法解绑队列_host->unBind(req->exchange_name(), req->queue_name());// 返回基本响应return basicResponse(true, req->rid(), req->cid());}// 发布消息void basicPublish(const basicPublishRequestPtr &req) {// 1. 判断交换机是否存在auto ep = _host->selectExchange(req->exchange_name());if (ep.get() == nullptr) {return basicResponse(false, req->rid(), req->cid());}// 2. 进行交换路由,判断消息可以发布到交换机绑定的哪个队列中MsgQueueBindingMap mqbm = _host->exchangeBindings(req->exchange_name());BasicProperties *properties = nullptr;std::string routing_key;// 如果有消息属性,获取路由键if (req->has_properties()) {properties = req->mutable_properties();routing_key = properties->routing_key();}// 遍历所有绑定关系for (auto &binding : mqbm) {// 使用路由器判断消息是否应该路由到这个队列if (Router::route(ep->type, routing_key, binding.second->binding_key)) {// 3. 将消息添加到队列中_host->basicPublish(binding.first, properties, req->body());// 4. 向线程池中添加一个消息消费任务auto task = std::bind(&Channel::consume, this, binding.first);_pool->push(task);}}// 返回基本响应return basicResponse(true, req->rid(), req->cid());}// 消息确认void basicAck(const basicAckRequestPtr &req) {// 调用虚拟主机的basicAck方法确认消息_host->basicAck(req->queue_name(), req->message_id());// 返回基本响应return basicResponse(true, req->rid(), req->cid());}// 订阅队列消息void basicConsume(const basicConsumeRequestPtr &req) {// 1. 判断队列是否存在bool ret = _host->existsQueue(req->queue_name());if (ret == false) {return basicResponse(false, req->rid(), req->cid());}// 2. 创建队列的消费者auto cb = std::bind(&Channel::callback, this, std::placeholders::_1,std::placeholders::_2, std::placeholders::_3);// 创建消费者,当前channel角色变为消费者_consumer = _cmp->create(req->consumer_tag(), req->queue_name(), req->auto_ack(), cb);// 返回基本响应return basicResponse(true, req->rid(), req->cid());}// 取消订阅void basicCancel(const basicCancelRequestPtr &req) {// 从消费者管理器中移除指定消费者_cmp->remove(req->consumer_tag(), req->queue_name());// 返回基本响应return basicResponse(true, req->rid(), req->cid());}private:// 消费者回调函数,用于推送消息给客户端void callback(const std::string tag, const BasicProperties *bp, const std::string &body) {// 组织消费响应消息basicConsumeResponse resp;resp.set_cid(_cid);resp.set_body(body);resp.set_consumer_tag(tag);// 如果有消息属性,设置到响应中if (bp) {resp.mutable_properties()->set_id(bp->id());resp.mutable_properties()->set_delivery_mode(bp->delivery_mode());resp.mutable_properties()->set_routing_key(bp->routing_key());}// 通过编解码器发送响应_codec->send(_conn, resp);}// 消费指定队列的消息void consume(const std::string &qname) {// 1. 从队列中取出一条消息MessagePtr mp = _host->basicConsume(qname);if (mp.get() == nullptr) {DLOG("执行消费任务失败,%s 队列没有消息!", qname.c_str());return;}// 2. 从队列订阅者中取出一个订阅者Consumer::ptr cp = _cmp->choose(qname);if (cp.get() == nullptr) {DLOG("执行消费任务失败,%s 队列没有消费者!", qname.c_str());return;}// 3. 调用订阅者的回调函数,实现消息推送cp->callback(cp->tag, mp->mutable_payload()->mutable_properties(), mp->payload().body());// 4. 如果是自动确认模式,直接确认消息if (cp->auto_ack) _host->basicAck(qname, mp->payload().properties().id());}// 发送基本响应void basicResponse(bool ok, const std::string &rid, const std::string &cid) {basicCommonResponse resp;resp.set_rid(rid); // 设置请求IDresp.set_cid(cid); // 设置信道IDresp.set_ok(ok); // 设置操作结果_codec->send(_conn, resp); // 发送响应}private:std::string _cid; // 信道IDConsumer::ptr _consumer; // 消费者指针muduo::net::TcpConnectionPtr _conn; // TCP连接ProtobufCodecPtr _codec; // Protobuf编解码器ConsumerManager::ptr _cmp; // 消费者管理器VirtualHost::ptr _host; // 虚拟主机threadpool::ptr _pool; // 线程池};// Channel管理器类,用于管理多个Channelclass ChannelManager {public:using ptr = std::shared_ptr<ChannelManager>; // 智能指针类型别名ChannelManager(){} // 默认构造函数// 打开一个Channelbool openChannel(const std::string &id, // 信道IDconst VirtualHost::ptr &host, // 虚拟主机const ConsumerManager::ptr &cmp, // 消费者管理器const ProtobufCodecPtr &codec, // 编解码器const muduo::net::TcpConnectionPtr &conn, // TCP连接const threadpool::ptr &pool) { // 线程池std::unique_lock<std::mutex> lock(_mutex); // 加锁保证线程安全// 检查信道是否已存在auto it = _channels.find(id);if (it != _channels.end()) {DLOG("信道:%s 已经存在!", id.c_str());return false;}// 创建新的Channel并添加到管理器中auto channel = std::make_shared<Channel>(id, host, cmp, codec, conn, pool);_channels.insert(std::make_pair(id, channel));return true;}// 关闭一个Channelvoid closeChannel(const std::string &id){std::unique_lock<std::mutex> lock(_mutex); // 加锁保证线程安全_channels.erase(id); // 从管理器中移除指定Channel}// 获取指定ChannelChannel::ptr getChannel(const std::string &id) {std::unique_lock<std::mutex> lock(_mutex); // 加锁保证线程安全auto it = _channels.find(id);if (it == _channels.end()) {return Channel::ptr(); // 返回空指针如果没找到}return it->second; // 返回找到的Channel}private:std::mutex _mutex; // 互斥锁,保证线程安全std::unordered_map<std::string, Channel::ptr> _channels; // Channel存储容器};
}#endif
-
线程池
#ifndef __M_THRPOOL_H__ #define __M_THRPOOL_H__ #include <iostream> #include <functional> #include <memory> #include <thread> #include <future> #include <mutex> #include <condition_variable> #include <vector>class threadpool {public:using ptr = std::shared_ptr<threadpool>;using Functor = std::function<void(void)>;threadpool(int thr_count = 1) : _stop(false){for (int i = 0; i < thr_count; i++) {_threads.emplace_back(&threadpool::entry, this);}}~threadpool() {stop();}void stop() {if (_stop == true) return;_stop = true;_cv.notify_all();for (auto &thread : _threads) {thread.join();}}//push传入的是首先有一个函数--用户要执行的函数, 接下来是不定参,表示要处理的数据也就是要传入到函数中的参数//push函数内部,会将这个传入的函数封装成一个异步任务(packaged_task),//使用lambda生成一个可调用对象(内部执行异步任务),抛入到任务池中,由工作线程取出进行执行template<typename F, typename ...Args>auto push(F &&func, Args&& ...args) -> std::future<decltype(func(args...))> {//1. 将传入的函数封装成一个packaged_task任务using return_type = decltype(func(args...));auto tmp_func = std::bind(std::forward<F>(func), std::forward<Args>(args)...);auto task = std::make_shared<std::packaged_task<return_type()>>(tmp_func);std::future<return_type> fu = task->get_future();//2. 构造一个lambda匿名函数(捕获任务对象),函数内执行任务对象{std::unique_lock<std::mutex> lock(_mutex);//3. 将构造出来的匿名函数对象,抛入到任务池中_taskpool.push_back( [task](){ (*task)(); } );_cv.notify_one();}return fu;}private://线程入口函数---内部不断的从任务池中取出任务进行执行。void entry() {while(!_stop){std::vector<Functor> tmp_taskpool;{//加锁std::unique_lock<std::mutex> lock(_mutex);//等待任务池不为空,或者_stop被置位返回,_cv.wait(lock, [this](){ return _stop || !_taskpool.empty(); });//取出任务进行执行tmp_taskpool.swap(_taskpool);}for (auto &task : tmp_taskpool) {task();}}}private:std::atomic<bool> _stop;std::vector<Functor> _taskpool;//任务池std::mutex _mutex;std::condition_variable _cv;std::vector<std::thread> _threads; }; #endif
5.13 连接管理
向用户提供一个用于实现网络通信的 Connection
对象,从其内部可创建出粒度更轻的Channel
对象,用于与客户端进行网络通信。
-
成员信息:
a. 连接关联的信道管理句柄(实现信道的增删查)
b. 连接关联的实际用于通信的 muduo::net::Connection 连接
c. protobuf 协议处理的句柄(ProtobufCodec 对象)
d. 消费者管理句柄
e. 虚拟机句柄
f. 异步工作线程池句柄
-
连接操作:
a. 提供创建 Channel 信道的操作
b. 提供删除 Channel 信道的操作
-
连接管理:
a. 连接的增删查
#include "mq_channel.hpp"namespace bitmq {/*** @class Connection* @brief 表示一个客户端连接,管理该连接下的所有信道*/class Connection {public:using ptr = std::shared_ptr<Connection>; // 智能指针别名/*** @brief 构造函数* @param host 虚拟主机指针* @param cmp 消费者管理器指针* @param codec Protobuf编解码器指针* @param conn TCP连接指针* @param pool 线程池指针*/Connection(const VirtualHost::ptr &host, const ConsumerManager::ptr &cmp, const ProtobufCodecPtr &codec, const muduo::net::TcpConnectionPtr &conn,const threadpool::ptr &pool) :_conn(conn), // 初始化TCP连接_codec(codec), // 初始化编解码器_cmp(cmp), // 初始化消费者管理器_host(host), // 初始化虚拟主机_pool(pool), // 初始化线程池_channels(std::make_shared<ChannelManager>()){} // 创建信道管理器/*** @brief 打开一个新信道* @param req 打开信道请求指针*/void openChannel(const openChannelRequestPtr &req) {// 1. 判断信道ID是否重复,创建信道bool ret = _channels->openChannel(req->cid(), _host, _cmp, _codec, _conn, _pool);if (ret == false) {DLOG("创建信道的时候,信道ID重复了");// 2. 如果创建失败,返回错误响应return basicResponse(false, req->rid(), req->cid());}DLOG("%s 信道创建成功!", req->cid().c_str());// 3. 给客户端进行回复return basicResponse(true, req->rid(), req->cid());}/*** @brief 关闭指定信道* @param req 关闭信道请求指针*/void closeChannel(const closeChannelRequestPtr &req) {_channels->closeChannel(req->cid());return basicResponse(true, req->rid(), req->cid());}/*** @brief 获取指定信道* @param cid 信道ID* @return 信道指针*/Channel::ptr getChannel(const std::string &cid) {return _channels->getChannel(cid);}private:/*** @brief 发送基础响应* @param ok 操作是否成功* @param rid 请求ID* @param cid 信道ID*/void basicResponse(bool ok, const std::string &rid, const std::string &cid) {basicCommonResponse resp;resp.set_rid(rid); // 设置请求IDresp.set_cid(cid); // 设置信道IDresp.set_ok(ok); // 设置操作结果_codec->send(_conn, resp); // 发送响应}private:muduo::net::TcpConnectionPtr _conn; // TCP连接指针ProtobufCodecPtr _codec; // Protobuf编解码器ConsumerManager::ptr _cmp; // 消费者管理器VirtualHost::ptr _host; // 虚拟主机threadpool::ptr _pool; // 线程池ChannelManager::ptr _channels; // 信道管理器};/*** @class ConnectionManager* @brief 管理所有客户端连接*/class ConnectionManager {public:using ptr = std::shared_ptr<ConnectionManager>; // 智能指针别名ConnectionManager() {} // 默认构造函数/*** @brief 创建新连接* @param host 虚拟主机指针* @param cmp 消费者管理器指针* @param codec Protobuf编解码器指针* @param conn TCP连接指针* @param pool 线程池指针*/void newConnection(const VirtualHost::ptr &host, const ConsumerManager::ptr &cmp, const ProtobufCodecPtr &codec, const muduo::net::TcpConnectionPtr &conn,const threadpool::ptr &pool) {std::unique_lock<std::mutex> lock(_mutex); // 加锁保证线程安全auto it = _conns.find(conn);if (it != _conns.end()) {return ; // 如果连接已存在,直接返回}// 创建新连接对象并添加到连接映射中Connection::ptr self_conn = std::make_shared<Connection>(host, cmp, codec, conn, pool);_conns.insert(std::make_pair(conn, self_conn));}/*** @brief 删除指定连接* @param conn TCP连接指针*/void delConnection(const muduo::net::TcpConnectionPtr &conn) {std::unique_lock<std::mutex> lock(_mutex); // 加锁保证线程安全_conns.erase(conn); // 从映射中移除连接}/*** @brief 获取指定连接* @param conn TCP连接指针* @return 连接指针,如果不存在返回空指针*/Connection::ptr getConnection(const muduo::net::TcpConnectionPtr &conn) {std::unique_lock<std::mutex> lock(_mutex); // 加锁保证线程安全auto it = _conns.find(conn);if (it == _conns.end()) {return Connection::ptr(); // 返回空指针}return it->second; // 返回找到的连接}private:std::mutex _mutex; // 互斥锁,保证线程安全std::unordered_map<muduo::net::TcpConnectionPtr, Connection::ptr> _conns; // 连接映射表};
}
注:在RabbitMQ
中,虚拟主机是可以随意创建**/**删除的, 但是咱们此处为了实现简单,并没有实现虚拟主机的管理,因此我们默认就只有一个虚拟主机的存在,但是在数据结构的设计上我们预留了对于多虚拟主机的管理,从而保证不同虚拟主机中的Exchange
、Queue
、Binding
、Message
等资源都是相互隔离的
6. 网络通信协议设计
6.1 需求确认
-
这个章节我们考虑客户端和服务器之间的通信方式。回顾 MQ 的交互模型:
-
-
其中生产者和消费者都是客户端, 它们都需要通过网络和
Broker Server
进行通信。具体通信的过程我们使用Muduo
库来实现, 使用 TCP 作为通信的底层协议, 同时在这个基础上自定义应用层协议, 完成客户端对服务器功能的远端调用。 我们要实现的远端调用接口包括:- 创建 channel
- 关闭 channel
- 创建 exchange
- 删除 exchange
- 创建 queue
- 删除 queue
- 创建 binding
- 删除 binding
- 发送 message
- 订阅 message
- 发送 ack
- 返回 message (服务器 -> 客户端)
6.2 设计应用层协议
使用二进制的方式设计应用层协议。 因为 MQMessage
的消息体是使用 Protobuf
进行序列化的,本身是按照二进制存储的,所以不太适合用 json
等文本格式来定义协议。下面我们设计一下应用层协议:请求/响应报文设计。
len
:4 个字节,表示整个报文的长度nameLen
: 4 个字节, 表示typeName
数组的长度typeName
:是个字节数组, 占nameLen
个字节, 表示请求/响应报文的类型名,作用是分发不同消息到对应的远端接口调用中protobufData
:是个字节数组, 占len - nameLen - 8
个字节, 表示请求/响应参数数据通过protobuf
序列化之后的二进制checkSum
:4 个字节, 表示整个消息的校验和, 作用是为了校验请求/响应报文的完整性
6.3 定义请求/响应参数
因为这里的参数需要进行网络传输以及序列化, 所以我们需要将参数定义在 pb 文件中。
// 指定使用proto3语法
syntax = "proto3";// 定义包名为bitmq
package bitmq;// 导入mq_msg.proto文件
import "mq_msg.proto";// 信道的打开与关闭相关消息定义
message openChannelRequest {string rid = 1; // 请求ID,用于标识请求string cid = 2; // 连接ID,标识客户端连接
};message closeChannelRequest {string rid = 1; // 请求IDstring cid = 2; // 连接ID
};// 交换机的声明与删除相关消息定义
message declareExchangeRequest {string rid = 1; // 请求IDstring cid = 2; // 连接IDstring exchange_name = 3; // 交换机名称ExchangeType exchange_type = 4; // 交换机类型(从mq_msg.proto导入)bool durable = 5; // 是否持久化bool auto_delete = 6; // 是否自动删除map<string, string> args = 7; // 额外参数
};message deleteExchangeRequest {string rid = 1; // 请求IDstring cid = 2; // 连接IDstring exchange_name = 3; // 要删除的交换机名称
};// 队列的声明与删除相关消息定义
message declareQueueRequest {string rid = 1; // 请求IDstring cid = 2; // 连接IDstring queue_name = 3; // 队列名称bool exclusive = 4; // 是否排他队列bool durable = 5; // 是否持久化bool auto_delete = 6; // 是否自动删除map<string, string> args = 7; // 额外参数
};message deleteQueueRequest {string rid = 1; // 请求IDstring cid = 2; // 连接IDstring queue_name = 3; // 要删除的队列名称
};// 队列的绑定与解除绑定相关消息定义
message queueBindRequest {string rid = 1; // 请求IDstring cid = 2; // 连接IDstring exchange_name = 3; // 交换机名称string queue_name = 4; // 队列名称string binding_key = 5; // 绑定键
};message queueUnBindRequest {string rid = 1; // 请求IDstring cid = 2; // 连接IDstring exchange_name = 3; // 交换机名称string queue_name = 4; // 队列名称
};// 消息发布相关消息定义
message basicPublishRequest {string rid = 1; // 请求IDstring cid = 2; // 连接IDstring exchange_name = 3; // 目标交换机名称string body = 4; // 消息内容BasicProperties properties = 5; // 消息属性(从mq_msg.proto导入)
};// 消息确认相关消息定义
message basicAckRequest {string rid = 1; // 请求IDstring cid = 2; // 连接IDstring queue_name = 3; // 队列名称string message_id = 4; // 要确认的消息ID
};// 队列订阅相关消息定义
message basicConsumeRequest {string rid = 1; // 请求IDstring cid = 2; // 连接IDstring consumer_tag = 3; // 消费者标签string queue_name = 4; // 要订阅的队列名称bool auto_ack = 5; // 是否自动确认消息
};// 取消订阅相关消息定义
message basicCancelRequest {string rid = 1; // 请求IDstring cid = 2; // 连接IDstring consumer_tag = 3; // 要取消的消费者标签string queue_name = 4; // 队列名称
};// 消息推送相关消息定义(服务端->客户端)
message basicConsumeResponse {string cid = 1; // 连接IDstring consumer_tag = 2; // 消费者标签string body = 3; // 消息内容BasicProperties properties = 4; // 消息属性
};// 通用响应消息定义
message basicCommonResponse {string rid = 1; // 对应的请求IDstring cid = 2; // 连接IDbool ok = 3; // 操作是否成功
}
示例:创建一个交换机的请求,如下图所示:
按照 len - nameLen - 8
的长度读取出 protobufData
就可以将读到的二进制数据反序列化成 ExchangeDeclareArguments
对象进行后续处理。后续的请求报文和这里都是类似的。
7. 服务器模块实现
服务器模块我们借助Muduo
网络库来实现。
-
_server
:Muduo
库提供的一个通用 TCP 服务器, 我们可以封装这个服务器进行TCP 通信 -
_baseloop
:主事件循环器, 用于响应 IO 事件和定时器事件,主 loop 主要是为了响应监听描述符的 IO 事件 -
_codec
:一个protobuf
编解码器, 我们在 TCP 服务器上设计了一层应用层协议,这个编解码器主要就是负责实现应用层协议的解析和封装, 下边具体讲解 -
_dispatcher
:一个消息分发器, 当Socket
接收到一个报文消息后, 我们需要按照消息的类型, 即上面提到的typeName
进行消息分发, 会对不同类型的消息分发相对应的的处理函数中,下边具体讲解 -
_consumer
:服务器中的消费者信息管理句柄。 -
_threadpool
:异步工作线程池,主要用于队列消息的推送工作。 -
_connections
:连接管理句柄,管理当前服务器上的所有已经建立的通信连接。 -
_virtual_host
:服务器持有的虚拟主机。 队列、交换机 、绑定、消息等数据都是通过虚拟主机管理 -
创建
MQBrokerServer
BrokerServer
模块是对整体服务器所有模块的整合,接收客户端的请求,并提供服务。基于前边实现的简单的翻译服务器代码,进行改造,只需要实现服务器内部提供服务的各个业务接口即可。
在各个业务处理函数中,也比较简单,创建信道后,每次请求过来后,找到请求对应的信道句柄,通过句柄调用前边封装好的处理接口进行请求处理,最终返回处理结果。
#ifndef __M_BROKER_H__ #define __M_BROKER_H__// 包含必要的头文件 #include "muduo/proto/codec.h" // Protobuf编解码器 #include "muduo/proto/dispatcher.h" // Protobuf消息分发器 #include "muduo/base/Logging.h" // 日志记录 #include "muduo/base/Mutex.h" // 互斥锁 #include "muduo/net/EventLoop.h" // 事件循环 #include "muduo/net/TcpServer.h" // TCP服务器 #include "../mqcommon/mq_threadpool.hpp" // 线程池 #include "../mqcommon/mq_msg.pb.h" // 消息协议定义 #include "../mqcommon/mq_proto.pb.h" // 协议定义 #include "../mqcommon/mq_logger.hpp" // 日志记录器 #include "mq_connection.hpp" // 连接管理 #include "mq_consumer.hpp" // 消费者管理 #include "mq_host.hpp" // 虚拟主机管理namespace bitmq {// 定义常量#define DBFILE "/meta.db" // 数据库文件路径#define HOSTNAME "MyVirtualHost" // 虚拟主机名称// 服务器类定义class Server {public:// 消息指针类型定义typedef std::shared_ptr<google::protobuf::Message> MessagePtr;// 构造函数Server(int port, const std::string &basedir): // 初始化TCP服务器,监听指定端口_server(&_baseloop, muduo::net::InetAddress("0.0.0.0", port), "Server", muduo::net::TcpServer::kReusePort),// 初始化消息分发器,设置未知消息处理回调_dispatcher(std::bind(&Server::onUnknownMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),// 初始化Protobuf编解码器,绑定到分发器的消息处理函数_codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),// 初始化虚拟主机,指定名称、基础目录和数据库文件路径_virtual_host(std::make_shared<VirtualHost>(HOSTNAME, basedir, basedir + DBFILE)),// 初始化消费者管理器_consumer_manager(std::make_shared<ConsumerManager>()),// 初始化连接管理器_connection_manager(std::make_shared<ConnectionManager>()),// 初始化线程池_threadpool(std::make_shared<threadpool>()) {// 针对历史消息中的所有队列,初始化队列的消费者管理结构QueueMap qm = _virtual_host->allQueues();for (auto &q : qm) {_consumer_manager->initQueueConsumer(q.first);}// 注册各种业务请求处理函数_dispatcher.registerMessageCallback<bitmq::openChannelRequest>(std::bind(&Server::onOpenChannel, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<bitmq::closeChannelRequest>(std::bind(&Server::onCloseChannel, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<bitmq::declareExchangeRequest>(std::bind(&Server::onDeclareExchange, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<bitmq::deleteExchangeRequest>(std::bind(&Server::onDeleteExchange, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<bitmq::declareQueueRequest>(std::bind(&Server::onDeclareQueue, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<bitmq::deleteQueueRequest>(std::bind(&Server::onDeleteQueue, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<bitmq::queueBindRequest>(std::bind(&Server::onQueueBind, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<bitmq::queueUnBindRequest>(std::bind(&Server::onQueueUnBind, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<bitmq::basicPublishRequest>(std::bind(&Server::onBasicPublish, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<bitmq::basicAckRequest>(std::bind(&Server::onBasicAck, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<bitmq::basicConsumeRequest>(std::bind(&Server::onBasicConsume, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<bitmq::basicCancelRequest>(std::bind(&Server::onBasicCancel, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));// 设置服务器消息回调函数,绑定到编解码器的消息处理函数_server.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(), std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));// 设置服务器连接回调函数_server.setConnectionCallback(std::bind(&Server::onConnection, this, std::placeholders::_1));}// 启动服务器void start() {_server.start(); // 启动TCP服务器_baseloop.loop(); // 启动事件循环}private:// 打开信道请求处理void onOpenChannel(const muduo::net::TcpConnectionPtr& conn, const openChannelRequestPtr& message, muduo::Timestamp) {// 获取连接对应的Connection对象Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("打开信道时,没有找到连接对应的Connection对象!");conn->shutdown(); // 关闭连接return;}// 调用Connection对象的打开信道方法return mconn->openChannel(message);}// 关闭信道请求处理void onCloseChannel(const muduo::net::TcpConnectionPtr& conn, const closeChannelRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("关闭信道时,没有找到连接对应的Connection对象!");conn->shutdown();return;}return mconn->closeChannel(message);}// 声明交换机请求处理void onDeclareExchange(const muduo::net::TcpConnectionPtr& conn, const declareExchangeRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("声明交换机时,没有找到连接对应的Connection对象!");conn->shutdown();return;}// 获取指定信道Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("声明交换机时,没有找到信道!");return;}return cp->declareExchange(message);}// 删除交换机请求处理void onDeleteExchange(const muduo::net::TcpConnectionPtr& conn, const deleteExchangeRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("删除交换机时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("删除交换机时,没有找到信道!");return;}return cp->deleteExchange(message);}// 声明队列请求处理void onDeclareQueue(const muduo::net::TcpConnectionPtr& conn, const declareQueueRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("声明队列时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("声明队列时,没有找到信道!");return;}return cp->declareQueue(message);}// 删除队列请求处理void onDeleteQueue(const muduo::net::TcpConnectionPtr& conn, const deleteQueueRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("删除队列时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("删除队列时,没有找到信道!");return;}return cp->deleteQueue(message);}// 队列绑定请求处理void onQueueBind(const muduo::net::TcpConnectionPtr& conn, const queueBindRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("队列绑定时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("队列绑定时,没有找到信道!");return;}return cp->queueBind(message);}// 队列解绑请求处理void onQueueUnBind(const muduo::net::TcpConnectionPtr& conn, const queueUnBindRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("队列解除绑定时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("队列解除绑定时,没有找到信道!");return;}return cp->queueUnBind(message);}// 消息发布请求处理void onBasicPublish(const muduo::net::TcpConnectionPtr& conn, const basicPublishRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("发布消息时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("发布消息时,没有找到信道!");return;}return cp->basicPublish(message);}// 消息确认请求处理void onBasicAck(const muduo::net::TcpConnectionPtr& conn, const basicAckRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("确认消息时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("确认消息时,没有找到信道!");return;}return cp->basicAck(message);}// 队列消息订阅请求处理void onBasicConsume(const muduo::net::TcpConnectionPtr& conn, const basicConsumeRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("队列消息订阅时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("队列消息订阅时,没有找到信道!");return;}return cp->basicConsume(message);}// 队列消息取消订阅请求处理void onBasicCancel(const muduo::net::TcpConnectionPtr& conn, const basicCancelRequestPtr& message, muduo::Timestamp) {Connection::ptr mconn = _connection_manager->getConnection(conn);if (mconn.get() == nullptr) {DLOG("队列消息取消订阅时,没有找到连接对应的Connection对象!");conn->shutdown();return;}Channel::ptr cp = mconn->getChannel(message->cid());if (cp.get() == nullptr) {DLOG("队列消息取消订阅时,没有找到信道!");return;}return cp->basicCancel(message);}// 未知消息处理void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn, const MessagePtr& message, muduo::Timestamp) {LOG_INFO << "onUnknownMessage: " << message->GetTypeName(); // 记录未知消息类型conn->shutdown(); // 关闭连接}// 连接状态变化处理void onConnection(const muduo::net::TcpConnectionPtr &conn) {if (conn->connected()) {// 新建连接时,创建对应的Connection对象_connection_manager->newConnection(_virtual_host, _consumer_manager, _codec, conn, _threadpool);} else {// 连接断开时,删除对应的Connection对象_connection_manager->delConnection(conn);}}private:muduo::net::EventLoop _baseloop; // 基础事件循环muduo::net::TcpServer _server; // TCP服务器对象ProtobufDispatcher _dispatcher; // 请求分发器对象ProtobufCodecPtr _codec; // Protobuf协议处理器VirtualHost::ptr _virtual_host; // 虚拟主机管理对象ConsumerManager::ptr _consumer_manager; // 消费者管理对象ConnectionManager::ptr _connection_manager;// 连接管理对象threadpool::ptr _threadpool; // 线程池对象}; }#endif
8. 客户端模块实现
在 RabbitMQ
中,提供服务的是信道,因此在客户端的实现中,弱化了 Client
客户端的概念,也就是说在 RabbitMQ
中并不会向用户展示网络通信的概念出来,而是以一种提供服务的形式来体现。
其实现思想类似于普通的功能接口封装,一个接口实现一个功能,接口内部完成向客户端请求的过程,但是对外并不需要体现出客户端与服务端通信的概念,用户需要什么服务就调用什么接口就行。
基于以上的思想,客户端的实现共分为四大模块:
-
订阅者模块:
- 一个并不直接对用户展示的模块,其在客户端体现的作用就是对于角色的描述,表示这是一个消费者。
-
信道模块:
- 一个直接面向用户的模块,内部包含多个向外提供的服务接口,用户需要什么服务,调用对应接口即可
- 其包含交换机声明/删除,队列声明/删除,绑定/解绑,消息发布/确认,订阅/解除订阅等服务。
-
连接模块:
- 这是唯一能体现出网络通信概念的一个模块了,它向用户提供的功能就是用于打开/关闭信道。
-
异步线程模块:
-
虽然客户端部分,并不对外体现网络通信的概念,但是本质上内部还是包含有网络通信的,因此既然有网络通信,那么就必须包含有一个网络通信 IO 事件监控线程模块,用于进行客户端连接的 IO 事件监控,以便于在事件出发后进行 IO操作。
-
其次,在客户端部分存在一个情况就是,当一个信道作为消费者而存在的时候,服务端会向信道推送消息,而用户这边需要对收到的消息进行不同的业务处理,而这个消息的处理需要一个异步的工作线程池来完成。
-
因此异步线程模块包含两个部分:
- 客户端连接的 IO 事件监控线程
- 推送过来的消息异步处理线程
-
-
基于以上模块实现一个客户端的流程也就比较简单了:
- 实例化异步线程对象
- 实例化连接对象
- 通过连接对象,创建信道
- 根据信道获取自己所需服务
- 关闭信道
- 关闭连接
8.1 订阅者模块
与服务端,并无太大差别,客户端这边虽然订阅者的存在感微弱了很多,但是还是有的,当进行队列消息订阅的时候,会伴随着一个订阅者对象的创建,而这个订阅者对象有以下几个作用:
- 描述当前信道订阅了哪个队列的消息
- 描述了收到消息后该如何对这条消息进行处理
- 描述收到消息后是否需要进行确认回复
-
订阅者消息:
a. 订阅者标识
b. 订阅队列名
c. 是否自动确认标志
d. 回调处理函数(收到消息后该如何处理的回调函数对象)
#ifndef __M_CONSUMER_H__
#define __M_CONSUMER_H__// 引入必要的头文件
#include "../mqcommon/mq_logger.hpp" // 消息队列日志工具
#include "../mqcommon/mq_helper.hpp" // 消息队列辅助工具
#include "../mqcommon/mq_msg.pb.h" // Protobuf消息定义
#include <iostream> // 标准输入输出
#include <unordered_map> // 哈希表容器
#include <mutex> // 互斥锁
#include <memory> // 智能指针
#include <vector> // 动态数组容器
#include <functional> // 函数对象// 定义bitmq命名空间
namespace bitmq {// 定义消费者回调函数类型// 参数说明:// 1. const stdstring: 消息内容// 2. const BasicProperties*: 消息属性指针// 3. const stdstring: 附加参数using ConsumerCallback = std::function<void(const std::string, const BasicProperties *bp, const std::string)>;// 消费者结构体定义struct Consumer {using ptr = std::shared_ptr<Consumer>; // 定义智能指针别名std::string tag; // 消费者唯一标识符std::string qname; // 消费者订阅的队列名称bool auto_ack; // 自动确认标志,true表示自动确认消息ConsumerCallback callback; // 消息处理回调函数// 默认构造函数Consumer(){DLOG("new Consumer: %p", this); // 记录消费者对象创建日志}// 带参数的构造函数// 参数说明:// 1. const std::string &ctag: 消费者标签// 2. const std::string &queue_name: 队列名称// 3. bool ack_flag: 自动确认标志// 4. const ConsumerCallback &cb: 回调函数Consumer(const std::string &ctag, const std::string &queue_name, bool ack_flag, const ConsumerCallback &cb):tag(ctag), // 初始化消费者标签qname(queue_name), // 初始化队列名称auto_ack(ack_flag), // 初始化自动确认标志callback(std::move(cb)) // 移动语义初始化回调函数{DLOG("new Consumer: %p", this); // 记录消费者对象创建日志}// 析构函数~Consumer() {DLOG("del Consumer: %p", this); // 记录消费者对象销毁日志}};
}#endif
8.2 信道管理模块
同样的,客户端也有信道,其功能与服务端几乎一致,或者说不管是客户端的channel
还是服务端的 channel
都是为了用户提供具体服务而存在的,只不过服务端是为客户端的对应请求提供服务,而客户端的接口服务是为了用户具体需要服务,也可以理解是用户通过客户端 channel
的接口调用来向服务端发送对应请求,获取请求的服务。
-
信道信息:
a. 信道 ID
b. 信道关联的网络通信连接对象
c. protobuf 协议处理对象
d. 信道关联的消费者
e. 请求对应的响应信息队列(这里队列使用
<请求 ID,响应>
hash 表,以便于查找指定的响应)f. 互斥锁&条件变量(大部分的请求都是阻塞操作,发送请求后需要等到响应才能继续,但是
muduo
库的通信是异步的,因此需要我们自己在收到响应后,通过判断是否是等待的指定响应来进行同步) -
信道操作:
a. 提供创建信道操作
b. 提供删除信道操作
c. 提供声明交换机操作(强断言-有则 OK,没有则创建)
d. 提供删除交换机
e. 提供创建队列操作(强断言-有则 OK,没有则创建)
f. 提供删除队列操作
g. 提供交换机-队列绑定操作
h. 提供交换机-队列解除绑定操作
i. 提供添加订阅操作
j. 提供取消订阅操作
k. 提供发布消息操作
l. 提供确认消息操作
-
信道管理:
a. 创建信道
b. 查询信道
c. 删除信道
#ifndef __M_CHANNEL_H__
#define __M_CHANNEL_H__// 引入必要的头文件
#include "muduo/net/TcpConnection.h" // muduo网络库的TCP连接
#include "muduo/proto/codec.h" // 协议编解码器
#include "muduo/proto/dispatcher.h" // 协议分发器
#include "../mqcommon/mq_logger.hpp" // MQ日志工具
#include "../mqcommon/mq_helper.hpp" // MQ辅助工具
#include "../mqcommon/mq_msg.pb.h" // MQ消息协议
#include "../mqcommon/mq_proto.pb.h" // MQ基础协议
#include "mq_consumer.hpp" // MQ消费者
#include <iostream> // 标准输入输出
#include <mutex> // 互斥锁
#include <condition_variable> // 条件变量
#include <unordered_map> // 哈希表namespace bitmq {// 定义常用类型别名typedef std::shared_ptr<google::protobuf::Message> MessagePtr;using ProtobufCodecPtr = std::shared_ptr<ProtobufCodec>;using basicConsumeResponsePtr = std::shared_ptr<basicConsumeResponse>;using basicCommonResponsePtr = std::shared_ptr<basicCommonResponse>;// Channel类 - 表示MQ的信道,负责与服务器通信class Channel {public:using ptr = std::shared_ptr<Channel>; // 智能指针类型别名// 构造函数Channel(const muduo::net::TcpConnectionPtr& conn, const ProtobufCodecPtr& codec):_cid(UUIDHelper::uuid()), // 生成唯一信道ID_conn(conn), // 保存TCP连接_codec(codec) {} // 保存协议编解码器// 析构函数 - 自动取消消费者订阅~Channel() { basicCancel(); }// 获取信道IDstd::string cid() { return _cid; }// 打开信道bool openChannel() {std::string rid = UUIDHelper::uuid(); // 生成请求IDopenChannelRequest req; // 创建请求对象req.set_rid(rid); // 设置请求IDreq.set_cid(_cid); // 设置信道ID_codec->send(_conn, req); // 发送请求basicCommonResponsePtr resp = waitResponse(rid); // 等待响应return resp->ok(); // 返回操作是否成功}// 关闭信道void closeChannel() {std::string rid = UUIDHelper::uuid(); // 生成请求IDcloseChannelRequest req; // 创建请求对象req.set_rid(rid); // 设置请求IDreq.set_cid(_cid); // 设置信道ID_codec->send(_conn, req); // 发送请求waitResponse(rid); // 等待响应return ;}// 声明交换机bool declareExchange(const std::string &name, // 交换机名称ExchangeType type, // 交换机类型bool durable, // 是否持久化bool auto_delete, // 是否自动删除google::protobuf::Map<std::string, std::string> &args) { // 额外参数std::string rid = UUIDHelper::uuid(); // 生成请求IDdeclareExchangeRequest req; // 创建请求对象req.set_rid(rid); // 设置请求IDreq.set_cid(_cid); // 设置信道IDreq.set_exchange_name(name); // 设置交换机名称req.set_exchange_type(type); // 设置交换机类型req.set_durable(durable); // 设置持久化标志req.set_auto_delete(auto_delete); // 设置自动删除标志req.mutable_args()->swap(args); // 设置额外参数_codec->send(_conn, req); // 发送请求basicCommonResponsePtr resp = waitResponse(rid); // 等待响应return resp->ok(); // 返回操作是否成功}// 删除交换机void deleteExchange(const std::string &name) {std::string rid = UUIDHelper::uuid(); // 生成请求IDdeleteExchangeRequest req; // 创建请求对象req.set_rid(rid); // 设置请求IDreq.set_cid(_cid); // 设置信道IDreq.set_exchange_name(name); // 设置交换机名称_codec->send(_conn, req); // 发送请求waitResponse(rid); // 等待响应return ;}// 声明队列bool declareQueue(const std::string &qname, // 队列名称bool qdurable, // 是否持久化bool qexclusive, // 是否排他bool qauto_delete, // 是否自动删除google::protobuf::Map<std::string, std::string> &qargs) { // 额外参数std::string rid = UUIDHelper::uuid(); // 生成请求IDdeclareQueueRequest req; // 创建请求对象req.set_rid(rid); // 设置请求IDreq.set_cid(_cid); // 设置信道IDreq.set_queue_name(qname); // 设置队列名称req.set_durable(qdurable); // 设置持久化标志req.set_auto_delete(qauto_delete); // 设置自动删除标志req.set_exclusive(qexclusive); // 设置排他标志req.mutable_args()->swap(qargs); // 设置额外参数_codec->send(_conn, req); // 发送请求basicCommonResponsePtr resp = waitResponse(rid); // 等待响应return resp->ok(); // 返回操作是否成功}// 删除队列void deleteQueue(const std::string &qname) {std::string rid = UUIDHelper::uuid(); // 生成请求IDdeleteQueueRequest req; // 创建请求对象req.set_rid(rid); // 设置请求IDreq.set_cid(_cid); // 设置信道IDreq.set_queue_name(qname); // 设置队列名称_codec->send(_conn, req); // 发送请求waitResponse(rid); // 等待响应return ;}// 绑定队列到交换机bool queueBind(const std::string &ename, // 交换机名称const std::string &qname, // 队列名称const std::string &key) { // 绑定键std::string rid = UUIDHelper::uuid(); // 生成请求IDqueueBindRequest req; // 创建请求对象req.set_rid(rid); // 设置请求IDreq.set_cid(_cid); // 设置信道IDreq.set_exchange_name(ename); // 设置交换机名称req.set_queue_name(qname); // 设置队列名称req.set_binding_key(key); // 设置绑定键_codec->send(_conn, req); // 发送请求basicCommonResponsePtr resp = waitResponse(rid); // 等待响应return resp->ok(); // 返回操作是否成功}// 解绑队列与交换机void queueUnBind(const std::string &ename, const std::string &qname) {std::string rid = UUIDHelper::uuid(); // 生成请求IDqueueUnBindRequest req; // 创建请求对象req.set_rid(rid); // 设置请求IDreq.set_cid(_cid); // 设置信道IDreq.set_exchange_name(ename); // 设置交换机名称req.set_queue_name(qname); // 设置队列名称_codec->send(_conn, req); // 发送请求waitResponse(rid); // 等待响应return ;}// 发布消息void basicPublish(const std::string &ename, // 交换机名称const BasicProperties *bp, // 消息属性const std::string &body) { // 消息体std::string rid = UUIDHelper::uuid(); // 生成请求IDbasicPublishRequest req; // 创建请求对象req.set_rid(rid); // 设置请求IDreq.set_cid(_cid); // 设置信道IDreq.set_body(body); // 设置消息体req.set_exchange_name(ename); // 设置交换机名称// 设置消息属性(如果存在)if (bp != nullptr) {req.mutable_properties()->set_id(bp->id());req.mutable_properties()->set_delivery_mode(bp->delivery_mode());req.mutable_properties()->set_routing_key(bp->routing_key());}_codec->send(_conn, req); // 发送请求waitResponse(rid); // 等待响应return ;}// 消息确认void basicAck(const std::string &msgid) {if (_consumer.get() == nullptr) { // 检查是否有消费者DLOG("消息确认时,找不到消费者信息!");return ;}std::string rid = UUIDHelper::uuid(); // 生成请求IDbasicAckRequest req; // 创建请求对象req.set_rid(rid); // 设置请求IDreq.set_cid(_cid); // 设置信道IDreq.set_queue_name(_consumer->qname); // 设置队列名称req.set_message_id(msgid); // 设置消息ID_codec->send(_conn, req); // 发送请求waitResponse(rid); // 等待响应return;}// 取消消费者订阅void basicCancel() {if (_consumer.get() == nullptr) { // 检查是否有消费者return ;}std::string rid = UUIDHelper::uuid(); // 生成请求IDbasicCancelRequest req; // 创建请求对象req.set_rid(rid); // 设置请求IDreq.set_cid(_cid); // 设置信道IDreq.set_queue_name(_consumer->qname); // 设置队列名称req.set_consumer_tag(_consumer->tag); // 设置消费者标签_codec->send(_conn, req); // 发送请求waitResponse(rid); // 等待响应_consumer.reset(); // 重置消费者指针return;}// 订阅消息bool basicConsume(const std::string &consumer_tag, // 消费者标签const std::string &queue_name, // 队列名称bool auto_ack, // 是否自动确认const ConsumerCallback &cb) { // 消息回调函数if (_consumer.get() != nullptr) { // 检查是否已有消费者DLOG("当前信道已订阅其他队列消息!");return false;}std::string rid = UUIDHelper::uuid(); // 生成请求IDbasicConsumeRequest req; // 创建请求对象req.set_rid(rid); // 设置请求IDreq.set_cid(_cid); // 设置信道IDreq.set_queue_name(queue_name); // 设置队列名称req.set_consumer_tag(consumer_tag); // 设置消费者标签req.set_auto_ack(auto_ack); // 设置自动确认标志_codec->send(_conn, req); // 发送请求basicCommonResponsePtr resp = waitResponse(rid); // 等待响应if (resp->ok() == false) { // 检查操作是否成功DLOG("添加订阅失败!");return false;}// 创建消费者对象_consumer = std::make_shared<Consumer>(consumer_tag, queue_name, auto_ack, cb);return true;}public: // 添加基础响应到哈希表void putBasicResponse(const basicCommonResponsePtr& resp) {std::unique_lock<std::mutex> lock(_mutex); // 加锁_basic_resp.insert(std::make_pair(resp->rid(), resp)); // 插入响应_cv.notify_all(); // 通知所有等待线程}// 消费消息(由消费者回调处理)void consume(const basicConsumeResponsePtr& resp) {if (_consumer.get() == nullptr) { // 检查是否有消费者DLOG("消息处理时,未找到订阅者信息!");return;}if (_consumer->tag != resp->consumer_tag()) { // 检查消费者标签DLOG("收到的推送消息中的消费者标识,与当前信道消费者标识不一致!");return ;}// 调用消费者回调函数处理消息_consumer->callback(resp->consumer_tag(), resp->mutable_properties(), resp->body());}private:// 等待响应basicCommonResponsePtr waitResponse(const std::string &rid) {std::unique_lock<std::mutex> lock(_mutex); // 加锁// 等待直到收到指定rid的响应_cv.wait(lock, [&rid, this](){return _basic_resp.find(rid) != _basic_resp.end();});basicCommonResponsePtr basic_resp = _basic_resp[rid]; // 获取响应_basic_resp.erase(rid); // 从哈希表中移除return basic_resp; // 返回响应}private:std::string _cid; // 信道IDmuduo::net::TcpConnectionPtr _conn; // TCP连接ProtobufCodecPtr _codec; // 协议编解码器Consumer::ptr _consumer; // 消费者对象std::mutex _mutex; // 互斥锁std::condition_variable _cv; // 条件变量std::unordered_map<std::string, basicCommonResponsePtr> _basic_resp; // 响应哈希表};// ChannelManager类 - 管理多个Channelclass ChannelManager {public:using ptr = std::shared_ptr<ChannelManager>; // 智能指针类型别名ChannelManager(){} // 构造函数// 创建ChannelChannel::ptr create(const muduo::net::TcpConnectionPtr &conn, const ProtobufCodecPtr &codec) {std::unique_lock<std::mutex> lock(_mutex); // 加锁auto channel = std::make_shared<Channel>(conn, codec); // 创建Channel_channels.insert(std::make_pair(channel->cid(), channel)); // 添加到哈希表return channel;}// 移除Channelvoid remove(const std::string &cid) {std::unique_lock<std::mutex> lock(_mutex); // 加锁_channels.erase(cid); // 从哈希表中移除}// 获取ChannelChannel::ptr get(const std::string &cid) {std::unique_lock<std::mutex> lock(_mutex); // 加锁auto it = _channels.find(cid); // 查找Channelif (it == _channels.end()) { // 如果没找到return Channel::ptr(); // 返回空指针}return it->second; // 返回找到的Channel}private:std::mutex _mutex; // 互斥锁std::unordered_map<std::string, Channel::ptr> _channels; // Channel哈希表};
}#endif
8.3 异步工作线程实现
客户端这边存在两个异步工作线程:
- 一个是
muduo
库中客户端连接的异步循环线程EventLoopThread
- 一个是当收到消息后进行异步处理的工作线程池。
这两项都不是以连接为单元进行创建的,而是创建后,可以用以多个连接中,因此单独进行封装。
#ifndef __M_WORKER_H__
#define __M_WORKER_H__
#include "muduo/net/EventLoopThread.h"
#include "../mqcommon/mq_logger.hpp"
#include "../mqcommon/mq_helper.hpp"
#include "../mqcommon/mq_threadpool.hpp"namespace bitmq {class AsyncWorker {public:using ptr = std::shared_ptr<AsyncWorker>;muduo::net::EventLoopThread loopthread;threadpool pool;};
}#endif
8.4 连接管理模块
在客户端这边,RabbitMQ
弱化了客户端的概念,因为用户所需的服务都是通过信道来提供的,因此操作思想转换为先创建连接,通过连接创建信道,通过信道提供服务这一流程。
这个模块同样是针对 muduo
库客户端连接的二次封装,向用户提供创建 channel
信道的接口,创建信道后,可以通过信道来获取指定服务。
#ifndef __M_CONNECTION_H__
#define __M_CONNECTION_H__
#include "muduo/proto/dispatcher.h"
#include "muduo/proto/codec.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpClient.h"
#include "muduo/net/EventLoopThread.h"
#include "muduo/base/CountDownLatch.h"#include "mq_channel.hpp"
#include "mq_worker.hpp"namespace bitmq {
/*** @class Connection* @brief 表示与消息队列服务器的连接,负责管理TCP连接、消息编解码和信道管理*/
class Connection {public:using ptr = std::shared_ptr<Connection>; // 智能指针别名/*** @brief 构造函数,初始化连接* @param sip 服务器IP地址* @param sport 服务器端口号* @param worker 异步工作线程对象*/Connection(const std::string &sip, int sport, const AsyncWorker::ptr &worker):_latch(1), // 初始化计数器为1,用于同步等待连接建立_client(worker->loopthread.startLoop(), // 使用工作线程的事件循环muduo::net::InetAddress(sip, sport), // 服务器地址"Client"), // 客户端名称_dispatcher(std::bind(&Connection::onUnknownMessage, this, // 未知消息回调std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),_codec(std::make_shared<ProtobufCodec>( // 创建协议编解码器std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher, // 消息处理回调std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),_worker(worker), // 保存工作线程对象_channel_manager(std::make_shared<ChannelManager>()) { // 创建信道管理器// 注册基础响应消息回调_dispatcher.registerMessageCallback<basicCommonResponse>(std::bind(&Connection::basicResponse, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));// 注册消费响应消息回调_dispatcher.registerMessageCallback<basicConsumeResponse>(std::bind(&Connection::consumeResponse, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));// 设置客户端消息回调_client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(),std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));// 设置客户端连接状态回调_client.setConnectionCallback(std::bind(&Connection::onConnection, this, std::placeholders::_1)); // 发起连接_client.connect();_latch.wait(); // 阻塞等待,直到连接建立成功}/*** @brief 打开一个新的信道* @return 返回信道指针,失败返回空指针*/Channel::ptr openChannel() {Channel::ptr channel = _channel_manager->create(_conn, _codec); // 创建信道bool ret = channel->openChannel(); // 打开信道if (ret == false) {DLOG("打开信道失败!");return Channel::ptr(); // 返回空指针}return channel;}/*** @brief 关闭指定信道* @param channel 要关闭的信道指针*/void closeChannel(const Channel::ptr &channel) {channel->closeChannel(); // 关闭信道_channel_manager->remove(channel->cid()); // 从管理器中移除}private:/*** @brief 处理基础响应消息* @param conn TCP连接指针* @param message 响应消息指针* @param 时间戳*/void basicResponse(const muduo::net::TcpConnectionPtr& conn, const basicCommonResponsePtr& message, muduo::Timestamp) {// 1. 根据信道ID查找信道Channel::ptr channel = _channel_manager->get(message->cid());if (channel.get() == nullptr) {DLOG("未找到信道信息!");return;}// 2. 将响应消息存入信道的响应映射表中channel->putBasicResponse(message);}/*** @brief 处理消费响应消息* @param conn TCP连接指针* @param message 消费响应消息指针* @param 时间戳*/void consumeResponse(const muduo::net::TcpConnectionPtr& conn, const basicConsumeResponsePtr& message, muduo::Timestamp) {// 1. 根据信道ID查找信道Channel::ptr channel = _channel_manager->get(message->cid());if (channel.get() == nullptr) {DLOG("未找到信道信息!");return;}// 2. 将消息处理任务提交到线程池_worker->pool.push([channel, message]() {channel->consume(message); // 在子线程中处理消费消息});}/*** @brief 处理未知消息* @param conn TCP连接指针* @param message 消息指针* @param 时间戳*/void onUnknownMessage(const muduo::net::TcpConnectionPtr& conn, const MessagePtr& message, muduo::Timestamp) {LOG_INFO << "onUnknownMessage: " << message->GetTypeName();conn->shutdown(); // 遇到未知消息关闭连接}/*** @brief 处理连接状态变化* @param conn TCP连接指针*/void onConnection(const muduo::net::TcpConnectionPtr& conn) {if (conn->connected()) {_latch.countDown(); // 连接建立,唤醒主线程_conn = conn; // 保存连接指针} else {// 连接关闭时的操作_conn.reset(); // 重置连接指针}}private:muduo::CountDownLatch _latch; // 同步计数器,用于等待连接建立muduo::net::TcpConnectionPtr _conn; // 客户端对应的TCP连接muduo::net::TcpClient _client; // TCP客户端对象ProtobufDispatcher _dispatcher; // Protobuf消息分发器ProtobufCodecPtr _codec; // Protobuf编解码器指针AsyncWorker::ptr _worker; // 异步工作线程对象ChannelManager::ptr _channel_manager; // 信道管理器
};
}#endif
9. 案例:基于MQ的生产者消费者模型
9.1 生产者客户端的实现
#include "mq_connection.hpp"int main()
{//1. 实例化异步工作线程对象bitmq::AsyncWorker::ptr awp = std::make_shared<bitmq::AsyncWorker>();//2. 实例化连接对象bitmq::Connection::ptr conn = std::make_shared<bitmq::Connection>("127.0.0.1", 8085, awp);//3. 通过连接创建信道bitmq::Channel::ptr channel = conn->openChannel();//4. 通过信道提供的服务完成所需// 1. 声明一个交换机exchange1, 交换机类型为广播模式google::protobuf::Map<std::string, std::string> tmp_map;channel->declareExchange("exchange1", bitmq::ExchangeType::TOPIC, true, false, tmp_map);// 2. 声明一个队列queue1channel->declareQueue("queue1", true, false, false, tmp_map);// 3. 声明一个队列queue2channel->declareQueue("queue2", true, false, false, tmp_map);// 4. 绑定queue1-exchange1,且binding_key设置为queue1channel->queueBind("exchange1", "queue1", "queue1");// 5. 绑定queue2-exchange1,且binding_key设置为news.music.#channel->queueBind("exchange1", "queue2", "news.music.#");//5. 循环向交换机发布消息for (int i = 0; i < 10; i++) {bitmq::BasicProperties bp;bp.set_id(bitmq::UUIDHelper::uuid());bp.set_delivery_mode(bitmq::DeliveryMode::DURABLE);bp.set_routing_key("news.music.pop");channel->basicPublish("exchange1", &bp, "Hello World-" + std::to_string(i));}bitmq::BasicProperties bp;bp.set_id(bitmq::UUIDHelper::uuid());bp.set_delivery_mode(bitmq::DeliveryMode::DURABLE);bp.set_routing_key("news.music.sport");channel->basicPublish("exchange1", &bp, "Hello Bite");bp.set_routing_key("news.sport");channel->basicPublish("exchange1", &bp, "Hello chileme?");//6. 关闭信道conn->closeChannel(channel);return 0;
}
9.2 消费者客户端的实现
#include "mq_connection.hpp"void cb(bitmq::Channel::ptr &channel, const std::string consumer_tag, const bitmq::BasicProperties *bp, const std::string &body)
{std::cout << consumer_tag << "消费了消息:" << body << std::endl;channel->basicAck(bp->id());
}
int main(int argc, char *argv[])
{if (argc != 2) {std::cout << "usage: ./consume_client queue1\n";return -1;}//1. 实例化异步工作线程对象bitmq::AsyncWorker::ptr awp = std::make_shared<bitmq::AsyncWorker>();//2. 实例化连接对象bitmq::Connection::ptr conn = std::make_shared<bitmq::Connection>("127.0.0.1", 8085, awp);//3. 通过连接创建信道bitmq::Channel::ptr channel = conn->openChannel();//4. 通过信道提供的服务完成所需// 1. 声明一个交换机exchange1, 交换机类型为广播模式google::protobuf::Map<std::string, std::string> tmp_map;channel->declareExchange("exchange1", bitmq::ExchangeType::TOPIC, true, false, tmp_map);// 2. 声明一个队列queue1channel->declareQueue("queue1", true, false, false, tmp_map);// 3. 声明一个队列queue2channel->declareQueue("queue2", true, false, false, tmp_map);// 4. 绑定queue1-exchange1,且binding_key设置为queue1channel->queueBind("exchange1", "queue1", "queue1");// 5. 绑定queue2-exchange1,且binding_key设置为news.music.#channel->queueBind("exchange1", "queue2", "news.music.#");auto functor = std::bind(cb, channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);channel->basicConsume("consumer1", argv[1], false, functor);while(1) std::this_thread::sleep_for(std::chrono::seconds(3));conn->closeChannel(channel);return 0;
}
10.项目总结
首先明确我们所实现的项目:仿 RabbitMQ
实现一个简化版的消息队列组件,其内部实现了消息队列服务器以及客户端的搭建,并支持不同主机间消息的发布与订阅及消息推送功能。
其次项目中所用到的技术:基于 muduo
库实现底层网络通信服务器和客户端的搭建,在应用层基于 protobuf
协议设计应用层协议接口,在数据管理上使用了轻量数据库sqlite
来进行数据的持久化管理,以及基于 AMQP 模型的理解,实现整个消息队列项目技术的整合,并在项目的实现过程中使用 gtest
框架进行单元测试,完成项目的最终实现。