libevent高并发网络编程 - 06_基于libevent的C++线程池实现
文章目录
- 1 功能简介
- 线程池的初始化
- 线程池执行流程
- 2 线程池类的设计
- 线程类XThread
- XThread.h
- XThread.cpp
- 线程池类XThreadPool
- XThreadPool.h
- XThreadPool.cpp
- 任务基类task
- XTask.h
- 3 自定义任务的例子
- 自定义任务类ServerCMD
- ServerCMD.h
- ServerCMD.cpp
- 测试程序
- 运行效果
1 功能简介
本文利用libevent,实现一个C++线程池,,可自定义用户任务类,继承于任务task基类,重写任务基类的纯虚函数实现多态。比如将定义定义处理客户端的请求任务类,实现对客户端请求的并发处理。
-
工作队列:可以理解为线程的队列,一个线程同时可以处理一个任务,空闲的线程回从任务队列取出任务执行。当工作队列空时,线程会睡眠。
-
任务队列:用户将任务加入任务队列,然后通知工作队列,取出一个任务到线程中执行。
线程池的初始化
线程池执行流程
2 线程池类的设计
线程类XThread
线程类的接口功能
Start() -> 管道可读就激活线程;设置管道属性;进入事件循环,等待管道可读激活线程执行任务
Setup() -> 设置管道属性,将管道读事件绑定到event_base中,等待触发,调用回调
Main() -> 此函数只进入事件循环,等待事件循环退出Notify() -> 读取管道数据,从当前线程对象的任务队列中取出任务,执行任务
AddTask() -> 将任务对象加入线程对象的任务队列,将线程的事件处理器base,保存到任务对象中
Activate() -> 通过管道发送启动标志,来激活线程,发送一个字符'c'激活相当于加入一个任务对象到当前线程的任务队列,通过Notify()处理。调用多次Activate表示加入多个任务,任务顺序被执行。
XThread.h
#pragma once
#include <vector>/*线程类声明*/
class XThread;/*任务类声明*/
class XTask;/*线程池类*/
class XThreadPool
{
public://单例模式创建返回唯一对象static XThreadPool* GetInstance();//初始化所有线程并启动线程void Init(int threadCount);//分发线程void Dispatch(XTask* task);private://将构造函数的访问属性设置为 private//将构造函数构造声明成私有不使用//声明成私有不使用XThreadPool(){} //无参构造XThreadPool(const XThreadPool&); //拷贝构造XThreadPool& operator= (const XThreadPool&); //赋值运算符重载//线程数量int threadCount = 0;//用来标记下一个使用的线程号int lastThread = -1;//线程对象数组std::vector<XThread *> threads;//线程池对象static XThreadPool* pInstance;
};
XThread.cpp
#include "XThread.h"
#include "XTask.h"
#include <thread>
#include <iostream>
#include <event2/event.h>
#include <unistd.h>using namespace std;XThread::XThread()
{}XThread::~XThread()
{}//sock 文件描述符,which 事件类型 arg传递的参数
/*
* 函数名: NotifyCB
* 作用: 管道可读事件触发回调函数
*/
static void NotifyCB(evutil_socket_t fd, short which, void *arg)
{XThread *th = (XThread*)arg;th->Notify(fd, which);
}/*
* 函数名: XThread::Start
* 作用: 启动线程
* 解释: 管道可读就激活线程;设置管道属性;进入事件循环,等待管道可读激活线程执行任务。
*/
void XThread::Start()
{//安装线程,初始化event_base和管道监听事件用于激活Setup();//启动线程thread th(&XThread::Main, this);//线程分离th.detach();
}/*
* 函数名: XThread::Main
* 作用: 线程入口函数
* 解释: 此函数只进入事件循环,等待事件循环退出
*/
void XThread::Main()
{cout << id << " XThread::Main() begin" << endl;event_base_dispatch(base); //进入事件循环event_base_free(base);cout << id << " XThread::Main() end" << endl;
}/*
* 函数名: XThread::Setup
* 作用: 安装线程
* 解释: 设置管道属性,将管道读事件绑定到event_base中,等待触发,调用回调
*/
bool XThread::Setup()
{//windows用配对socket linux用管道//创建的管道int fds[2];if(pipe(fds)){cerr << "pipe failed!" << endl;return false; }//读取绑定到event事件中,写入要保存//保存管道的写fdnotify_send_fd = fds[1];//创建一个新的事件处理器对象this->base = event_base_new();//创建一个新的事件对象//添加管道监听事件读fd,用于激活线程执行任务event *ev = event_new(base, fds[0], EV_READ|EV_PERSIST, NotifyCB, this);//将事件对象(struct event)添加到指定的事件处理器(event_base)中event_add(ev, 0);return true;
}/*
* 函数名: XThread::Notify
* 作用: 线程激活执行任务
* 解释: 读取管道数据,从当前线程对象的任务队列中取出任务,执行任务
*/
void XThread::Notify(evutil_socket_t fd, short which)
{//水平触发 只要没有接受完成,会再次进来char buf[2] = {0};int len = read(fd, buf, 1);if (len <= 0)return;cout << id << " thread " << buf << endl;//获取任务,并初始化任务XTask* task = NULL;tasks_mutex.lock();if(tasks.empty()){ //队列为空tasks_mutex.unlock();return;}task = tasks.front(); //先进先出tasks.pop_front();tasks_mutex.unlock();task->Init();
}/*
* 函数名: XThread::Activate
* 作用: 激活线程
* 解释: 通过管道发送启动标志,来激活线程,发送一个字符'c'激活相当于加入一个任务对象到当前线程的任务队列,通过Notify()处理。
* 调用多次Activate表示加入多个任务,任务顺序被执行。
*/
void XThread::Activate()
{char act[10] = {0};int len = write(this->notify_send_fd, "c", 1);if (len <= 0){cerr << "XThread::Activate() failed!" << endl;}cout << "currect thread:" << id << ", notify_send_fd:" << this->notify_send_fd << endl;
}/*
* 函数名: XThread::AddTask
* 作用: 将任务对象加入线程对象的任务队列,将线程的事件处理器base,保存到任务对象中
*/
void XThread::AddTask(XTask* task)
{if(!task)return;task->base = this->base;tasks_mutex.lock();tasks.push_back(task);tasks_mutex.unlock();
}
线程池类XThreadPool
线程类的接口功能
GetInstance() -> 单例模式创建返回唯一对象
Init() -> 创建指定数量线程对象,启动线程,并把线程对象加入到线程池的线程对象数组
Dispatch() -> 从线程对象数组取出线程对象,并把任务加入线程对象的任务队列中,激活该线程执行任务
XThreadPool.h
#pragma once
#include <vector>/*线程类声明*/
class XThread;/*任务类声明*/
class XTask;/*线程池类*/
class XThreadPool
{
public://单例模式创建返回唯一对象static XThreadPool* GetInstance();//初始化所有线程并启动线程void Init(int threadCount);//分发线程void Dispatch(XTask* task);private://将构造函数的访问属性设置为 private//将构造函数构造声明成私有不使用//声明成私有不使用XThreadPool(){} //无参构造XThreadPool(const XThreadPool&); //拷贝构造XThreadPool& operator= (const XThreadPool&); //赋值运算符重载//线程数量int threadCount = 0;//用来标记下一个使用的线程号int lastThread = -1;//线程对象数组std::vector<XThread *> threads;//线程池对象static XThreadPool* pInstance;
};
XThreadPool.cpp
#include "XThreadPool.h"
#include "XThread.h"
#include <thread>
#include <iostream>
//#include <chrono>using namespace std;//静态成员变量类外初始化
XThreadPool* XThreadPool::pInstance = NULL;/*
* 函数名: XThreadPool::GetInstance
* 作用: 单例模式创建返回唯一对象
*/
XThreadPool* XThreadPool::GetInstance()
{//当需要使用对象时,访问instance 的值//空值:创建对象,并用instance 标记//非空值: 返回instance 标记的对象if( pInstance == NULL ){pInstance = new XThreadPool();}return pInstance;
}/*
* 函数名: XThreadPool::Init
* 作用: 初始化所有线程并启动线程
* 解释: 创建指定数量线程对象,启动线程,并把线程对象加入到线程池的线程对象数组
*/
void XThreadPool::Init(int threadCount)
{this->threadCount = threadCount;this->lastThread = -1;for (int i = 0; i < threadCount; i++){XThread *t = new XThread();t->id = i + 1;cout << "Create thread " << i << endl;//启动线程t->Start();threads.push_back(t);this_thread::sleep_for(std::chrono::microseconds(10));}
}/*
* 函数名: XThreadPool::Dispatch
* 作用: 分发线程
* 解释: 从线程对象数组取出线程对象,并把任务加入线程对象的任务队列中,激活该线程执行任务。
*/
void XThreadPool::Dispatch(XTask* task)
{//轮询if(!task)return;int tid = (lastThread + 1) % threadCount;lastThread = tid;cout << "lastThread:" << lastThread << endl;XThread *XTh = threads[tid];//添加任务XTh->AddTask(task);//线程激活XTh->Activate();
}
任务基类task
XTask.h
#pragma once
#include <iostream>class XTask
{
public://事件处理器对象struct event_base* base = NULL;//客户端连接的socketint sock = 0;//初始化任务 纯虚函数virtual bool Init() = 0;
};
3 自定义任务的例子
自定义任务类ServerCMD
线程类的接口功能
Init() -> 初始化任务,注册当前socket的读事件和超时事件,绑定回调函数
ReadCB() -> 读事件回调函数
EventCB() -> 客户端超时未发请求,断开连接退出任务
ServerCMD.h
#pragma once#include "XTask.h"class XFtpServerCMD : public XTask
{
public://初始化任务virtual bool Init();XFtpServerCMD();~XFtpServerCMD();
};
ServerCMD.cpp
#include "XFtpServerCMD.h"
#include <event2/event.h>
#include <event2/bufferevent.h>
#include <iostream>
#include <string.h>using namespace std;/*
* 函数名: EventCB
* 作用: 超时事件回调函数
* 解释: 客户端超时未发请求,断开连接退出任务
*/
void EventCB(struct bufferevent *bev, short what, void *arg)
{XFtpServerCMD* cmd = (XFtpServerCMD*)arg;//如果对方网络断掉,或者机器死机有可能收不到BEV_EVENT_EOF数据if(what & (BEV_EVENT_EOF | BEV_EVENT_ERROR | BEV_EVENT_TIMEOUT)){cout << "BEV_EVENT_EOF | BEV_EVENT_ERROR |BEV_EVENT_TIMEOUT" << endl;bufferevent_free(bev);delete cmd;}
}/*
* 函数名: ReadCB
* 作用: 读事件回调函数
*/
void ReadCB(struct bufferevent *bev, void *arg)
{XFtpServerCMD* cmd = (XFtpServerCMD*)arg;char data[1024] = {0};for (;;){int len = bufferevent_read(bev, data, sizeof(data)-1);if(len <= 0)break;data[len] = '\0';cout << data << endl << flush;//测试代码,要清理掉if(strstr(data, "quit")){bufferevent_free(bev);delete cmd;break;}}
}/*
* 函数名: XFtpServerCMD::Init
* 作用: 初始化任务
* 解释: 初始化任务,注册当前socket的读事件和超时事件,绑定回调函数。
*/
bool XFtpServerCMD::Init()
{cout << "XFtpServerCMD::Init() sock:" << sock << endl;//监听socket bufferevent// base socketbufferevent* bev = bufferevent_socket_new(base, sock, BEV_OPT_CLOSE_ON_FREE);bufferevent_setcb(bev, ReadCB, 0 ,EventCB, this);bufferevent_enable(bev, EV_READ | EV_WRITE);//添加超时timeval rt = {10, 0}; //10秒bufferevent_set_timeouts(bev, &rt, 0); //设置读超时回调函数return true;
}XFtpServerCMD::XFtpServerCMD()
{}XFtpServerCMD::~XFtpServerCMD()
{}
测试程序
#include <event2/event.h>
#include <event2/listener.h>
#include <string.h>
#include "XThreadPool.h"
#include <signal.h>
#include <iostream>#include "XFtpServerCMD.h"using namespace std;
#define SPORT 5001/*
* 函数名: listen_cb
* 作用: 接收到连接的回调函数
* 解释: 通过多态来创建任务对象,将当前socket保存到任务对象中,分发任务执行
*/
void listen_cb(struct evconnlistener *e, evutil_socket_t s, struct sockaddr *a, int socklen, void *arg)
{cout << "listen_cb" << endl;XTask* task = new XFtpServerCMD();task->sock = s;XThreadPool::GetInstance()->Dispatch(task);
}int main()
{//忽略管道信号,发送数据给已关闭的socketif (signal(SIGPIPE, SIG_IGN) == SIG_ERR)return 1;//1 初始化线程池XThreadPool::GetInstance()->Init(5);std::cout << "test thread pool!\n"; //创建libevent的上下文event_base* base = event_base_new();if (base){cout << "event_base_new success!" << endl;}//监听端口//socket ,bind,listen 绑定事件sockaddr_in sin;memset(&sin, 0, sizeof(sin));sin.sin_family = AF_INET;sin.sin_port = htons(SPORT);evconnlistener* ev = evconnlistener_new_bind(base, // libevent的上下文listen_cb, //接收到连接的回调函数base, //回调函数获取的参数 argLEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, //地址重用,evconnlistener关闭同时关闭socket10, //连接队列大小,对应listen函数(sockaddr*)&sin, //绑定的地址和端口sizeof(sin));//事件分发处理if(base)event_base_dispatch(base);if(ev)evconnlistener_free(ev);if(base)event_base_free(base);return 0;
}
运行效果
初始化线程池,创建5个线程,通过telnet和网络调试软件模拟客户端的接入,客户端发送信息服务器打印出来,当客户端超时未发请求,断开连接退出任务。