匿名管道+进程池+命名管道
mkfifo name_pipe
创建管道文件。
命名管道:
路径+文件名具有唯一性。
匿名管道:
进程池代码:
#include<iostream>
#include<unistd.h>
#include<cstdlib>
#include<cassert>
#include<vector>
#include<string>
#include<ctime>
#include<sys/types.h>
#include<sys/wait.h>

// Number of worker (child) processes the pool creates.
#define PROCESS_NUM 5

// Seeds rand() from the current wall-clock time; intended to run once at startup.
#define MakeSeed() srand((unsigned long)time(nullptr))

// A task a worker can run: takes nothing, returns nothing.
typedef void(*func_t)();

using namespace std;
/// Parent-side description of one worker process: its pid, the write end of
/// the pipe that leads to it, and a printable name derived from both.
class subEp
{
public:
    /// @param subId   pid of the worker process
    /// @param wirteFd write end of the pipe the worker reads command codes from
    subEp(pid_t subId, int wirteFd)
        : subId_(subId), wirteFd_(wirteFd)
    {
        // Produces "process-<seq>[pid(<pid>)-fd(<fd>)]"; <seq> is the value of
        // the shared counter before it is incremented.
        name_ = "process-" + std::to_string(num++) +
                "[pid(" + std::to_string(subId_) +
                ")-fd(" + std::to_string(wirteFd_) + ")]";
    }

    static int num;     // sequence counter shared by all endpoints
    std::string name_;  // printable label built in the constructor
    pid_t subId_;       // process id of the worker
    int wirteFd_;       // file descriptor the parent writes command codes to
};

int subEp::num = 0;

/// Task: prints the calling process's pid with a "download" label, then
/// sleeps for one second.
void downLoadTask()
{
    std::cout << getpid() << ":下载任务\n" << std::endl;
    sleep(1);
}
/// Task: prints the calling process's pid followed by an "IO" label and a
/// blank line, flushing stdout.
void ioTask()
{
    const pid_t self = getpid();
    std::cout << self << "IO任务\n";
    std::cout << std::endl;
}
/// Task: prints the calling process's pid followed by a "flush" label and a
/// blank line, flushing stdout.
void flushTask()
{
    const pid_t self = getpid();
    std::cout << self << "刷新任务\n";
    std::cout << std::endl;
}
void loadTaskFunc(vector<func_t>* out)
{assert(out);out->push_back(downLoadTask);out->push_back(ioTask);out->push_back(flushTask);
}
int recvTask(int readFd)
{int code=0;ssize_t s=read(readFd,&code,sizeof code);assert(s==sizeof(int));if(s==4) return code;else if(s<=0) return -1;
}void createSubProcess(vector<subEp>* subs,vector<func_t>& funcMap)
{for(int i=0;i<PROCESS_NUM;i++){int fds[2];int n=pipe(fds);assert(n==0);(void)n;pid_t id=fork();if(id==0){//子进程,进行读close(fds[1]); while(true){//1.获取命令码,如果没有发送,子进程应该阻塞int commenCode=recvTask(fds[0]);//2.完成任务if(commenCode>=0&&commenCode<funcMap.size())funcMap[commenCode]();else if(commenCode==-1)break;}exit(0);}close(fds[0]);subEp sub(id,fds[1]);}
}
/// Logs which task is being dispatched to which worker, then writes the task
/// number as a raw int to the worker's pipe.
/// @param process endpoint of the target worker
/// @param tasknum command code to send
void sendTask(const subEp& process, int tasknum)
{
    std::cout << "send task num" << tasknum;
    std::cout << "sent to -> " << process.name_ << std::endl;

    const int written = write(process.wirteFd_, &tasknum, sizeof(tasknum));
    assert(written == sizeof(int));
    static_cast<void>(written);
}
void loadBalanceContrl(vector<subEp> subs,vector<func_t> funcMap,int count)
{//进程个数int processnum=subs.size();//任务个数int tasknum=funcMap.size();bool quit=false;bool forever=(count==0?true:false);while(!quit){//1.选择一个字进程 --> vector<subEp> -> index 随机数int subIdx=rand()%processnum;//2.选择一个任务 --> vector<func_t> -> indexint taskIdx=rand()%tasknum;//3.将任务发送给选择的进程sendTask(subs[subIdx],taskIdx);sleep(1);if(!forever){count--;if(count==0)break;}//write quit -> read 0for(int i=0;i<processnum;i++){close(subs[i].wirteFd_);}}
}
void waitProcess(vector<subEp> process)
{int processnum=process.size();for(int i=0;i<processnum;i++){waitpid(process[i].subId_,nullptr,0);cout<<"wait sub process success ..."<<process[i].subId_<<endl;}
}
int main()
{MakeSeed();//1.建立子进程并建立和子进程通信的管道//1.1 加载方法表vector<func_t> funcMap;loadTaskFunc(&funcMap);//1.2创建子进程,并且维护好父进程通信信道vector<subEp> subs;createSubProcess(&subs,funcMap);//2.到达父进程,父进程控制子进程,负责负载均衡的想子进程发送命令码int taskCnt=3;loadBalanceContrl(subs,funcMap,taskCnt);//3.回收子进程信息waitProcess(subs);return 0;
}
#include <sys/ipc.h>
#include <sys/shm.h>
int shmget(key_t key, size_t size, int shmflg);
通过ftok()创建key
key能进行唯一性标识
共享内存原理理解:
1.共享内存是专门为进程间通信(IPC)设计的
2.共享内存是一种通信方式,所有需要通信的进程均可以使用
3.OS中可能会同时存在很多共享内存
ipcs -m
------ Shared Memory Segments --------
key shmid owner perms bytes nattch status
查看共享内存。
ipc资源的特征:
共享内存的生命周期是随OS的,不是随进程的。
client.cc
#include"comm.hpp"int main()
{int wfd=open(NAMED_PIPE,O_WRONLY);if(wfd<0) exit(1);char buffer[1024];while(true){cout<<"Please Say# ";fgets(buffer,sizeof(buffer),stdin);ssize_t n=write(wfd,buffer,strlen(buffer));assert(n=strlen(buffer));(void)n;}close(wfd);return 0;
}
server.cc
#include"comm.hpp"int main()
{bool r=createFifo(NAMED_PIPE);// assert(r);// (void)r;int rfd=open(NAMED_PIPE,O_RDONLY);if(rfd<0) exit(1);//readchar buffer[1024];while(true){ssize_t s=read(rfd,buffer,sizeof(buffer)-1);if(s>0){buffer[s]=0;cout<<"client->server# "<<buffer<<endl;}else if(s==0){cout<<"client quit, me too!!!"<<endl;break;}else{cout<<"err string: "<<strerror(errno)<<endl;break;}}close(rfd);removeFifo(NAMED_PIPE);return 0;
}
comm.hpp
#pragma once

#include<iostream>
#include<cstdio>
#include<cstdlib>
#include<cstring>
#include<cerrno>
#include<string>
#include<cassert>
#include<unistd.h>
#include<fcntl.h>
#include<sys/types.h>
#include<sys/stat.h>

// Filesystem path of the FIFO shared by the client and the server.
#define NAMED_PIPE "/tmp/mypipe.xxx"

// Kept because the sources that include this header use unqualified std names.
using namespace std;
/// Creates a FIFO at path with permission bits 0600.
/// Side effect: sets the process umask to 0 so the permission bits are applied
/// exactly as given.
/// @param path filesystem path of the FIFO to create
/// @return true if mkfifo succeeded; false otherwise, after printing errno and
///         its description to stdout
bool createFifo(const std::string &path)
{
    umask(0);
    int n = mkfifo(path.c_str(), 0600);
    if (n == 0)
        return true;

    std::cout << "errno: " << errno << "err string: " << strerror(errno) << std::endl;
    // The failure path previously fell off the end of the function without
    // returning a value.
    return false;
}
/// Unlinks the file at path and asserts that the unlink succeeded.
/// @param path filesystem path to remove
void removeFifo(const std::string &path)
{
    const char *target = path.c_str();
    const int rc = unlink(target);
    assert(rc == 0);
    static_cast<void>(rc);
}