子非线程池中物
线程池,又好上了
有任务队列
任务要处理就直接放到里面
预先创建好线程,本质上也是一个生产消费模型
线程池真是麻烦啊
我们可以直接沿用之前写过的代码,Thread.hpp:
#pragma once
#include <iostream>
#include <functional>
#include <string>
#include <pthread.h>namespace ThreadMoudle
{class ThreadData{public:ThreadData(const std::string &name, pthread_mutex_t *lock) : _name(name), _lock(lock){}public: // 正常来说应该是私有但是我不想写接口了凑合看吧std::string _name;pthread_mutex_t *_lock;};// 线程要执行的方法using func_t = std::function<void()>; //返回值是void,参数是空//typedef void (*func_t)(ThreadData *td); // 函数指针类型class Thread{public:void Excute(){_isrunning = true;_func();_isrunning = false;}public:Thread(const std::string &name, func_t func) : _name(name), _func(func){std::cout<<"create"<<name<<"done"<<std::endl;}static void *ThreadRoutine(void *args) // 新线程执行的方法{Thread *self = static_cast<Thread *>(args); // 获得当前对象self->Excute();return nullptr;}bool Start(){int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this); // 这个::指用系统提供的if (n != 0){return false;}return true;}std::string Status() // 线程启动检测下状态{if (_isrunning){return "running";}return "sleep";}void Stop(){if (_isrunning){::pthread_cancel(_tid);_isrunning = false;}}void Join(){::pthread_join(_tid, nullptr);std::cout << "join done" << std::endl;}std::string Name(){return _name;}~Thread(){Stop();Join();}private:std::string _name;pthread_t _tid;bool _isrunning;func_t _func; // 线程要执行的回调函数//ThreadData *_td;// std::string _result; //返回值,不关心的话也可以不用写};
}
这是新写的热乎线程池:
#pragma once#include<iostream>
#include<unistd.h>
#include<string>
#include<vector>
#include<queue>
#include"Thread.hpp"using namespace ThreadMoudle;static const int gdefaultnum = 5;void test()
{while (true){std::cout << "hello EPI?" << std::endl;sleep(1); }
}template<typename T>
class ThreadPool
{
public:ThreadPool(int thraed_num = gdefaultnum):_thread_num(thraed_num),_isrunning(false){}void Init(){for(int i=0;i<_thread_num;i++){std::string threadname = "thread-" + std::to_string(i+1);_threads.emplace_back(threadname,test);}}void Start(){for(auto &thread:_threads){thread.Start();}}void Stop(){}void Enqueue(const T &in){}~ThreadPool(){}
private:int _thread_num;std::vector<Thread> _threads;std::queue<T> _task_queue;bool _isrunning;
};
Makefile:
threadpool:Main.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f threadpool
Main.cc:
#include"ThreadPool.hpp"int main()
{//std::unique_ptr<ThreadPool> tp = std::make_unique<ThreadPool>();ThreadPool<int> *tp = new ThreadPool<int>();tp->Init();tp->Start();while (true){//不断向线程池中推送任务sleep(1);}return 0;
}
这是成就:
我们的队列是一个临界资源,我们要对它进行保护捏
这个_task_queue队列是要被主线程访问的
为什么用锁不用信号量呢?
因为都是整体使用,会使用这里的各种属性,我们无法确定里面什么样子
这个线程池的设计思想就是
有任务就去执行
如果没任务就该休眠勒
终于可以正常运行勒,,,
遇到的bug:线程休眠的个数初始设置成0了导致检测的时候检测不到都没有被唤醒
就像是早八的时候一个宿舍都在睡觉
任务还是用之前的代码,唤醒的时候要用while以避免伪唤醒的情况
在初始化中用bind让一个模块能调用另一个模块的方法,,捏
ThreadPool.hpp:
#pragma once#include<iostream>
#include<unistd.h>
#include<functional>
#include<string>
#include<vector>
#include<pthread.h>
#include<queue>
#include"Thread.hpp"using namespace ThreadMoudle;static const int gdefaultnum = 5;void test()
{while (true){std::cout << "hello EPI?" << std::endl;sleep(1); }
}template<typename T>
class ThreadPool
{
private:void LockQueue(){pthread_mutex_lock(&_mutex);}void UnLockQueue(){pthread_mutex_unlock(&_mutex);}void WakeUp(){pthread_cond_signal(&_cond);}void Sleep(){pthread_cond_wait(&_cond,&_mutex);}bool IsEmpty(){return _task_queue.empty();}void HandlerTask(){while (true){LockQueue();while(IsEmpty()) //为了防止伪唤醒的情况发生{_sleep_thread_num++; //保证加锁和解锁都安全更新Sleep();_sleep_thread_num--;}//有任务T t = _task_queue.front(); //取出_task_queue.pop(); //老的弹出去UnLockQueue();t(); //处理任务,不能在临界区处理std::cout << t.debug() << std::endl;}}
public:ThreadPool(int thraed_num = gdefaultnum):_thread_num(thraed_num),_isrunning(false),_sleep_thread_num(0){pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_cond,nullptr);}void Init(){func_t func = std::bind(&ThreadPool::HandlerTask,this); //让this和handlertask强关联起来,能让一个模块调用另一个类中的方法for(int i=0;i<_thread_num;i++){std::string threadname = "thread-" + std::to_string(i+1);_threads.emplace_back(threadname,func);}}void Start(){for(auto &thread:_threads){thread.Start();}}void Stop(){}void Enqueue(const T &in){LockQueue(); //加锁_task_queue.push(in);if(_sleep_thread_num > 0){WakeUp();}UnLockQueue(); //解锁}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}
private:int _thread_num;std::vector<Thread> _threads;std::queue<T> _task_queue;bool _isrunning;int _sleep_thread_num; //我们定义一个计数器来确定什么时候唤醒它pthread_mutex_t _mutex;pthread_cond_t _cond;
};
Thread.hpp:
#pragma once
#include <iostream>
#include <functional>
#include <string>
#include <pthread.h>namespace ThreadMoudle
{class ThreadData{public:ThreadData(const std::string &name, pthread_mutex_t *lock) : _name(name), _lock(lock){}public: // 正常来说应该是私有但是我不想写接口了凑合看吧std::string _name;pthread_mutex_t *_lock;};// 线程要执行的方法using func_t = std::function<void()>; //返回值是void,参数是空//typedef void (*func_t)(ThreadData *td); // 函数指针类型class Thread{public:void Excute(){_isrunning = true;_func();_isrunning = false;}public:Thread(const std::string &name, func_t func) : _name(name), _func(func){std::cout<<"create"<<name<<"done"<<std::endl;}static void *ThreadRoutine(void *args) // 新线程执行的方法{Thread *self = static_cast<Thread *>(args); // 获得当前对象self->Excute();return nullptr;}bool Start(){int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this); // 这个::指用系统提供的if (n != 0){return false;}return true;}std::string Status() // 线程启动检测下状态{if (_isrunning){return "running";}return "sleep";}void Stop(){if (_isrunning){::pthread_cancel(_tid);_isrunning = false;}}void Join(){::pthread_join(_tid, nullptr);std::cout << "join done" << std::endl;}std::string Name(){return _name;}~Thread(){Stop();Join();}private:std::string _name;pthread_t _tid;bool _isrunning;func_t _func; // 线程要执行的回调函数//ThreadData *_td;// std::string _result; //返回值,不关心的话也可以不用写};
}
Main.cc:
#include"ThreadPool.hpp"
#include"Task.hpp"int main()
{//std::unique_ptr<ThreadPool> tp = std::make_unique<ThreadPool>();ThreadPool<Task> *tp = new ThreadPool<Task>();tp->Init();tp->Start();while (true){//不断向线程池中推送任务sleep(1);Task t(1,1);tp->Enqueue(t);sleep(1);}return 0;
}
Makefile:
threadpool:Main.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f threadpool
Task.hpp:
#pragma once#include <iostream>
#include <string>class Task
{
public:Task() : x(0), y(0) {}Task(int a, int b) : x(a), y(b) {}// 执行任务,计算结果void operator()() {result_value = x + y; // 示例操作:计算 x 和 y 的和}// 返回结果int result() const {return result_value;}// 调试输出std::string debug() const {return "Task: x = " + std::to_string(x) + ", y = " + std::to_string(y);}private:int x; // 第一个参数int y; // 第二个参数int result_value; // 计算结果
};
荷叶饭说如果把Makefile都交上去了那证明没什么可交勒
关于线程池退出 方面需要先厘清逻辑,就是如果它任务队列为空而且要退出那就退出捏:
#pragma once#include <iostream>
#include <unistd.h>
#include <functional>
#include <string>
#include <vector>
#include <pthread.h>
#include <queue>
#include "Thread.hpp"using namespace ThreadMoudle;static const int gdefaultnum = 5;void test()
{while (true){std::cout << "hello EPI?" << std::endl;sleep(1);}
}template <typename T>
class ThreadPool
{
private:void LockQueue(){pthread_mutex_lock(&_mutex);}void UnLockQueue(){pthread_mutex_unlock(&_mutex);}void WakeUp(){pthread_cond_signal(&_cond);}void WakeUpAll(){pthraed_cond_broadcast(&_cond);}void Sleep(){pthread_cond_wait(&_cond, &_mutex);}bool IsEmpty(){return _task_queue.empty();}void HandlerTask(){while (true){LockQueue();while (IsEmpty() && _isrunning) // 为了防止伪唤醒的情况发生{_sleep_thread_num++; // 保证加锁和解锁都安全更新Sleep();_sleep_thread_num--;}// 判定一种情况if (IsEmpty() && !_isrunning) // 空了并且退出那就退罢{std::cout << name << "quit" << std::endl;UnLockQueue();break;}// 有任务T t = _task_queue.front(); // 取出_task_queue.pop(); // 老的弹出去UnLockQueue();t(); // 处理任务,不能在临界区处理std::cout << t.debug() << std::endl;}}public:ThreadPool(int thraed_num = gdefaultnum) : _thread_num(thraed_num), _isrunning(false), _sleep_thread_num(0){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);}void Init(){func_t func = std::bind(&ThreadPool::HandlerTask, this); // 让this和handlertask强关联起来,能让一个模块调用另一个类中的方法for (int i = 0; i < _thread_num; i++){std::string threadname = "thread-" + std::to_string(i + 1);_threads.emplace_back(threadname, func);}}void Start(){_isrunning = true;for (auto &thread : _threads){thread.Start();}}void Stop(){LockQueue();_isrunning = false;WakeUpAll();UnLockQueue();}void Enqueue(const T &in){LockQueue(); // 加锁if (_isrunning){_task_queue.push(in);if (_sleep_thread_num > 0){WakeUp();}}UnLockQueue(); // 解锁}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}private:int _thread_num;std::vector<Thread> _threads;std::queue<T> _task_queue;bool _isrunning;int _sleep_thread_num; // 我们定义一个计数器来确定什么时候唤醒它pthread_mutex_t _mutex;pthread_cond_t _cond;
};
总结
线程池: 一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。
线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
线程池的应用场景:
1. 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了
2. 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求
3. 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情 况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限, 出现错误
线程池示例:
1. 创建固定数量线程池,循环从任务队列中获取任务对象
2. 获取到任务对象后,执行任务对象中的任务接口
线程安全的单例模式
劳斯,什么是单例模式?
单例模式是一种经典的常用的常考的设计模式
什么是设计模式?(有一种递归的感觉由于什么都不知道导致需要连续的搜索一个概念中的另一个概念)
IT行业这么火, 涌入的人很多. 俗话说林子大了啥鸟都有. 大佬和菜鸡们两极分化的越来越严重. 为了让菜鸡们不太拖大佬的后腿, 于是大佬们针对一些经典的常见的场景, 给定了一些对应的解决方案, 这个就是设计模式,,,球球你们不要再跟我比了,计算机比肩土木就都老实勒
《未毕业勿扰》
单例模式的特点:
某些类, 只应该具有一个对象(实例), 就称之为单例,例如一个男人只能有一个媳妇,在很多服务器开发场景中,经常需要让服务器加载很多的数据 (上百G) 到内存中
此时往往要用一个单例的类来管理这些数据
饿汉方式和懒汉方式
关于单例模式的实现,有饿汉方式和懒汉方式
拿洗碗来举例子:
吃完饭, 立刻洗碗, 这种就是饿汉方式. 因为下一顿吃的时候可以立刻拿着碗就能吃饭
吃完饭, 先把碗放下, 然后下一顿饭用到这个碗了再洗碗, 就是懒汉方式
懒汉方式最核心的思想是 "延时加载". 从而能够优化服务器的启动速度
这是饿汉方式:
template <typename T>
class Singleton
{ static T data;
public: static T* GetInstance() { return &data; }
};
通过Single这个包装类来使用T对象,则一个进程只有一个T对象的实例
这是懒汉方式:
template <typename T>
class Singleton
{ static T* inst;
public: static T* GetInstance() { if (inst == NULL) { inst = new T(); } return inst; }
};
但是这个实现会造成线程不安全的问题
在第一次嗲用GetInstance的时候,如果两个线程同时调用,可能会创建出两份T对象的实例,但后续再次调用就没问题勒
日志是软件运行的记录信息,可以向显示器打印,也可以向文件中打印,这个要有特定的格式
[日志等级][pid][filename][filenumber]日志内容(支持可变参数)
日志等级分为debug(调试信息)和info(常规信息)和warning(警告)和error(错误)和fatal(致命错误)
我们来写一个日志罢,方便看到输出
耶,Log.hpp:
#pragma#include <iostream>
#include<unistd.h>
#include<sys/types.h>
#include<ctime>enum
{DEBUG = 1,INFO,WARNING,ERRNO,FATAL
};std::string LevelToString(int level)
{switch (level){case DEBUG:return "DEBUG";case INFO:return "INFO";case WARNING:return "WARNING";case ERRNO:return "ERRNO";case FATAL:return "FATAL";default:return "UNKNOW";}
}std::string GetCurTime()
{time_t now = time(nullptr);struct tm *curr_time = localtime(&now);char buffer[128];snprintf(buffer,sizeof(buffer),"%d-%02d-%02d : %02d-%02d-%02d",\curr_time->tm_year+1900,\curr_time->tm_mon+1,\curr_time->tm_mday,\curr_time->tm_hour,\curr_time->tm_min,\curr_time->tm_sec);return buffer;
}class logmessage
{
public:std::string _level;pid_t _id;std::string _filename;int _filenumber;std::string _curr_time;std::string _message_info;
};
class Log
{
public:Log(){}void LogMessage(std::string filename,int filenumber,int level,const char* format,...) //介素可变参数{logmessage lg;lg._level = LevelToString(level);lg._id = getpid();lg._filename = filename;lg._filenumber =filenumber;lg._curr_time = GetCurTime();}~Log(){}
private:
};// int main()
// {// return 0;
// }
可变参数是从右到左入栈的
#pragma#include <iostream>
#include<unistd.h>
#include<sys/types.h>
#include<ctime>
#include<stdarg.h>enum
{DEBUG = 1,INFO,WARNING,ERRNO,FATAL
};std::string LevelToString(int level)
{switch (level){case DEBUG:return "DEBUG";case INFO:return "INFO";case WARNING:return "WARNING";case ERRNO:return "ERRNO";case FATAL:return "FATAL";default:return "UNKNOW";}
}std::string GetCurTime()
{time_t now = time(nullptr);struct tm *curr_time = localtime(&now);char buffer[128];snprintf(buffer,sizeof(buffer),"%d-%02d-%02d : %02d-%02d-%02d",\curr_time->tm_year+1900,\curr_time->tm_mon+1,\curr_time->tm_mday,\curr_time->tm_hour,\curr_time->tm_min,\curr_time->tm_sec);return buffer;
}class logmessage
{
public:std::string _level;pid_t _id;std::string _filename;int _filenumber;std::string _curr_time;std::string _message_info;
};
class Log
{
public:Log(){}void LogMessage(std::string filename,int filenumber,int level,const char* format,...) //介素可变参数{logmessage lg;lg._level = LevelToString(level);lg._id = getpid();lg._filename = filename;lg._filenumber =filenumber;lg._curr_time = GetCurTime();va_list ap;va_start(ap,format); //初始化char log_info[1024];vsnprintf(log_info,sizeof(log_info),format,ap);va_end(ap);lg._message_info = log_info;std::cout << lg._message_info << std::endl;}~Log(){}
private:
};// int main()
// {// return 0;
// }
#include"ThreadPool.hpp"
#include"Task.hpp"
#include"Log.hpp"int main()
{std::cout << GetCurTime() << std::endl;Log lg;lg.LogMessage("main.cc",10,DEBUG,"hello %d,world: %c,hello: %f\n",1000,'A',3.14);return 0;
}
我恨你啊
我的茶餐厅,,,
没了全没了
为什么呢?
可能因为我喜欢吧。
来看看怎么把消息打到文件里吧:
#pragma#include <iostream>
#include<unistd.h>
#include<sys/types.h>
#include<ctime>
#include<stdarg.h>
#include<fstream>
#include<cstring>#define SCREEN_TYPE 1
#define FILE_TYPE 2const std::string glogfile = "./log.txt";enum
{DEBUG = 1,INFO,WARNING,ERRNO,FATAL
};std::string LevelToString(int level)
{switch (level){case DEBUG:return "DEBUG";case INFO:return "INFO";case WARNING:return "WARNING";case ERRNO:return "ERRNO";case FATAL:return "FATAL";default:return "UNKNOW";}
}std::string GetCurTime()
{time_t now = time(nullptr);struct tm *curr_time = localtime(&now);char buffer[128];snprintf(buffer,sizeof(buffer),"%d-%02d-%02d : %02d-%02d-%02d",\curr_time->tm_year+1900,\curr_time->tm_mon+1,\curr_time->tm_mday,\curr_time->tm_hour,\curr_time->tm_min,\curr_time->tm_sec);return buffer;
}class logmessage
{
public:std::string _level;pid_t _id;std::string _filename;int _filenumber;std::string _curr_time;std::string _message_info;
};
class Log
{
public:Log(const std::string &logfile = glogfile):_logfile(logfile),_type(SCREEN_TYPE){}void Enable(int type){_type = type;}void FlushLogToScreen(const logmessage &lg){printf("[%s][%d][%s][%d][%s] %s",lg._level.c_str(),lg._id,lg._filename.c_str(),lg._filenumber,lg._curr_time,lg._message_info);}void FlushLogToFile(const logmessage &lg){std::ofstream out(_logfile);if(!out.is_open()){return;}char logtxt[2048];snprintf(logtxt,sizeof(logtxt),"[%s][%d][%s][%d][%s] %s",lg._level.c_str(),lg._id,lg._filename.c_str(),lg._filenumber,lg._curr_time,lg._message_info);out.write(logtxt,strlen(logtxt));out.close();}void FlushLog(const logmessage &lg){switch(_type){case SCREEN_TYPE:FlushLogToScreen(lg);break;case FILE_TYPE:FlushLogToFile(lg);break;}}void LogMessage(std::string filename,int filenumber,int level,const char* format,...) //介素可变参数{logmessage lg;lg._level = LevelToString(level);lg._id = getpid();lg._filename = filename;lg._filenumber =filenumber;lg._curr_time = GetCurTime();va_list ap;va_start(ap,format); //初始化char log_info[1024];vsnprintf(log_info,sizeof(log_info),format,ap);va_end(ap);lg._message_info = log_info;//打印出来日志FlushLog(lg);}~Log(){}
private:int _type;std::string _logfile;
};// int main()
// {// return 0;
// }
#include"ThreadPool.hpp"
#include"Task.hpp"
#include"Log.hpp"int main()
{std::cout << GetCurTime() << std::endl;Log lg;lg.Enable(FILE_TYPE);lg.LogMessage("main.cc",10,DEBUG,"hello %d,world: %c,hello: %f\n",1000,'A',3.14);return 0;
}
这个打印乱码不知道为什么,好像是说什么强制类型转换出问题了
我也不知道哪错了那就不改了吧
do while(0)是保证宏替换的安全问题
#pragma#include <iostream>
#include<unistd.h>
#include<sys/types.h>
#include<ctime>
#include<stdarg.h>
#include<fstream>
#include<cstring>
#include"LockGuard.hpp"#define SCREEN_TYPE 1
#define FILE_TYPE 2const std::string glogfile = "./log.txt";
pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;enum
{DEBUG = 1,INFO,WARNING,ERRNO,FATAL
};std::string LevelToString(int level)
{switch (level){case DEBUG:return "DEBUG";case INFO:return "INFO";case WARNING:return "WARNING";case ERRNO:return "ERRNO";case FATAL:return "FATAL";default:return "UNKNOW";}
}std::string GetCurTime()
{time_t now = time(nullptr);struct tm *curr_time = localtime(&now);char buffer[128];snprintf(buffer,sizeof(buffer),"%d-%02d-%02d : %02d-%02d-%02d",\curr_time->tm_year+1900,\curr_time->tm_mon+1,\curr_time->tm_mday,\curr_time->tm_hour,\curr_time->tm_min,\curr_time->tm_sec);return buffer;
}class logmessage
{
public:std::string _level;pid_t _id;std::string _filename;int _filenumber;std::string _curr_time;std::string _message_info;
};class Log
{
public:Log(const std::string &logfile = glogfile):_logfile(logfile),_type(SCREEN_TYPE){}void Enable(int type){_type = type;}void FlushLogToScreen(const logmessage &lg){printf("[%s][%d][%s][%d][%s][%s]",lg._level.c_str(),lg._id,lg._filename.c_str(),lg._filenumber,lg._curr_time,lg._message_info);}void FlushLogToFile(const logmessage &lg){std::ofstream out(_logfile,std::ios::app);if(!out.is_open()){return;}char logtxt[2048];snprintf(logtxt,sizeof(logtxt),"[%s][%d][%s][%d][%s] %s",lg._level.c_str(),lg._id,lg._filename.c_str(),lg._filenumber,lg._curr_time,lg._message_info);out.write(logtxt,strlen(logtxt));out.close();}void FlushLog(const logmessage &lg){//加过滤逻辑,可以把等级过滤出去LockGuard lockguard(&glock);switch(_type){case SCREEN_TYPE:FlushLogToScreen(lg);break;case FILE_TYPE:FlushLogToFile(lg);break;}}void LogMessage(std::string filename,int filenumber,int level,const char* format,...) //介素可变参数{logmessage lg;lg._level = LevelToString(level);lg._id = getpid();lg._filename = filename;lg._filenumber =filenumber;lg._curr_time = GetCurTime();va_list ap;va_start(ap,format); //初始化char log_info[1024];vsnprintf(log_info,sizeof(log_info),format,ap);va_end(ap);lg._message_info = log_info;//打印出来日志FlushLog(lg);}~Log(){}
private:int _type;std::string _logfile;
};Log lg;#define LOG(Level,Format,...)do{lg.LogMessage(__FILE__,__LINE__,Level,Format,##__VA_ARGS__); }while(0)
#define EnableScreen() do {lg.Enable(SCREEN_TYPE);}while(0)
#define EnableFILE() do{lg.Enable(FILE_TYPE;)}while(0)
赫赫恨
亲爱的荷叶饭如果 你看到这里
和我一起改bug,我在这存个档
Log.hpp:
#pragma once#include <iostream>
#include<unistd.h>
#include<sys/types.h>
#include<ctime>
#include<stdarg.h>
#include<fstream>
#include<cstring>
#include"LockGuard.hpp"#define SCREEN_TYPE 1
#define FILE_TYPE 2
//下面这个是改进之后
Log lg;#define LOG(Level, Format, ...) do { \lg.LogMessage(__FILE__, __LINE__, Level, Format, ##__VA_ARGS__); \
} while(0)
//#define LOG(Level,Format,...) do{lg.LogMessage(__FILE__,__LINE__,Level,Format,##__VA_ARGS__);}while(0)
#define EnableScreen() do {lg.Enable(SCREEN_TYPE);}while(0)
#define EnableFILE() do{lg.Enable(FILE_TYPE);}while(0)const std::string glogfile = "./log.txt";
pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;enum LogLevel
{DEBUG = 1,INFO,WARNING,ERRNO,FATAL
};std::string LevelToString(int level)
{switch (level){case DEBUG:return "DEBUG";case INFO:return "INFO";case WARNING:return "WARNING";case ERRNO:return "ERRNO";case FATAL:return "FATAL";default:return "UNKNOW";}
}std::string GetCurTime()
{time_t now = time(nullptr);struct tm *curr_time = localtime(&now);char buffer[128];snprintf(buffer,sizeof(buffer),"%d-%02d-%02d : %02d-%02d-%02d",\curr_time->tm_year+1900,\curr_time->tm_mon+1,\curr_time->tm_mday,\curr_time->tm_hour,\curr_time->tm_min,\curr_time->tm_sec);return buffer;
}class logmessage
{
public:std::string _level;pid_t _id;std::string _filename;int _filenumber;std::string _curr_time;std::string _message_info;
};class Log
{
public:Log(const std::string &logfile = glogfile):_logfile(logfile),_type(SCREEN_TYPE){}void Enable(int type){_type = type;}void FlushLogToScreen(const logmessage &lg){printf("[%s][%d][%s][%d][%s][%s]",lg._level.c_str(),lg._id,lg._filename.c_str(),lg._filenumber,lg._curr_time.c_str(),lg._message_info.c_str());}void FlushLogToFile(const logmessage &lg){std::ofstream out(_logfile,std::ios::app);if(!out.is_open()){return;}char logtxt[2048];snprintf(logtxt,sizeof(logtxt),"[%s][%d][%s][%d][%s] %s",lg._level.c_str(),lg._id,lg._filename.c_str(),lg._filenumber,lg._curr_time.c_str(),lg._message_info.c_str());out.write(logtxt,strlen(logtxt));out.close();}void FlushLog(const logmessage &lg){//加过滤逻辑,可以把等级过滤出去LockGuard lockguard(&glock);switch(_type){case SCREEN_TYPE:FlushLogToScreen(lg);break;case FILE_TYPE:FlushLogToFile(lg);break;}}void LogMessage(std::string filename,int filenumber,int level,const char* format,...) //介素可变参数{logmessage lg;lg._level = LevelToString(level);lg._id = getpid();lg._filename = filename;lg._filenumber =filenumber;lg._curr_time = GetCurTime();va_list ap;va_start(ap,format); //初始化char log_info[1024];vsnprintf(log_info,sizeof(log_info),format,ap);va_end(ap);lg._message_info = log_info;//打印出来日志FlushLog(lg);}~Log(){}
private:int _type;std::string _logfile;
};
ThreadPool.hpp:
#pragma once#include"Log.hpp"
#include <iostream>
#include <unistd.h>
#include <functional>
#include <string>
#include <vector>
#include <pthread.h>
#include <queue>
#include "Thread.hpp"using namespace ThreadMoudle;static const int gdefaultnum = 5;void test()
{while (true){std::cout << "hello EPI?" << std::endl;sleep(1);}
}template <typename T>
class ThreadPool
{
private:void LockQueue(){pthread_mutex_lock(&_mutex);}void UnLockQueue(){pthread_mutex_unlock(&_mutex);}void WakeUp(){pthread_cond_signal(&_cond);}void WakeUpAll(){pthread_cond_broadcast(&_cond);}void Sleep(){pthread_cond_wait(&_cond, &_mutex);}bool IsEmpty(){return _task_queue.empty();}void HandlerTask(){while (true){LockQueue();while (IsEmpty() && _isrunning) // 为了防止伪唤醒的情况发生{_sleep_thread_num++; // 保证加锁和解锁都安全更新Sleep();_sleep_thread_num--;}// 判定一种情况if (IsEmpty() && !_isrunning) // 空了并且退出那就退罢{std::cout << "quit" << std::endl;UnLockQueue();break;}// 有任务T t = _task_queue.front(); // 取出_task_queue.pop(); // 老的弹出去UnLockQueue();t(); // 处理任务,不能在临界区处理std::cout << t.debug() << std::endl;}}ThreadPool(int thread_num = gdefaultnum) : _thread_num(thread_num), _isrunning(false), _sleep_thread_num(0){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);}ThreadPool(const ThreadPool<T> &) = delete; //赫赫这就是我们单例模式void operator = (const ThreadPool<T> &) = delete;
public:void Init(){func_t func = std::bind(&ThreadPool::HandlerTask, this); // 让this和handlertask强关联起来,能让一个模块调用另一个类中的方法for (int i = 0; i < _thread_num; i++){std::string threadname = "thread-" + std::to_string(i + 1);_threads.emplace_back(threadname, func);}}void Start(){_isrunning = true;for (auto &thread : _threads){thread.Start();}}void Stop(){LockQueue();_isrunning = false;WakeUpAll();UnLockQueue();}static ThreadPool<T> *GetInstance(){if(_tp==nullptr){LOG(INFO,"create threadpool\n");_tp=new ThreadPool();_tp->Init();_tp->Start();}else{LOG(INFO,"get threadpool\n");}return _tp;}void Enqueue(const T &in){LockQueue(); // 加锁if (_isrunning){_task_queue.push(in);if (_sleep_thread_num > 0){WakeUp();}}UnLockQueue(); // 解锁}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}private:int _thread_num;std::vector<Thread> _threads;std::queue<T> _task_queue;bool _isrunning;int _sleep_thread_num; // 我们定义一个计数器来确定什么时候唤醒它pthread_mutex_t _mutex;pthread_cond_t _cond;//单例模式static ThreadPool<T> *_tp;};//静态指针的初始化需要在类外
template<typename T>
ThreadPool<T> *ThreadPool<T>::_tp = nullptr;
Main.cc:
#include"ThreadPool.hpp"
#include"Task.hpp"
#include"Log.hpp"int main()
{Log::FlushLogToScreen();int cnt = 10;while (cnt){sleep(1);Task t(1,1);ThreadPool<Task>::GetInstance()->Enqueue(t);LOG(INFO,"enqueue a task,%s\n",t.debug().c_str());sleep(1);cnt--;}ThreadPool<Task>::GetInstance()->Stop();LOG(INFO,"thraed pool stop!\n");return 0;
}
LockGuard.hpp:
#pragma once
#include<pthread.h>class LockGuard
{
public:LockGuard(pthread_mutex_t *mutex):_mutex(mutex){pthread_mutex_lock(_mutex);}~LockGuard(){pthread_mutex_unlock(_mutex);}private:pthread_mutex_t *_mutex;
};
Thread.hpp:
#pragma once
#include <iostream>
#include <functional>
#include <string>
#include <pthread.h>namespace ThreadMoudle
{class ThreadData{public:ThreadData(const std::string &name, pthread_mutex_t *lock) : _name(name), _lock(lock){}public: // 正常来说应该是私有但是我不想写接口了凑合看吧std::string _name;pthread_mutex_t *_lock;};// 线程要执行的方法using func_t = std::function<void()>; //返回值是void,参数是空//typedef void (*func_t)(ThreadData *td); // 函数指针类型class Thread{public:void Excute(){_isrunning = true;_func();_isrunning = false;}public:Thread(const std::string &name, func_t func) : _name(name), _func(func){std::cout<<"create"<<name<<"done"<<std::endl;}static void *ThreadRoutine(void *args) // 新线程执行的方法{Thread *self = static_cast<Thread *>(args); // 获得当前对象self->Excute();return nullptr;}bool Start(){int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this); // 这个::指用系统提供的if (n != 0){return false;}return true;}std::string Status() // 线程启动检测下状态{if (_isrunning){return "running";}return "sleep";}void Stop(){if (_isrunning){::pthread_cancel(_tid);_isrunning = false;}}void Join(){::pthread_join(_tid, nullptr);std::cout << "join done" << std::endl;}std::string Name(){return _name;}~Thread(){Stop();Join();}private:std::string _name;pthread_t _tid;bool _isrunning;func_t _func; // 线程要执行的回调函数//ThreadData *_td;// std::string _result; //返回值,不关心的话也可以不用写};
}
拜托了小蜜蜂君