当前位置: 首页 > news >正文

【RabbitMQ 项目】服务端:数据管理模块之消息队列管理

文章目录

  • 一.编写思路
  • 二.代码实践

一.编写思路

  1. 定义消息队列
    1. 名字
    2. 是否持久化
  2. 定义队列持久化类(持久化到 sqlite3)
    1. 构造函数(只能成功,不能失败)
      1. 如果数据库(文件)不存在则创建
      2. 打开数据库
      3. 打开 msg_queue_table 数据库表
    2. 插入队列
    3. 移除队列
    4. 将数据库中的队列恢复到内存中
      传入一个哈希表,key 为名字,value 为队列的智能指针,填充该哈希表
  3. 定义队列管理类(包含内存管理和持久化管理)
    1. 构造函数:从数据库中恢复队列
    2. 声明队列
    3. 移除队列
    4. 获取队列

二.代码实践

MsgQueue.hpp:

#pragma once
#include "../common/Log.hpp"
#include "../common/Util.hpp"
#include "../common/Util.hpp"
#include <memory>
#include <unordered_map>
#include <mutex>
namespace ns_data
{class MsgQueue;using MsgQueuePtr = std::shared_ptr<MsgQueue>;/************* 定义消息队列* ****************/struct MsgQueue{std::string _name;bool _isDurable;MsgQueue(const std::string &name, bool isDurable): _name(name),_isDurable(isDurable){}};/****************** 定义消息队列持久化类* ******************/class MsgQueueMapper{private:ns_util::Sqlite3Util _sqlite;public:MsgQueueMapper(const std::string &dbName): _sqlite(dbName){// 确保数据库文件已经存在,不存在就创建if (!ns_util::FileUtil::createFile(dbName)){LOG(FATAL) << "create database " << dbName << " fail" << endl;exit(1);}if (!_sqlite.open()){LOG(FATAL) << "open database " << dbName << " fail" << endl;exit(1);}createTable();}/************** 插入消息队列* *************/bool insertMsgQueue(MsgQueuePtr msgQueuePtr){char insertSql[1024];sprintf(insertSql, "insert into msg_queue_table values('%s', '%d');",msgQueuePtr->_name.c_str(), msgQueuePtr->_isDurable);if (!_sqlite.exec(insertSql, nullptr, nullptr)){LOG(WARNING) << "insert MsgQueue fail, MsgQueue: " << msgQueuePtr->_name << endl;return false;}return true;}/*********** 移除消息队列* ***************/void removeMsgQueue(const std::string &name){char deleteSql[1024];sprintf(deleteSql, "delete from msg_queue_table where name='%s';", name.c_str());if (!_sqlite.exec(deleteSql, nullptr, nullptr)){LOG(WARNING) << "remove MsgQueue fail, MsgQueue: " << name << endl;}}/************ 从数据库中恢复消息队列到内存* *****************/void recoverMsgQueue(std::unordered_map<std::string, MsgQueuePtr> *mapPtr){const std::string selectSql = "select * from msg_queue_table;";if (!_sqlite.exec(selectSql.c_str(), selectCallback, mapPtr)){LOG(FATAL) << "recover MsgQueue from msg_queue_table fail" << endl;exit(1);}}/*************** 删除数据库表(仅调试)* ***************/void removeTable(){const std::string dropSql = "drop table if exists msg_queue_table;";if (_sqlite.exec(dropSql.c_str(), nullptr, nullptr)){LOG(WARNING) << "remove table msg_queue_table fail" << endl;}}private:void createTable(){const std::string createSql = "create table if not exists msg_queue_table(\name varchar(32) primary key,\durable int\);";if (!_sqlite.exec(createSql.c_str(), nullptr, nullptr)){LOG(FATAL) << "create table msg_queue_table fail" << endl;exit(1);}}static int selectCallback(void *arg, int colNum, char **line, char **fields){auto mapPtr = static_cast<std::unordered_map<std::string, MsgQueuePtr> *>(arg);std::string name = line[0];bool isDurable = std::stoi(line[1]);auto msgQueuePtr = std::make_shared<MsgQueue>(name, isDurable);mapPtr->insert({name, msgQueuePtr});return 0;}};class MsgQueueManager{private:MsgQueueMapper _mapper;std::unordered_map<std::string, MsgQueuePtr> _msgQueues;std::mutex _mtx;public:MsgQueueManager(const std::string &dbName): _mapper(dbName){_mapper.recoverMsgQueue(&_msgQueues);}/************ 声明队列* ************/bool declareMsgQueue(const std::string &name, bool isDurable){std::unique_lock<std::mutex> lck(_mtx);if (_msgQueues.count(name)){return true;}auto msgQueuePtr = std::make_shared<MsgQueue>(name, isDurable);_msgQueues[name] = msgQueuePtr;if (isDurable){return _mapper.insertMsgQueue(msgQueuePtr);}return true;}/*********** 移除队列* ***********/void removeMsgQueue(const std::string &name){std::unique_lock<std::mutex> lck(_mtx);auto it = _msgQueues.find(name);if (it == _msgQueues.end()){return;}if (it->second->_isDurable){_mapper.removeMsgQueue(name);}_msgQueues.erase(name);}/************* 获取指定队列* ***************/MsgQueuePtr getMsgQueue(const std::string &name){std::unique_lock<std::mutex> lck(_mtx);if (_msgQueues.count(name) == 0){return nullptr;}return _msgQueues[name];}/************** 清理所有队列(仅调试)* ******************/void clearMsgQueues(){std::unique_lock<std::mutex> lck(_mtx);_msgQueues.clear();_mapper.removeTable();}};}
http://www.lryc.cn/news/443589.html

相关文章:

  • SDKMAN!软件开发工具包管理器
  • 《使用 LangChain 进行大模型应用开发》学习笔记(四)
  • gbase8s数据库常见的索引扫描方式
  • 边缘智能-大模型架构初探
  • 《python语言程序设计》2018版第8章18题几何circle2D类(上部)
  • nginx upstream转发连接错误情况研究
  • alias 后门从入门到应急响应
  • 【远程调用PythonAPI-flask】
  • [今日Arxiv] 思维迭代:利用内心对话进行自主大型语言模型推理
  • glTF格式:WebGL应用的3D资产优化解决方案
  • Unity3D入门(一) : 第一个Unity3D项目,实现矩形自动旋转,并导出到Android运行
  • 数据结构与算法——Java实现 8.习题——移除链表元素(值)
  • 如何理解MVCC
  • 在 Qt 中使用 QLabel 设置 GIF 动态背景
  • Flyway 数据库差异处理
  • CSS 选择器的分类与使用要点一
  • 无人机集群路径规划:麻雀搜索算法(Sparrow Search Algorithm, SSA)​求解无人机集群路径规划,提供MATLAB代码
  • harbor集成trivy镜像扫描工具
  • DMA学习
  • C语言18--头文件
  • vscode软件在 C发中常用插件
  • 【C++ Primer Plus习题】17.2
  • Vue Props传值
  • FreeSWITCH event_socket 配置从其他地址连接
  • 信息安全数学基础(19)同余式的基本概念及一次同余式
  • 网关过滤器:Spring Cloud Gateway
  • 力扣最热一百题——除自身以外数组的乘积
  • 监控易监测对象及指标之:全面监控SQL Server数据库
  • 计算机视觉的应用34-基于CV领域的人脸关键点特征智能提取的技术方法
  • What is new in .NET 8 and C#12