浅谈对linux进程池的理解
linux进程池
包括主函数、Channel类、ChannelManager类、ProcessPool类和TaskManager类。进程池创建多个子进程,通过管道与父进程通信,父进程分配任务给子进程。
主要思路
- main函数:创建ProcessPool对象,启动进程池,运行10次任务(每次sleep(1)),然后停止进程池。
- Channel类:代表一个通道,包含写端文件描述符(wfd)、子进程ID(subid)和名称。有方法Send()发送整数代码,Close()关闭wfd,Wait()等待子进程结束。
- ChannelManager类:管理多个Channel对象。有方法Insert()添加新通道,Select()以轮询方式选择通道,PrintChannel()打印所有通道名,StopSubProcess()关闭所有通道的wfd,WaitSubProcess()等待所有子进程。
- ProcessPool类:进程池核心。构造函数初始化TaskManager并注册任务函数(PrintLog, Download, Upload)。Start()方法创建多个子进程,每个子进程有自己的管道;子进程执行Work()方法,从管道读取任务码并执行。Run()方法选择任务码和通道,发送任务码给子进程。Stop()方法关闭所有通道并等待子进程。
- TaskManager类:管理任务函数。Register()添加任务,Code()生成随机任务码,Execute()执行对应任务。
#include "ProcessPool.hpp"int main()
{// 创建进程池对象ProcessPool pp(gdefaultnum);// 启动进程池pp.Start();// 自动派发任务int cnt = 10;while (cnt--){pp.Run();sleep(1);}// 回收,结束进程池pp.Stop();return 0;
}
Channel类
class Channel
{
public:Channel(int fd, pid_t id) : _wfd(fd), _subid(id){_name = "channel-" + std::to_string(_wfd) + "-" + std::to_string(_subid);}void Send(int code){int n=write(_wfd,&code,sizeof(code));(void)n;}void Close(){close(_wfd);}void Wait(){pid_t rid = waitpid(_subid, nullptr, 0);(void)rid;}~Channel(){}int Fd() { return _wfd; }pid_t SubId() { return _subid; }std::string Name() { return _name; }
private:int _wfd;pid_t _subid;std::string _name;
};
1、构造函数Channel,指i构造一个管道由父进程写入,子进程读,这里参数取wfd和子进程的pid
初始化的是管道名称,wfd就是这个管道在文件描述符表的位置(因为连续创建子进程和管道所以序号是连续的)
2、Send函数是父进程通过write写入、发送code编码(0、1、2表示的PrintLog、Download、Upload)(write写入的是而二进制编码,虽然返回的是字节数)。这里n要void化是因为要显示忽略返回值消除编译器警告,这类操作通常在c/c++我们调用有返回值的函数但不使用其返回值的时候进行的。
ChannelManager类
class ChannelManager
{
public:ChannelManager() : _next(0){}void Insert(int wfd, pid_t subid){_Channel.emplace_back(wfd,subid);}Channel& Select(){Channel &c=_Channel[_next];_next++;_next%=_Channel.size();return c;}void PrintChannel(){for (auto &channel : _Channel){std::cout << channel.Name() << std::endl;}}void StopSubProcess(){for (auto &channel : _Channel){channel.Close();std::cout << "关闭: " << channel.Name() << std::endl;}}void WaitSubProcess(){for (auto &channel : _Channel){channel.Wait();std::cout << "回收: " << channel.Name() << std::endl;}}~ChannelManager() {}private:std::vector<Channel> _Channel;int _next;
};
顾名思义ChannelManager类就是用来管理Channel的,所以我们通过vector数据结构管理,因为后面要轮询使用Channel我们还定义了下标_next
1、CM(ChannelManager)的默认构造并初始化_next
2、使用emplace插入管道避免了临时对象的创建和拷贝,更高效
3、Slelct函数轮询的实现
TaskManager类
#include <iostream>
#include <vector>
#include <ctime>typedef void (*task_t)();//===========================================================
void PrintLog()
{std::cout << "我是一个打印日志的任务" << std::endl;
}void Download()
{std::cout << "我是一个下载的任务" << std::endl;
}void Upload()
{std::cout << "我是一个上传的任务" << std::endl;
}
//===========================================================class TaskManager
{
public:TaskManager(){srand(time(nullptr));}void Register(task_t t){_tasks.push_back(t);}int Code(){return rand() % _tasks.size();}void Execute(int code){if(code >= 0 && code < _tasks.size()){_tasks[code]();}}~TaskManager(){}
private:std::vector<task_t> _tasks;
};
管理任务函数不过多说明
ProcessPool类
const int gdefaultnum = 5;
class ProcessPool
{
public:ProcessPool(int num):_process_num(num){_tm.Register(PrintLog);_tm.Register(Download);_tm.Register(Upload);}void Stop(){// 关闭父进程所有的wfd即可_cm.StopSubProcess();// 回收所有子进程_cm.WaitSubProcess();}void Work(int rfd){while (true){int code = 0;ssize_t n = read(rfd, &code, sizeof(code));if (n > 0){if (n != sizeof(code)){continue;}std::cout << "子进程[" << getpid() << "]收到一个任务码: " << code << std::endl;_tm.Execute(code);}else if (n == 0){std::cout << "子进程退出" << std::endl;break;}else{std::cout << "读取错误" << std::endl;break;}}}bool Start(){for(int i=0;i<_process_num;i++){//创建管道int fds[2]={0};int n=pipe(fds);if(n<0){return false;}//创建子进程pid_t subid=fork();if(subid<0)return false;else if(subid==0){//子进程//s->r f->wclose(fds[1]);Work(fds[0]);close(fds[0]);exit(0);}else{close(fds[0]);_cm.Insert(fds[1], subid);}}return true;}void Run(){// 1. 选择一个任务int taskcode = _tm.Code();// 2. 选择一个信道[子进程],负载均衡的选择一个子进程,完成任务auto &c = _cm.Select();std::cout << "选择了一个管道: " << c.Name() << std::endl;// 2. 发送任务c.Send(taskcode);std::cout << "发送了一个任务码: " << taskcode << std::endl;}~ProcessPool(){}
private:ChannelManager _cm;int _process_num;TaskManager _tm; };
1、pp的构造函数初始化并加载认为函数
2、Start进程池的创建,一个父进程,5个子进程,父进程在循环中插入了五次Channel,每个Channel对应相应的子进程。父写子读(work)
3、work函数,读取父进程Send过来的code,当n!=code时重新进入循环
完整代码
#ifndef __PROCESS_POOL_HPP__
#define __PROCESS_POOL_HPP__#include <iostream>
#include <cstdlib> // stdlib.h stdio.h -> cstdlib cstdio
#include <vector>
#include <unistd.h>
#include <sys/wait.h>
#include"Task.hpp"class Channel
{
public:Channel(int fd, pid_t id) : _wfd(fd), _subid(id){_name = "channel-" + std::to_string(_wfd) + "-" + std::to_string(_subid);}void Send(int code){int n=write(_wfd,&code,sizeof(code));(void)n;}void Close(){close(_wfd);}void Wait(){pid_t rid = waitpid(_subid, nullptr, 0);(void)rid;}~Channel(){}int Fd() { return _wfd; }pid_t SubId() { return _subid; }std::string Name() { return _name; }
private:int _wfd;pid_t _subid;std::string _name;
};class ChannelManager
{
public:ChannelManager() : _next(0){}void Insert(int wfd, pid_t subid){_Channel.emplace_back(wfd,subid);}Channel& Select(){Channel &c=_Channel[_next];_next++;_next%=_Channel.size();return c;}void PrintChannel(){for (auto &channel : _Channel){std::cout << channel.Name() << std::endl;}}void StopSubProcess(){for (auto &channel : _Channel){channel.Close();std::cout << "关闭: " << channel.Name() << std::endl;}}void WaitSubProcess(){for (auto &channel : _Channel){channel.Wait();std::cout << "回收: " << channel.Name() << std::endl;}}~ChannelManager() {}private:std::vector<Channel> _Channel;int _next;
};const int gdefaultnum = 5;class ProcessPool
{
public:ProcessPool(int num):_process_num(num){_tm.Register(PrintLog);_tm.Register(Download);_tm.Register(Upload);}void Stop(){// 关闭父进程所有的wfd即可_cm.StopSubProcess();// 回收所有子进程_cm.WaitSubProcess();}void Work(int rfd){while (true){int code = 0;ssize_t n = read(rfd, &code, sizeof(code));if (n > 0){if (n != sizeof(code)){continue;}std::cout << "子进程[" << getpid() << "]收到一个任务码: " << code << std::endl;_tm.Execute(code);}else if (n == 0){std::cout << "子进程退出" << std::endl;break;}else{std::cout << "读取错误" << std::endl;break;}}}bool Start(){for(int i=0;i<_process_num;i++){//创建管道int fds[2]={0};int n=pipe(fds);if(n<0){return false;}//创建子进程pid_t subid=fork();if(subid<0)return false;else if(subid==0){//子进程//s->r f->wclose(fds[1]);Work(fds[0]);close(fds[0]);exit(0);}else{close(fds[0]);_cm.Insert(fds[1], subid);}}return true;}void Run(){// 1. 选择一个任务int taskcode = _tm.Code();// 2. 选择一个信道[子进程],负载均衡的选择一个子进程,完成任务auto &c = _cm.Select();std::cout << "选择了一个管道: " << c.Name() << std::endl;// 2. 发送任务c.Send(taskcode);std::cout << "发送了一个任务码: " << taskcode << std::endl;}~ProcessPool(){}
private:ChannelManager _cm;int _process_num;TaskManager _tm; };#endif
#pragma once#include <iostream>
#include <vector>
#include <ctime>typedef void (*task_t)();//===========================================================
void PrintLog()
{std::cout << "我是一个打印日志的任务" << std::endl;
}void Download()
{std::cout << "我是一个下载的任务" << std::endl;
}void Upload()
{std::cout << "我是一个上传的任务" << std::endl;
}
//===========================================================class TaskManager
{
public:TaskManager(){srand(time(nullptr));}void Register(task_t t){_tasks.push_back(t);}int Code(){return rand() % _tasks.size();}void Execute(int code){if(code >= 0 && code < _tasks.size()){_tasks[code]();}}~TaskManager(){}
private:std::vector<task_t> _tasks;
};
#include "ProcessPool.hpp"int main()
{// 创建进程池对象ProcessPool pp(gdefaultnum);// 启动进程池pp.Start();// 自动派发任务int cnt = 10;while (cnt--){pp.Run();sleep(1);}// 回收,结束进程池pp.Stop();return 0;
}