当前位置: 首页 > news >正文

Linux多线程(十二)之【生产者消费者模型】

文章目录

    • 生产者消费者模型
      • 为何要使用生产者消费者模型
        • 生产者消费者模型优点
      • 基于BlockingQueue的生产者消费者模型
        • BlockingQueue
        • C++ queue模拟阻塞队列的生产消费模型
          • 单线程生产消费模型
          • 多线程生产消费模型

生产者消费者模型

consumer/productor

321原则

321原则(便于记忆)

为何要使用生产者消费者模型

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。

生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,

所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,

消费者不找生产者要数据,而是直接从阻塞队列里取,

阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这个阻塞队列就是用来给生产者和消费者解耦的。

生产者消费者模型优点
  1. 解耦
  2. 支持并发
  3. 支持忙闲不均

image-20250425234123376

生产消费模型的高效问题

cp问题高效

基于BlockingQueue的生产者消费者模型

BlockingQueue

在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。

其与普通的队列区别在于,

当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;

当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出

(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

image-20250425234454539

C++ queue模拟阻塞队列的生产消费模型

代码:

单线程生产消费模型

blockqueue.hpp

#pragma once#include <iostream>
#include <pthread.h>
#include <queue>using namespace std;template <class T>
class blockqueue
{static const int defaultnum = 20;public:blockqueue(int maxcap = defaultnum): maxcap_(maxcap){pthread_mutex_init(&mutex_, nullptr);pthread_cond_init(&c_cond_, nullptr);pthread_cond_init(&p_cond_, nullptr);low_water=maxcap_/3;high_water=maxcap_*2/3;}~blockqueue(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&c_cond_);pthread_cond_destroy(&p_cond_);}T pop(){pthread_mutex_lock(&mutex_);if(q_.size()==0){pthread_cond_wait(&c_cond_,&mutex_);//1.调用的时候,自动释放锁}T t=q_.front();             // 你想消费,就直接能消费吗?不一定。你得先确保消费条件满足q_.pop();if(q_.size()<low_water)pthread_cond_signal(&p_cond_);pthread_mutex_unlock(&mutex_);return t;}void push(const T &in){pthread_mutex_lock(&mutex_);if(q_.size()==maxcap_){pthread_cond_wait(&p_cond_,&mutex_);//1.调用的时候,自动释放锁}q_.push(in);                // 你想生产,就直接能生产吗?不一定。你得先确保生产条件满足if(q_.size()>high_water)pthread_cond_signal(&c_cond_);pthread_mutex_unlock(&mutex_);}private:queue<T> q_;int maxcap_;// int mincap_;pthread_mutex_t mutex_;pthread_cond_t c_cond_;pthread_cond_t p_cond_;int low_water;int high_water;
};

main.cc

#include<iostream>
#include"blockqueue.hpp"
#include<unistd.h>using namespace std;void*Consumer(void*args)
{blockqueue<int> *bq=static_cast<blockqueue<int> *>(args);while(1){int data=bq->pop();cout<<"消费了一个数据: "<<data<<endl;// sleep(1);}
}void*Productor(void*args)
{blockqueue<int> *bq=static_cast<blockqueue<int> *>(args);int data=0;while(1){data++;bq->push(data);cout<<"生产了一个数据: "<<data<<endl;sleep(1);}
}int main()
{blockqueue<int> *bq=new blockqueue<int>();pthread_t c,p;pthread_create(&c,nullptr,Consumer,bq);pthread_create(&p,nullptr,Productor,bq);pthread_join(c,nullptr);pthread_join(p,nullptr);delete bq;return 0;
}

image-20250508225919230

消费者sleep

image-20250508230034577

两者都不sleep

image-20250508230244057

多线程生产消费模型

补充:

  1. 为什么判断条件要放到加锁之后?

因为判断临界资源条件是否满足,也是在访问临界资源!

判断临界资源是否就绪,是通过在临界区内部判断的!

  1. 如果临界资源未就绪,那么线程就要进行等待。

等待的时候,线程是持有锁的!所以调用wait时,自动释放锁。

如果不释放锁,直接等待,那么等待的线程就没有线程可以唤醒了,

因为其他线程都在锁外,进不去临界区。

该线程因为唤醒而返回的时候,重新持有锁了。

  1. 如果线程在wait时,被误唤醒了呢?

伪唤醒的概念

伪唤醒

解决方法:判断条件时用while,不用if

blockqueue.hpp

#pragma once#include <iostream>
#include <pthread.h>
#include <queue>using namespace std;template <class T>
class blockqueue
{static const int defaultnum = 20;public:blockqueue(int maxcap = defaultnum): maxcap_(maxcap){pthread_mutex_init(&mutex_, nullptr);pthread_cond_init(&c_cond_, nullptr);pthread_cond_init(&p_cond_, nullptr);// low_water=maxcap_/3;// high_water=maxcap_*2/3;}~blockqueue(){pthread_mutex_destroy(&mutex_);pthread_cond_destroy(&c_cond_);pthread_cond_destroy(&p_cond_);}T pop(){pthread_mutex_lock(&mutex_);while(q_.size()==0)//做到防止代码被伪唤醒!{pthread_cond_wait(&c_cond_,&mutex_);//1.调用的时候,自动释放锁}T t=q_.front();             // 你想消费,就直接能消费吗?不一定。你得先确保消费条件满足q_.pop();// if(q_.size()<low_water)pthread_cond_signal(&p_cond_);pthread_cond_signal(&p_cond_);//pthread_cond_broadcastpthread_mutex_unlock(&mutex_);return t;}void push(const T &in){pthread_mutex_lock(&mutex_);while(q_.size()==maxcap_)//做到防止代码被伪唤醒!{pthread_cond_wait(&p_cond_,&mutex_);//1.调用的时候,自动释放锁}q_.push(in);                // 你想生产,就直接能生产吗?不一定。你得先确保生产条件满足// if(q_.size()>high_water)pthread_cond_signal(&c_cond_);pthread_cond_signal(&c_cond_);pthread_mutex_unlock(&mutex_);}private:queue<T> q_;int maxcap_;// int mincap_;pthread_mutex_t mutex_;pthread_cond_t c_cond_;pthread_cond_t p_cond_;// int low_water;// int high_water;
};

Task.hpp

#pragma once
#include <iostream>
#include<string>
using namespace std;enum
{Div_zero = 1,Mod_zero,Unknown
};class Task
{
public:Task(int x, int y, char op): a(x), b(y), op_(op), ret(0), exitcode(0){}void Run(){switch (op_){case '+':ret = a + b;break;case '-':ret = a - b;break;case '*':ret = a * b;break;case '/':{if (b == 0)exitcode = Div_zero;elseret = a / b;}break;case '%':{if (b == 0)exitcode = Mod_zero;elseret = a % b;}break;default:exitcode=Unknown;break;}}string GetTask(){string r=to_string(a);r+=op_;r+=to_string(b);r+="=???";return r;}string Getret(){string r=to_string(a);r+=op_;r+=to_string(b);r+="=";r+=to_string(ret);r+=" [ exitcode: ";r+=to_string(exitcode);r+=" ]";return r;}void operator()(){Run();}~Task() {}private:int a;int b;char op_;int ret;int exitcode;
};

main.cc

#include <iostream>
#include "blockqueue.hpp"
#include <unistd.h>
#include "Task.hpp"
#include <ctime>using namespace std;string oper = "+-*/%";void *Consumer(void *args)
{blockqueue<Task> *bq = static_cast<blockqueue<Task> *>(args);while (1){Task data = bq->pop();// data.Run();// 计算// data.Run();data();cout << "处理了一个任务 , 运算结果为: " << data.Getret() << " ,thread id: " << pthread_self() << endl;// sleep(1);}
}void *Productor(void *args)
{int len = oper.size();blockqueue<Task> *bq = static_cast<blockqueue<Task> *>(args);// int x=0,y=0;// Task data(x,y);while (1){// 模拟生产者生产数据int x = rand() % 10 + 1; //[1,10]int y = rand() % 10;     //[0,9];char op = oper[rand() % len];Task data(x, y, op);usleep(10);// 计算bq->push(data);cout << "生产了一个任务: " << data.GetTask() << " ,thread id: " << pthread_self() << endl;sleep(1);}
}int main()
{srand(time(nullptr));blockqueue<Task> *bq = new blockqueue<Task>();pthread_t c[3], p[5];for (int i = 0; i < 3; i++){pthread_create(c + i, nullptr, Consumer, bq);}for (int i = 0; i < 5; i++){pthread_create(p + i, nullptr, Productor, bq);}for (int i = 0; i < 3; i++){pthread_join(c[i], nullptr);}for (int i = 0; i < 5; i++){pthread_join(p[i], nullptr);}delete bq;return 0;
}

image-20250511010553986

http://www.lryc.cn/news/579488.html

相关文章:

  • 汽车ECU产线烧录和检测软件怎么做?
  • Flutter 3.29+使用isar构建失败
  • HarmonyOS ArkTS卡片堆叠滑动组件实战与原理详解(含源码)
  • Java网络编程:TCP/UDP套接字通信详解
  • I/O 进程 7.2
  • 在Ubuntu 24.04主机上创建Ubuntu 14.04编译环境的完整指南
  • (一)复习(模块注入/minimal api/EF和Dapper实现CQRS)
  • Ubuntu Gnome 安装和卸载 WhiteSur-gtk-theme 类 Mac 主题的正确方法
  • Frida:配置自动补全 in VSCode
  • TCP 三次握手与四次挥手详解
  • MyBatis 之基础概念与框架原理详解
  • RabbitMQ 通过HTTP API删除队列命令
  • 【如何判断Linux系统是Ubuntu还是CentOS】
  • Centrifugo 深度解析:构建高性能实时应用的开源引擎
  • 记忆翻牌记忆力小游戏流量主微信小程序开源
  • 网创vip课程视频教程、付费网络课程以及网赚培训,学习引流、建站、赚钱。8个T的全套课程
  • 【2.3 漫画SpringSecurity - 守护应用安全的钢铁卫士】
  • ATE FT ChangeKit学习总结-20250630
  • Easy-excel监听器中对批量上传的工单做错误收集
  • Redisson使用示例
  • 请求未达服务端?iOS端HTTPS链路异常的多工具抓包排查记录
  • 【Bug Recod】更新中...
  • Day50
  • 一文详解Character AI:实用指南+ ChatGPT、Gemini对比分析
  • contenteditable网页富文本编辑无法选中图片
  • Swift 的基础设计哲学是 “通过模块化组合实现安全与效率的平衡“,就像用标准化工业零件建造摩天大楼
  • 一台香港原生ip站群服务器多少钱?
  • 如何在Ubuntu上检查MySQL是否启动并放开3306端口
  • C++笔记-位图和布隆过滤器
  • P1155 [NOIP 2008 提高组] 双栈排序