当前位置: 首页 > news >正文

进程通信:进程池的实现

一、进程池

实现进程池,需要了解一些管道知识,具体请看从ELF到进程间通信:剖析Linux程序的加载与交互机制这篇文章。

在写进程池之前,先简单了解一下进程池。

将来,我们通过父进程创建出一批子进程,父进程要与子进程进行通信,就要建立信道。如果父进程不给子进程写入数据,子进程在读取数据时,会怎么样呢

子进程会被阻塞,父进程向子进程写入数据时,子进程读取数据之后,就可以继续做接下来的工作。

所以,父进程可以通过向管道写入数据,来控制子进程的启停

. processpool.hpp

#ifndef __PROCESS_POOL_HPP__
#define __PROCESS_POOL_HPP__#include <stdio.h>
#include <iostream>
#include <string>
#include <vector>
#include <functional>
#include <stdlib.h>
#include <time.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include"Task.hpp"
const int gdefault_process_num = 5;using callback_t = std::function<void(int fd)>;class Channel
{
public:Channel(){}Channel(int wfd, std::string name, pid_t id) : _wfd(wfd), _name(name), _sub_target(id){}void DebugPrint(){printf("channel name:%s, wfd:%d, target pid:%d\n", _name.c_str(), _wfd, _sub_target);}int Fd() { return _wfd; }std::string Name() { return _name; }pid_t Target() { return _sub_target; }void Close() { close(_wfd); }void wait(){int n = waitpid(_sub_target, nullptr, 0);}~Channel(){}private:int _wfd;          // 写描述符std::string _name; // 管道名pid_t _sub_target; // 目标子进程
};class ProcessPool
{
public:ProcessPool(int num = gdefault_process_num) : _processnum(num){srand((unsigned int)time(NULL));}bool InitProcessPool(callback_t cb){Init it;// error//  int pipefd[2] = {0};//  int n = pipe(pipefd);//  if (n < 0)//      return false;// 创建子进程for (int i = 0; i < gdefault_process_num; ++i){// 管道放在循环外面的话,会导致多个子进程共用一个文件描述符,从而导致读失败了,因为子进程访问了其它进程的fd// 创建信道int pipefd[2] = {0};int n = pipe(pipefd);if (n < 0)return false;pid_t id = fork();if (id < 0)return false;if (id == 0){// 子进程// 关闭不需要的读写端close(pipefd[1]);//子进程关闭掉所有的写文件描述符for(auto& c: _channels){c.Close();}// printf("pipefd[0] = %d, pipefd[1] = %d\n", pipefd[0], pipefd[1]);//  做自己的事情cb(pipefd[0]);exit(0);}// 父进程close(pipefd[0]);std::string name = "channel-" + std::to_string(i);_channels.emplace_back(pipefd[1], name, id);// sleep(1);}return true;}// sleep(3);// std::cout << "父进程开始控制" << std::endl;void CtrlSubProcessHelper(int& index){int who = index;index++;index %= _channels.size();int x = rand() % tasks.size();std::cout << "选择信道: " << _channels[who].Name() << ", subtarget : " << _channels[who].Target() << std::endl;write(_channels[who].Fd(), &x, sizeof(x));sleep(2);}// 父进程控制子进程// 采用轮询方案唤醒子进程-------负载均衡void PollingCtrlSubProcess(){int index = 0;while (true){CtrlSubProcessHelper(index);}}void PollingCtrlSubProcess(int count){int index = 0;while (count){CtrlSubProcessHelper(index);count--;}}void WaitAllSubProcess(){// 子进程退出,关闭父进程的写描述符for (auto &c : _channels){c.Close();}for (auto &c : _channels){c.wait();}}~ProcessPool(){}private:std::vector<Channel> _channels; // 所有信道int _processnum;                // 多少个子进程
};#endif

. Task.hpp

#pragma once#include<iostream>
#include<functional>
#include<vector>using task_t = std::function<void()>;void DownLoad()
{std::cout << "我是一个 DownLoad 任务" << std::endl;
}void MySql()
{std::cout << "我是一个 MySql 任务" << std::endl;
}void Sync()
{std::cout << "我是一个数据刷新同步的任务" << std::endl;
}void Log()
{std::cout << "我是一个日志打印的任务" << std::endl;
}std::vector<task_t> tasks;class Init
{
public:Init(){tasks.push_back(DownLoad);tasks.push_back(MySql);tasks.push_back(Sync);tasks.push_back(Log);}
};

. main.cc

#include "processpool.hpp"
#include"Task.hpp"int main()
{ProcessPool pp;pp.InitProcessPool([](int fd){while(true){std::cout << "子进程阻塞: " << getpid() << std::endl;int code = 0;//父进程写端未关闭,写端不写,子进程读取数据就会被阻塞ssize_t n = read(fd, &code, sizeof(code));if(n == sizeof(code)){std::cout << "子进程被唤醒" << getpid() << std::endl;if(code >= 0 && code < tasks.size()){tasks[code]();}else{std::cerr << "父进程给我的任务码是不对的" << code << std::endl;}}else if(n == 0){std::cout << "子进程应该退出了" << std::endl;break;}else{std::cout << "子进程读取失败" << std::endl;break;}sleep(1);} });pp.PollingCtrlSubProcess(10);// std::cout << "父进程退出了" << std::endl;pp.WaitAllSubProcess();return 0;
}

. Makefile

process_pool:main.ccg++ -o $@ $^ -g -std=c++11
.PHONY:clean
clean:rm -f process_pool

这就是进程池的实现啦。但是有一些细节问题需要解决。

void WaitAllSubProcess()
{for(auto& c: _channels){c.Close();c.wait();}
}

关一个写文件描述符,就让父进程回收子进程,是否可以呢

在这里插入图片描述

可以看到,是有问题的。程序并没有退出,子进程也没有被回收,那么,这是为什么呢

在解决这个问题之前,先来想想,为什么上面先关闭全部的写文件描述符,再回收子进程就可以呢

这是因为,父进程关闭了全部的写文件描述符,此时父进程的写端关闭,写端不写,而子进程还在读取数据,那么就会读到文件末尾,返回 0,这时就会跳出循环,子进程就会相继退出,父进程就可以一个个回收子进程了

现在,再来解析刚才提出的问题,为什么程序没有退出,子进程没有被回收?

父进程创建管道,接着创建子进程,那么子进程就会继承父进程的文件描述符表第一个子进程拿到的是 3 和 4 ,接着子进程关闭 4 号文件描述符表,父进程关闭 3 号文件描述符表,第二个子进程拿到的是 3 和 5,子进程关闭 5 号文件描述符,父进程继续关闭 3 号文件描述符,以此类推。

是不是看着挺完美的,但是还有一点细节哦。我们说子进程会继承父进程的文件描述符表所以第二个子进程继承时也会继承父进程第一次打开的文件描述符,也就是 4 号文件描述符,此时这个 4 号文件描述符指向的依然是第一个管道,以此类推,第三个子进程就会有两个文件描述符分别指向第一个管道和第二个管道所以第一个管道的引用计数就会随着子进程增多而逐步增多,最后一个子进程的管道才只有一个写文件描述符指向

所以,我们的第一种做法可以倒着关闭

void WaitAllSubProcess()
{for(int end = gdefault_process_num - 1; end >= 0; --end){_channels[end].Close();_channels[end].wait();}
}

第二种做法就是需要每一个子进程先关闭自己所有的写文件描述符,然后就可以像刚才那样正向关闭

那么,要怎么关闭呢?不要忘了,我们的写文件描述符是保存在 Channel 里面的,子进程在继承的时候,也继承了类里面的对象。父进程先创建子进程,此时父进程还没有将描述符添加进去,所以子进程继承的是父进程之前的版本,比如,第一个子进程继承的对象 _channels 是空的,第二个子进程继承的 _channels里有第一个子进程的写文件描述符,所以,只需要在子进程的执行流里关闭掉所有的写文件描述符就可以了

InitProcessPool函数

在这里插入图片描述

在这里插入图片描述

进程池到这里就结束啦。觉得不错的小伙伴给个一键三连吧。

http://www.lryc.cn/news/625624.html

相关文章:

  • Java 大视界 -- Java 大数据在智能物流无人配送车路径规划与协同调度中的应用
  • 【什么是非晶合金?非晶电机有什么优点?】
  • k8sday11服务发现(2/2)
  • Kubernetes 的 YAML 配置文件-kind
  • 在 Kotlin 中 使用泛型类和泛型函数
  • WRC大会精彩回顾 | NanoLoong机器人足球首秀青龙机械臂咖啡服务双线出击
  • 【论文阅读】DETR3D: 3D Object Detection from Multi-view Images via 3D-to-2D Queries
  • 【新启航】航空飞机起落架深孔型腔的内轮廓检测方法探究 - 激光频率梳 3D 轮廓检测
  • 主流 3D 模型格式(FBX/OBJ/DAE/GLTF)材质支持与转换操作指南
  • STranslate:一键聚合翻译+OCR,效率翻倍
  • CVPR 2025 | 具身智能 | HOLODECK:一句话召唤3D世界,智能体的“元宇宙练功房”来了
  • Chrome原生工具网页长截图方法
  • [Linux] 网络中的 `tun` 模式
  • 神经网络拆解:用Excel模拟手写数字识别
  • Chrome 插件开发实战技术文章大纲
  • 从密度到聚类:DBSCAN算法的第一性原理解析
  • 【数据可视化-93】使用 Pyecharts 绘制旭日图:步骤与数据组织形式
  • 从接口自动化测试框架设计到开发(三)主流程封装、返回数据写入excel
  • 传统艾灸VS七彩喜艾灸机器人:同样的艾香,多了4分“巧”
  • JetBrains系列产品-IDEA/PyCharm/GoLand自动生成方法返回值的快捷键,查看方法参数的快捷键。
  • 0819 使用IP多路复用实现TCP并发服务器
  • Java -- 用户线程和守护线程--线程同步机制
  • Java开发过程中实用的技术点(一)
  • LIA-X - 一张照片生成任意表情肖像动画视频 精准操控面部动作 支持50系显卡 一键整合包下载
  • 免费dll修复?缺少xxx.dll?【图文详解】Visual C++运行库安装?无法定位程序输入点于动态链接库?
  • VulKan笔记(九)-着色器
  • 机器学习--决策树2
  • 力扣57:插入区间
  • 决策树二-泰坦尼克号幸存者
  • 决策树(2)