当前位置: 首页 > news >正文

C++ - 仿 RabbitMQ 实现消息队列--服务端核心模块实现(二)

目录

交换机数据管理

交换机数据类

交换机数据持久化类

交换机数据管理类

测试

 


交换机数据管理

  • 定义交换机数据类
  1. 交换机名称
  2. 交换机类型
  3. 是否持久化标志
  4. 是否自动删除标志
  5. 其他参数
  •  定义交换机数据持久化类(数据持久化的 sqlite3 数据库中)
  1. 创建/删除交换机数据表
  2. 新增交换机数据
  3. 移除交换机数据
  4. 查询所有交换机数据
  5. 查询指定交换机数据(根据名称)
  • 定义交换机数据管理类
  1. 声明交换机,并添加管理(存在则 OK,不存在则创建)
  2. 删除交换机
  3. 获取指定交换机
  4. 销毁所有交换机数据

交换机数据类

    struct Exchange{using ptr = std::shared_ptr<Exchange>;// 交换机名称std::string name;// 交换机类型ExchangeType type;// 持久化标志bool durable;// 自动删除标志bool auto_delete;// 其他参数std::unordered_map<std::string, std::string> args;Exchange() {}Exchange(const std::string &ename,ExchangeType etype,bool edurable,bool eauto_delete,const std::unordered_map<std::string, std::string> &eargs): name(ename), type(etype), durable(edurable), auto_delete(eauto_delete), args(eargs){}void setArgs(const std::string &str_args){// key=val&key=val.....std::vector<std::string> sub_args;StrHelper::split(str_args, "&", sub_args);for (auto &arg : sub_args){size_t pos = arg.find("=");std::string key = arg.substr(0, pos);std::string val = arg.substr(pos + 1);args.insert(std::make_pair(key, val));}}std::string getArgs(){if (args.empty())return "";std::string result;for (auto &arg : args){result += arg.first + "=" + arg.second + "&";}result.pop_back();return result;}};

需要说明的有:

  • 对于其他参数,我们用一个unordered_map容器存储,与相应的字符串转化的格式是:key=val&key=val&key=val....
  • 在getArgs函数中必须添加对args的判空操作,否则,args为空时,result会为空,调用pop_back函数会发生段错误。

交换机数据持久化类

    class ExchangeMapper{public:ExchangeMapper(const std::string &dbfile) : _sql_helper(dbfile){std::string path = FileHelper::parentDirectory(dbfile);FileHelper::createDirectory(path);assert(_sql_helper.open());createTable();}void createTable(){
#define CREATE_TABLE "create table if not exists exchange_table(\name varchar(32) primary key, \type int, \durable int, \auto_delete int, \args varchar(128));"bool ret = _sql_helper.exec(CREATE_TABLE, nullptr, nullptr);if (!ret){ERROR("创建交换机数据库表失败");abort();}}void removeTable(){
#define DROP_TABLE "drop table if exists exchange_table;"bool ret = _sql_helper.exec(DROP_TABLE, nullptr, nullptr);if (!ret){ERROR("删除交换机数据库表失败");abort();}}void insert(Exchange::ptr &exchange){std::stringstream ss;ss << "insert into exchange_table values('"<< exchange->name << "',"<< exchange->type << ","<< exchange->durable << ","<< exchange->auto_delete << ","<< "'" << exchange->getArgs() << "');";bool ret = _sql_helper.exec(ss.str(), nullptr, nullptr);if (!ret)return;}void remove(const std::string &name){std::stringstream ss;ss << "delete from exchange_table where name = "<< "'" << name << "';";_sql_helper.exec(ss.str(), nullptr, nullptr);}using ExchangMap = std::unordered_map<std::string, Exchange::ptr>;ExchangMap recovery(){ExchangMap result;std::string sql = "select name, type, durable, auto_delete, args from exchange_table;";_sql_helper.exec(sql, selectCallback, &result);return result;}private:static int selectCallback(void *arg, int numcol, char **row, char **fields){ExchangMap *result = (ExchangMap *)arg;auto exp = std::make_shared<Exchange>();exp->name = row[0];exp->type = (ExchangeType)std::stoi(row[1]);exp->durable = (bool)std::stoi(row[2]);exp->auto_delete = (bool)std::stoi(row[3]);if (row[4])exp->setArgs(row[4]);result->insert(std::make_pair(exp->name, exp));return 0;}private:SqliteHelper _sql_helper;};

说明:

  • 在构造对象时就把相应的数据库打开,如果没有就创建,但要注意一定要先创建好数据库所在目录,在创建表(如果不存在)。
  • 前四个函数就是对sql语言的一个使用。
  • recovery方法中我们将查询出来的结果以交换机名称为键值,存储在一个unordered_map容器中,以拿到所有交换机。

交换机数据管理类

    class ExchangeManager{public:using ptr = std::shared_ptr<ExchangeManager>;ExchangeManager(const std::string &dbfile) : _mapper(dbfile){_exchanges = _mapper.recovery();}// 声明交换机void declareExchange(const std::string &name,ExchangeType type,bool durable,bool auto_delete,std::unordered_map<std::string, std::string> &args){std::unique_lock<std::mutex> lock(_mutex);// 先判断是否存在auto it = _exchanges.find(name);if (it != _exchanges.end())return;// 新增交换机auto exp = std::make_shared<Exchange>(name, type, durable, auto_delete, args);_exchanges.insert(std::make_pair(name, exp));if (durable)_mapper.insert(exp);}// 删除交换机void deleteExchange(const std::string &name){std::unique_lock<std::mutex> lock(_mutex);// 先判断是否存在auto it = _exchanges.find(name);if (it == _exchanges.end())return;if (_exchanges[name]->durable)_mapper.remove(name);_exchanges.erase(name);}// 获取指定交换机Exchange::ptr selectExchange(const std::string &name){std::unique_lock<std::mutex> lock(_mutex);// 先判断是否存在auto it = _exchanges.find(name);if (it == _exchanges.end())return nullptr;return it->second;}// 判断交换机是否存在bool exists(const std::string &name){std::unique_lock<std::mutex> lock(_mutex);// 先判断是否存在auto it = _exchanges.find(name);if (it == _exchanges.end())return false;return true;}size_t size(){std::unique_lock<std::mutex> lock(_mutex);return _exchanges.size();}// 清理所有交换机数据void clear(){std::unique_lock<std::mutex> lock(_mutex);_mapper.removeTable();_exchanges.clear();}private:std::mutex _mutex;ExchangeMapper _mapper;std::unordered_map<std::string, Exchange::ptr> _exchanges;};

说明:

  • 此类是真正要让用户使用的类,包含有几个成员属性:
    • 互斥锁:支持多线程同时访问。
    • ExchangeMapper对象:对数据库进行操作,获取所有交换机。
    • _exchanges:存储所有交换机,方便用户获取。
  • 在构造时就调用_mapper的recovery方法拿到所有交换机。
  • 所有方法对于判断交换机是否存在的操作必须独自进行,不能复用exists方法,否则会导致死锁。

测试

 

#include "../mqserver/exchange.hpp"
#include <gtest/gtest.h>jiuqi::ExchangeManager::ptr emp;class ExchangeTest : public testing::Environment
{
public:virtual void SetUp() override{emp = std::make_shared<jiuqi::ExchangeManager>("./data/meta.db");}virtual void TearDown() override{// emp->clear();}
};TEST(ExchangeTest, insert_test)
{std::unordered_map<std::string, std::string> map = {{"k1", "v1"}, {"k2", "v2"}, {"k3", "v3"}};std::unordered_map<std::string, std::string> map_empty;emp->declareExchange("exchange1", jiuqi::ExchangeType::DIRECT, true, false, map);emp->declareExchange("exchange2", jiuqi::ExchangeType::TOPIC, true, false, map);emp->declareExchange("exchange3", jiuqi::ExchangeType::FANOUT, true, false, map);emp->declareExchange("exchange4", jiuqi::ExchangeType::FANOUT, true, false, map_empty);emp->declareExchange("exchange5", jiuqi::ExchangeType::FANOUT, true, false, map_empty);emp->declareExchange("exchange6", jiuqi::ExchangeType::FANOUT, true, false, map_empty);ASSERT_EQ(emp->size(), 6);
}TEST(ExchangeTest, select_test)
{jiuqi::Exchange::ptr exp = emp->selectExchange("exchange3");ASSERT_EQ(exp->name, "exchange3");ASSERT_EQ(exp->type, jiuqi::ExchangeType::FANOUT);ASSERT_EQ(exp->durable, true);ASSERT_EQ(exp->auto_delete, false);ASSERT_EQ(exp->getArgs(), std::string("k1=v1&k2=v2&k3=v3"));
}TEST(ExchangeTest, delete_test)
{emp->deleteExchange("exchange1");jiuqi::Exchange::ptr exp = emp->selectExchange("exchange1");ASSERT_EQ(exp.get(), nullptr);ASSERT_EQ(emp->exists("exchange1"), false);
}int main(int argc, char *argv[])
{testing::InitGoogleTest(&argc, argv);testing::AddGlobalTestEnvironment(new ExchangeTest);return RUN_ALL_TESTS();
}

        select测试可能会失败,原因是在交换机中我们使用unordered_map存储其他参数,由于顺序随机,所有使用getArgs获取的字符串中键值对的顺序也是随机的,但是不影响功能。

http://www.lryc.cn/news/594725.html

相关文章:

  • 学习秒杀系统-异步下单(包含RabbitMQ基础知识)
  • ASP.NET Core Web API 中集成 DeveloperSharp.RabbitMQ
  • 关于校准 ARM 开发板时间的步骤和常见问题:我应该是RTC电池没电了才导致我设置了重启开发板又变回去2025年的时间
  • Android NDK ffmpeg 音视频开发实战
  • 什么是“差分“?
  • 包装类简单了解泛型
  • 图片转 PDF三个免费方法总结
  • 支持不限制大小,大文件分段批量上传功能(不受nginx /apache 上传大小限制)
  • 网络设备功能对照表
  • 【Spark征服之路-3.6-Spark-SQL核心编程(五)】
  • Linux 文件操作详解:结构、系统调用、权限与实践
  • 第二阶段-第二章—8天Python从入门到精通【itheima】-134节(SQL——DQL——分组聚合)
  • leetcode-sql-627变更性别
  • 深入解析IP协议:组成、地址管理与路由选择
  • Tomato靶机通关教程
  • 安装docker可视化工具 Portainer中文版(ubuntu上演示,所有docker通用) 支持控制各种容器,容器操作简单化 降低容器门槛
  • 板凳-------Mysql cookbook学习 (十二--------4)
  • 技能学习PostgreSQL中级专家
  • 借助AI学习开源代码git0.7之六write-cache
  • 基于 STM32 的数字闹钟系统 Proteus 仿真设计与实现
  • 从一开始的网络攻防(六):php反序列化
  • 金仓数据库:融合进化,智领未来——2025年数据库技术革命的深度解析
  • STM32 USB键盘实现指南
  • 最严电动自行车新规,即将实施!
  • FreeSwitch通过Websocket(流式双向语音)对接AI实时语音大模型技术方案(mod_ppy_aduio_stream)
  • 朝歌智慧盘古信息:以IMS MOM V6重构国产化智能终端新生态
  • 【初识数据结构】CS61B中的最小生成树问题
  • Car Kit重构车机开发体验,让车载应用开发驶入快车道
  • 【PTA数据结构 | C语言版】拓扑排序
  • OR条件拆分:避免索引失效的查询重构技巧