当前位置: 首页 > news >正文

生产者消费者模型线程池(纯代码)

目录

生产者消费者模型

条件变量&&互斥锁(阻塞队列)

makefile

Task.hpp

BlockQueue.hpp

BlockQueueTest.cc

信号量&&互斥锁(环形队列)

makefile

RingQueue.hpp

RingQueueTest.cc

线程池(封装任务) 

makefile

Lock.hpp(RAII)

Log.hpp

Task.hpp

ThreadPool.hpp

ThreadPoolTest.cc


生产者消费者模型

条件变量&&互斥锁(阻塞队列)

makefile

CC=g++
FLAGS=-std=c++11
LD=-lpthread
bin=blockQueue
src=BlockQueueTest.cc$(bin):$(src)$(CC) -o $@ $^ $(LD) $(FLAGS)
.PHONY:clean
clean:rm -f $(bin)

Task.hpp

#pragma once#include <iostream>
#include <string>class Task
{
public:Task() : elemOne_(0), elemTwo_(0), operator_('0'){}Task(int one, int two, char op) : elemOne_(one), elemTwo_(two), operator_(op){}int operator() (){return run();}int run(){int result = 0;switch (operator_){case '+':result = elemOne_ + elemTwo_;break;case '-':result = elemOne_ - elemTwo_;break;case '*':result = elemOne_ * elemTwo_;break;case '/':{if (elemTwo_ == 0){std::cout << "div zero, abort" << std::endl;result = -1;}else{result = elemOne_ / elemTwo_;}}break;case '%':{if (elemTwo_ == 0){std::cout << "mod zero, abort" << std::endl;result = -1;}else{result = elemOne_ % elemTwo_;}}break;default:std::cout << "非法操作: " << operator_ << std::endl;break;}return result;}int get(int *e1, int *e2, char *op){*e1 = elemOne_;*e2 = elemTwo_;*op = operator_;}
private:int elemOne_;int elemTwo_;char operator_;
};

BlockQueue.hpp

#pragma once#include <iostream>
#include <queue>
#include <cstdlib>
#include <unistd.h>
#include <pthread.h>using namespace std;const uint32_t gDefaultCap = 5;template <class T>
class BlockQueue
{
public:BlockQueue(uint32_t cap = gDefaultCap) : cap_(cap){pthread_mutex_init(&mutex_, nullptr);pthread_cond_init(&conCond_, nullptr);pthread_cond_init(&proCond_, nullptr);}~BlockQueue(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&conCond_);pthread_cond_destroy(&proCond_);}public://生产接口void push(const T &in) // const &: 纯输入{// 加锁// 判断->是否适合生产->bq是否为满->程序员视角的条件->1. 满(不生产) 2. 不满(生产)// if(满) 不生产,休眠// else if(不满) 生产,唤醒消费者// 解锁lockQueue();while (isFull()) // ifFull就是我们在临界区中设定的条件{// before: 当我等待的时候,会自动释放mutex_proBlockWait(); //阻塞等待,等待被唤醒。 被唤醒 != 条件被满足(概率虽然很小),被唤醒 && 条件被满足// after: 当我醒来的时候,我是在临界区里醒来的!!}// 条件满足,可以生产pushCore(in); //生产完成// wakeupCon(); // 唤醒消费者unlockQueue();wakeupCon(); // 唤醒消费者}//消费接口T pop(){// 加锁// 判断->是否适合消费->bq是否为空->程序员视角的条件->1. 空(不消费) 2. 有(消费)// if(空) 不消费,休眠// else if(有) 消费,唤醒生产者// 解锁lockQueue();while (isEmpty()){conBlockwait(); //阻塞等待,等待被唤醒,?}// 条件满足,可以消费T tmp = popCore();unlockQueue();wakeupPro(); // 唤醒生产者return tmp;}private:void lockQueue(){pthread_mutex_lock(&mutex_);}void unlockQueue(){pthread_mutex_unlock(&mutex_);}bool isEmpty(){return bq_.empty();}bool isFull(){return bq_.size() == cap_;}void proBlockWait() // 生产者一定是在临界区中的!{// 1. 在阻塞线程的时候,会自动释放mutex_锁pthread_cond_wait(&proCond_, &mutex_);}void conBlockwait() //阻塞等待,等待被唤醒{// 1. 在阻塞线程的时候,会自动释放mutex_锁pthread_cond_wait(&conCond_, &mutex_);// 2. 当阻塞结束,返回的时候,pthread_cond_wait,会自动帮你重新获得mutex_,然后才返回// 为什么我们上节课,写的代码,批量退出线程的时候,发现无法退出?}void wakeupPro() // 唤醒生产者{pthread_cond_signal(&proCond_);}void wakeupCon() // 唤醒消费者{pthread_cond_signal(&conCond_);}void pushCore(const T &in){bq_.push(in); //生产完成}T popCore(){T tmp = bq_.front();bq_.pop();return tmp;}private:uint32_t cap_;           //容量queue<T> bq_;            // blockqueuepthread_mutex_t mutex_;  //保护阻塞队列的互斥锁pthread_cond_t conCond_; // 让消费者等待的条件变量pthread_cond_t proCond_; // 让生产者等待的条件变量
};

BlockQueueTest.cc

#include "Task.hpp"
#include "BlockQueue.hpp"#include <ctime>const std::string ops = "+-*/%";// 并发,并不是在临界区中并发(一般),而是生产前(before blockqueue),消费后(after blockqueue)对应的并发void *consumer(void *args)
{BlockQueue<Task> *bqp = static_cast<BlockQueue<Task> *>(args);while (true){Task t = bqp->pop(); // 消费任务int result = t();    //处理任务 --- 任务也是要花时间的!int one, two;char op;t.get(&one, &two, &op);cout << "consumer[" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 消费了一个任务: " << one << op << two << "=" << result << endl;}
}
void *productor(void *args)
{BlockQueue<Task> *bqp = static_cast<BlockQueue<Task> *>(args);while (true){// 1. 制作任务 --- 要不要花时间?? -- 网络,磁盘,用户int one = rand() % 50;int two = rand() % 20;char op = ops[rand() % ops.size()];Task t(one, two, op);// 2. 生产任务bqp->push(t);cout << "producter[" << pthread_self() << "] " << (unsigned long)time(nullptr) << " 生产了一个任务: " << one << op << two << "=?" << endl;sleep(1);}
}int main()
{srand((unsigned long)time(nullptr) ^ getpid());// 定义一个阻塞队列// 创建两个线程,productor, consumer// productor -----  consumer// BlockQueue<int> bq;// bq.push(10);// int a = bq.pop();// cout << a << endl;// 既然可以使用int类型的数据,我们也可以使用自己封装的类型,包括任务// BlockQueue<int> bq;BlockQueue<Task> bq;pthread_t c, p;pthread_create(&c, nullptr, consumer, &bq);pthread_create(&p, nullptr, productor, &bq);pthread_join(c, nullptr);pthread_join(p, nullptr);return 0;
}

信号量&&互斥锁(环形队列)

makefile

CC=g++
FLAGS=-std=c++11
LD=-lpthread
bin=ringQueue
src=RingQueueTest.cc$(bin):$(src)$(CC) -o $@ $^ $(LD) $(FLAGS)
.PHONY:clean
clean:rm -f $(bin)

RingQueue.hpp

#pragma once#include <iostream>
#include <vector>
#include <string>
#include <semaphore.h>using namespace std;const int gCap = 10;template <class T>
class RingQueue
{
public:RingQueue(int cap = gCap): ringqueue_(cap), pIndex_(0), cIndex_(0){// 生产sem_init(&roomSem_, 0, ringqueue_.size());// 消费sem_init(&dataSem_, 0, 0);pthread_mutex_init(&pmutex_ ,nullptr);pthread_mutex_init(&cmutex_ ,nullptr);}// 生产void push(const T &in){sem_wait(&roomSem_); //无法被多次的申请pthread_mutex_lock(&pmutex_);ringqueue_[pIndex_] = in; //生产的过程pIndex_++;   // 写入位置后移pIndex_ %= ringqueue_.size(); // 更新下标,保证环形特征pthread_mutex_unlock(&pmutex_);sem_post(&dataSem_);}// 消费T pop(){sem_wait(&dataSem_);pthread_mutex_lock(&cmutex_);T temp = ringqueue_[cIndex_];cIndex_++;cIndex_ %= ringqueue_.size();// 更新下标,保证环形特征pthread_mutex_unlock(&cmutex_);sem_post(&roomSem_);return temp;}~RingQueue(){sem_destroy(&roomSem_);sem_destroy(&dataSem_);pthread_mutex_destroy(&pmutex_);pthread_mutex_destroy(&cmutex_);}
private:vector<T> ringqueue_; // 环形队列sem_t roomSem_;       // 衡量空间计数器,productorsem_t dataSem_;       // 衡量数据计数器,consumeruint32_t pIndex_;     // 当前生产者写入的位置, 如果是多线程,pIndex_也是临界资源uint32_t cIndex_;     // 当前消费者读取的位置,如果是多线程,cIndex_也是临界资源pthread_mutex_t pmutex_;pthread_mutex_t cmutex_;
};

RingQueueTest.cc

#include "RingQueue.hpp"
#include <ctime>
#include <unistd.h>// 我们是单生产者,单消费者
// 多生产者,多消费者??代码怎么改?
// 为什么呢???多生产者,多消费者?
// 不要只关心把数据或者任务,从ringqueue 放拿的过程,获取数据或者任务,处理数据或者任务,也是需要花时间的!void *productor(void *args)
{RingQueue<int> *rqp = static_cast<RingQueue<int> *>(args);while(true){int data = rand()%10;rqp->push(data);cout << "pthread[" << pthread_self() << "]" << " 生产了一个数据: " << data << endl;sleep(1);}
}void *consumer(void *args)
{RingQueue<int> *rqp = static_cast<RingQueue<int> *>(args);while(true){//sleep(10);int data = rqp->pop();cout << "pthread[" << pthread_self() << "]" << " 消费了一个数据: " << data << endl;}
}int main()
{srand((unsigned long)time(nullptr)^getpid());RingQueue<int> rq;pthread_t c1,c2,c3, p1,p2,p3;pthread_create(&p1, nullptr, productor, &rq);pthread_create(&p2, nullptr, productor, &rq);pthread_create(&p3, nullptr, productor, &rq);pthread_create(&c1, nullptr, consumer, &rq);pthread_create(&c2, nullptr, consumer, &rq);pthread_create(&c3, nullptr, consumer, &rq);pthread_join(c1, nullptr);pthread_join(c2, nullptr);pthread_join(c3, nullptr);pthread_join(p1, nullptr);pthread_join(p2, nullptr);pthread_join(p3, nullptr);return 0;
}

线程池(封装任务) 

makefile

CC=g++
FLAGS=-std=c++11
LD=-lpthread
bin=threadpool
src=ThreadPoolTest.cc$(bin):$(src)$(CC) -o $@ $^ $(LD) $(FLAGS)
.PHONY:clean
clean:rm -f $(bin)

Lock.hpp(RAII)

#pragma once#include <iostream>
#include <pthread.h>class Mutex
{
public:Mutex(){pthread_mutex_init(&lock_, nullptr);}void lock(){pthread_mutex_lock(&lock_);}void unlock(){pthread_mutex_unlock(&lock_);}~Mutex(){pthread_mutex_destroy(&lock_);}private:pthread_mutex_t lock_;
};class LockGuard
{
public:LockGuard(Mutex *mutex) : mutex_(mutex){mutex_->lock();std::cout << "加锁成功..." << std::endl;}~LockGuard(){mutex_->unlock();std::cout << "解锁成功...." << std::endl;}private:Mutex *mutex_;
};

Log.hpp

#pragma once#include <iostream>
#include <ctime>
#include <pthread.h>std::ostream &Log()
{std::cout << "Fot Debug |" << " timestamp: " << (uint64_t)time(nullptr) << " | " << " Thread[" << pthread_self() << "] | ";return std::cout;
}

Task.hpp

#pragma once#include <iostream>
#include <string>class Task
{
public:Task() : elemOne_(0), elemTwo_(0), operator_('0'){}Task(int one, int two, char op) : elemOne_(one), elemTwo_(two), operator_(op){}int operator() (){return run();}int run(){int result = 0;switch (operator_){case '+':result = elemOne_ + elemTwo_;break;case '-':result = elemOne_ - elemTwo_;break;case '*':result = elemOne_ * elemTwo_;break;case '/':{if (elemTwo_ == 0){std::cout << "div zero, abort" << std::endl;result = -1;}else{result = elemOne_ / elemTwo_;}}break;case '%':{if (elemTwo_ == 0){std::cout << "mod zero, abort" << std::endl;result = -1;}else{result = elemOne_ % elemTwo_;}}break;default:std::cout << "非法操作: " << operator_ << std::endl;break;}return result;}int get(int *e1, int *e2, char *op){*e1 = elemOne_;*e2 = elemTwo_;*op = operator_;}
private:int elemOne_;int elemTwo_;char operator_;
};

ThreadPool.hpp

#pragma once#include <iostream>
#include <cassert>
#include <queue>
#include <memory>
#include <cstdlib>
#include <pthread.h>
#include <unistd.h>
#include <sys/prctl.h>
#include "Log.hpp"
#include "Lock.hpp"using namespace std;int gThreadNum = 5;template <class T>
class ThreadPool
{
private:ThreadPool(int threadNum = gThreadNum) : threadNum_(threadNum), isStart_(false){assert(threadNum_ > 0);pthread_mutex_init(&mutex_, nullptr);pthread_cond_init(&cond_, nullptr);}ThreadPool(const ThreadPool<T> &) = delete;void operator=(const ThreadPool<T>&) = delete;public:static ThreadPool<T> *getInstance(){static Mutex mutex;if (nullptr == instance) //仅仅是过滤重复的判断{LockGuard lockguard(&mutex); //进入代码块,加锁。退出代码块,自动解锁if (nullptr == instance){instance = new ThreadPool<T>();}}return instance;}//类内成员, 成员函数,都有默认参数thisstatic void *threadRoutine(void *args){pthread_detach(pthread_self());ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);prctl(PR_SET_NAME, "follower");while (1){tp->lockQueue();while (!tp->haveTask()){tp->waitForTask();}//这个任务就被拿到了线程的上下文中T t = tp->pop();tp->unlockQueue();// for debugint one, two;char oper;t.get(&one, &two, &oper);//规定,所有的任务都必须有一个run方法Log() << "新线程完成计算任务: " << one << oper << two << "=" << t.run() << "\n";}}void start(){assert(!isStart_);for (int i = 0; i < threadNum_; i++){pthread_t temp;pthread_create(&temp, nullptr, threadRoutine, this);}isStart_ = true;}void push(const T &in){lockQueue();taskQueue_.push(in);choiceThreadForHandler();unlockQueue();}~ThreadPool(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&cond_);}private:void lockQueue() { pthread_mutex_lock(&mutex_); }void unlockQueue() { pthread_mutex_unlock(&mutex_); }bool haveTask() { return !taskQueue_.empty(); }void waitForTask() { pthread_cond_wait(&cond_, &mutex_); }void choiceThreadForHandler() { pthread_cond_signal(&cond_); }T pop(){T temp = taskQueue_.front();taskQueue_.pop();return temp;}private:bool isStart_;int threadNum_;queue<T> taskQueue_;pthread_mutex_t mutex_;pthread_cond_t cond_;static ThreadPool<T> *instance;// const static int a = 100;
};template <class T>
ThreadPool<T> *ThreadPool<T>::instance = nullptr;

ThreadPoolTest.cc

#include "ThreadPool.hpp"
#include "Task.hpp"
#include <ctime>
#include <thread>// 如何对一个线程进行封装, 线程需要一个回调函数,支持lambda
// class tread{
// };int main()
{prctl(PR_SET_NAME, "master");const string operators = "+/*/%";// unique_ptr<ThreadPool<Task> > tp(new ThreadPool<Task>());unique_ptr<ThreadPool<Task> > tp(ThreadPool<Task>::getInstance());tp->start();srand((unsigned long)time(nullptr) ^ getpid() ^ pthread_self());// 派发任务的线程while(true){int one = rand()%50;int two = rand()%10;char oper = operators[rand()%operators.size()];Log() << "主线程派发计算任务: " << one << oper << two << "=?" << "\n";Task t(one, two, oper);tp->push(t);sleep(1);}
}
http://www.lryc.cn/news/43924.html

相关文章:

  • K8s 应用的网络可观测性: Cilium VS DeepFlow
  • 3.29面试题
  • 操作系统漏洞发现
  • Linux gdb调试底层原理
  • LC-1647. 字符频次唯一的最小删除次数(哈希+计数)
  • HTTP状态码
  • 【Linux】初见“which命令”,“find命令”以及linux执行命令优先级
  • update case when 多字段,多条件, mysql中case when用法
  • mysql隐式转换 “undefined“字符串匹配到mysql int类型0值字段
  • Redis八股文
  • InnoDB——详细解释锁的应用,一致性读,自增长与外键
  • C++模板基础(四)
  • pycharm使用记录
  • Linux命令·kill·killall
  • Linux /proc/version 文件解析
  • 【Django 网页Web开发】15. 实战项目:管理员增删改查,md5密码和密码重置(08)(保姆级图文)
  • STL容器之<array>
  • flask教程6:cookie和session
  • 【JavaEE初阶】第六节.网络原理TCP/IP协议
  • 模式识别 —— 第六章 支持向量机(SVM)与核(Kernel)
  • 总结 synchronized
  • 360周鸿祎又“开炮”:GPT 6-8就将产生自主意识!我们来测算一下对错
  • python——飞机大战小游戏
  • 数组(完全二叉树)向下建堆法与堆排序O(N*logN)
  • Lua require 函数使用
  • 【面试】如何定位线上问题?
  • 字节二面,原来我对自动化测试的理解太浅了
  • Android11.0 应用升级成功后立即断电重启,版本恢复
  • 关于python常用软件用法:Pycharm 常用功能
  • SOLIDWORKS你不知道的小技巧