当前位置: 首页 > news >正文

适配qnx和linux平台的线程管理类封装

概述

封装代码仓库: https://gitee.com/liudegui/my_thread

尝试封装一个基于C++11的多线程控制与调度类,适配QNX和Linux平台,且代码符合Misra标准。它提供了以下主要功能:

  • 线程的创建与销毁管理。
  • 线程的优先级调度。
  • 线程的CPU亲和性设置。
  • 线程的等待与唤醒机制。

类的结构及主要成员函数如下:

  • 构造函数与析构函数:负责初始化线程相关参数,并在析构时确保线程安全退出。
  • thread_start():启动线程,创建指定数量的线程实例。
  • thread_shutdown():关闭线程,释放资源并等待所有线程退出。
  • timed_wait():线程的超时等待机制,防止线程无限等待。
  • set_self_thread_priority():设置当前线程的优先级。

实现细节

MyThread类内部实现主要涉及以下几个方面:

  • 使用std::thread来创建线程实例。
  • 利用std::mutex和std::condition_variable来实现线程同步。
  • 使用pthread库来设置线程的优先级和CPU亲和性。
/**
* my_thread.h
*/#ifndef UTILS_MY_THREAD_H_
#define UTILS_MY_THREAD_H_#include <atomic>
#include <cstdint>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include <condition_variable>
#include <memory>namespace MyTest {
class MyThread {public:explicit MyThread(const std::string& in_name, int32_t in_priority, uint32_t in_worker_num,const std::function<void()> in_func, int32_t in_cpusetsize, const cpu_set_t*const in_cpuset);~MyThread();void thread_start();void thread_shutdown();bool has_shutdown() const {return is_shutdown_;}void timed_wait(uint32_t useconds);static void set_self_thread_priority(int32_t in_priority);private:static void my_thread_func_(MyThread* thread_ins);private:std::string thread_name_;int32_t thread_priority_;uint32_t thread_worker_num_;std::vector<std::shared_ptr<std::thread>> my_threads_;std::function<void()> func_main_;int32_t thread_cpusetsize_;const cpu_set_t* thread_cpuset_ = nullptr;std::mutex thread_mutex_;std::condition_variable thread_cond_;bool is_shutdown_;
};}  // namespace MyTest#endif  // UTILS_MY_THREAD_H_
/**
* my_thread.cpp
*/#include "my_thread.h"#include <ctime>
#include <sstream>
#ifndef _QNX_
#include <sys/syscall.h>
#endif
#include <sys/types.h>
#include "fake_log.h"namespace MyTest {MyThread::MyThread(const std::string& in_name, int32_t in_priority, uint32_t in_worker_num,const std::function<void()> in_func, int32_t in_cpusetsize, const cpu_set_t* const in_cpuset): thread_name_(in_name),thread_priority_(in_priority),thread_worker_num_(in_worker_num),func_main_(in_func),thread_cpusetsize_(in_cpusetsize),thread_cpuset_(in_cpuset),is_shutdown_(true) {
}MyThread::~MyThread() {if (!is_shutdown_) {thread_shutdown();}
}void MyThread::my_thread_func_(MyThread* thread_ins) {if (thread_ins == nullptr) {MY_LOG_ERROR("thread_ins is nullptr");} else {
#ifndef _QNX_if ((thread_ins->thread_cpuset_ != nullptr) && (thread_ins->thread_cpusetsize_ > 0)) {const pthread_t tid = pthread_self();const auto np_return =pthread_setaffinity_np(tid, static_cast<size_t>(thread_ins->thread_cpusetsize_), thread_ins->thread_cpuset_);if (np_return != 0) {MY_LOG_ERROR("pthread_setaffinity_np failed. return=%d", np_return);}}
#else// qnx ...
#endifstd::stringstream thread_id_stream;thread_id_stream << std::this_thread::get_id();MY_LOG_INFO("thread %s starts. pid=%s target_priority=%d", thread_ins->thread_name_.c_str(),thread_id_stream.str().c_str(), thread_ins->thread_priority_);MyThread::set_self_thread_priority(thread_ins->thread_priority_);try {thread_ins->func_main_();} catch (...) {MY_LOG_ERROR("Exception occurred in thread %s", thread_ins->thread_name_.c_str());}}
}void MyThread::thread_start() {std::lock_guard<std::mutex> lck(thread_mutex_);is_shutdown_ = false;my_threads_.resize(thread_worker_num_);uint32_t thread_idx = 0;for (; thread_idx < thread_worker_num_; thread_idx++) {my_threads_[thread_idx] = std::make_shared<std::thread>(my_thread_func_, this);}
}void MyThread::thread_shutdown() {std::lock_guard<std::mutex> lck(thread_mutex_);if (!is_shutdown_) {is_shutdown_ = true;thread_cond_.notify_all();for (const auto& my_t : my_threads_) {if (my_t->joinable()) {my_t->join();}}}
}void MyThread::timed_wait(uint32_t useconds) {std::unique_lock<std::mutex> lck(thread_mutex_);const auto timeout_val = std::chrono::microseconds(useconds);do {const auto return_val = thread_cond_.wait_for(lck, timeout_val);if (return_val == std::cv_status::timeout) {MY_LOG_ERROR("thread timed_wait timeout");}} while (false);
}void MyThread::set_self_thread_priority(int32_t in_priority) {bool go_on = true;struct sched_param params;struct sched_param current_params;int32_t set_policy{0};int32_t current_policy{0};const pthread_t this_thread = pthread_self();int32_t status_ret = pthread_getschedparam(this_thread, &current_policy, &current_params);if (status_ret != 0) {MY_LOG_ERROR("getschedparam %d", status_ret);go_on = false;} else {MY_LOG_DEBUG("thread current priority is %d (%d), target is %d", current_params.sched_priority, current_policy,in_priority);  // MY_LOG_TRACEif (in_priority == 0) {go_on = false;} else if (in_priority > 0) {set_policy = SCHED_FIFO;params.sched_priority = current_params.sched_priority + in_priority;} else {set_policy = SCHED_IDLE;params.sched_priority = 0;}}if (go_on) {if (params.sched_priority > 99) {params.sched_priority = 99;}if (params.sched_priority < 0) {params.sched_priority = 0;}status_ret = pthread_setschedparam(this_thread, set_policy, &params);if (status_ret != 0) {MY_LOG_WARN("setschedparam(%d)", params.sched_priority);go_on = false;}}if (go_on) {status_ret = pthread_getschedparam(this_thread, &current_policy, &current_params);if (status_ret != 0) {MY_LOG_ERROR("getschedparam 2 %d", status_ret);} else {if (current_params.sched_priority != params.sched_priority) {MY_LOG_ERROR("current priority=%d (%d), target is %d", current_params.sched_priority, current_policy,params.sched_priority);} else {MY_LOG_INFO("set thread priority to %d (%d)", current_params.sched_priority, current_policy);}}}
}}  // namespace MyTest

测试代码

#include "my_thread.h"#include <chrono>
#include <iostream>
#include <string>namespace MyTest {
static void worker_function() {std::this_thread::sleep_for(std::chrono::milliseconds(100));std::cout << "Thread ID: " << std::this_thread::get_id() << " is working." << std::endl;
}
}  // namespace MyTestint32_t main() {int32_t main_res{0};try {const uint32_t num_threads = 4;cpu_set_t cpuset1;CPU_ZERO(&cpuset1);CPU_SET(0, &cpuset1);std::vector<std::shared_ptr<MyTest::MyThread>> test_threads;uint32_t thread_idx = 0;for (; thread_idx < num_threads; ++thread_idx) {const std::string thread_name1 = std::string("Thread_") + std::to_string(thread_idx);const std::shared_ptr<MyTest::MyThread> my_t = std::make_shared<MyTest::MyThread>(thread_name1, 1, 1, MyTest::worker_function, sizeof(cpuset1), &cpuset1);test_threads.push_back(my_t);}for (const auto& my_t : test_threads) {my_t->thread_start();}std::this_thread::sleep_for(std::chrono::seconds(2));for (const auto& my_t : test_threads) {my_t->thread_shutdown();}for (const auto& my_t : test_threads) {while (!my_t->has_shutdown()) {// std::this_thread::yield();my_t->timed_wait(100);}}std::cout << "All threads have been shutdown." << std::endl;} catch (...) {std::cerr << "Exception occurred" << std::endl;main_res = 1;}return main_res;
}
http://www.lryc.cn/news/346700.html

相关文章:

  • 【信息系统项目管理师】复习~第十五章
  • ARM单片机实现流水灯(GD32)
  • 操作系统基础之磁盘
  • 【Unity Shader入门精要 第6章】基础光照(一)
  • JavaEE概述 + Maven
  • python把png转成jpg
  • 信息系统架构基本概念及发展_2.信息系统架构的定义
  • ctfshow SSRF 351-358
  • 优化学习方法,事半功倍
  • Spring STOMP-开启STOMP
  • Python 开发 框架安全:Django SQL注入漏洞测试.(CVE-2021-35042)
  • 《Python编程从入门到实践》day25
  • Unity 性能优化之光照优化(七)
  • C语言 | Leetcode C语言题解之第84题柱状图中最大的矩形
  • AI办公自动化-用kimi批量重命名Word文档
  • Golang 并发 Mutex 互斥锁的使用
  • 20232906 2023-2024-2 《网络与系统攻防技术》第九次作业
  • 常见的十二种软件架构
  • 数据库出现死锁的解决方法参考
  • HCIP-Datacom-ARST自选题库_01_防火墙【6道题】
  • 力扣/leetcode383.比特位记数
  • react18【系列实用教程】useEffect —— 副作用操作 (2024最新版)
  • Excel 分组汇总后删除明细
  • docker runc升级1.1.12
  • C++接口:构建模块化与可扩展的软件架构
  • 【讲解下目标追踪】
  • 实时Linux对EtherCAT工业自动化协议的支持
  • ViLT 浅析
  • 7-117 死亡隧道
  • java数据结构与算法(链表归并排序)