当前位置: 首页 > news >正文

基于互斥锁的生产者消费者模型

文章目录

  • 生产者消费者 定义
  • 代码实现 / 思路
    • 完整代码
      • 执行逻辑 / 思路
    • 局部具体分析
      • model.cc
      • func(消费者线程)
  • 执行结果

生产者消费者 定义

生产者消费者模型 是一种常用的 并发编程模型 ,用于解决多线程或多进程环境下的协作问题。该模型包含两类角色:生产者和消费者

生产者负责生成数据,并将数据存放到共享的缓冲区中。消费者则从缓冲区中获取数据并进行处理。生产者和消费者之间通过共享的缓冲区进行数据交互。

为了确保线程安全,生产者和消费者需要遵循一些规则

  1. 如果缓冲区已满,则生产者需要等待直到有空间可用。
  2. 如果缓冲区为空,则消费者需要等待直到有数据可用。
  3. 生产者和消费者都不能访问缓冲区的内部结构,只能通过特定的接口进行操作。

在这里插入图片描述


代码实现 / 思路

完整代码

#include <iostream>
#include <string>
#include <pthread.h>
#include <unistd.h>// 生产者消费者模型
using namespace std;#define TNUM 4 // 定义将使用的线程数
typedef void (*func_t)(const string& name, pthread_mutex_t* pmtx, pthread_cond_t* pcond);
volatile bool quit = false; // 退出信号,默认为false// 定义一个具有名称、函数和同步机制(互斥锁和条件变量)的线程数据结构
// 用于传递线程相关的信息和共享资源给不同的线程,实现线程间的通信和同步
class ThreadData
{
public:ThreadData(const string& name, func_t func, pthread_mutex_t* pmtx, pthread_cond_t* pcond): _name(name), _func(func), _pmtx(pmtx), _pcond(pcond) {}public:// 成员变量string _name; // 线程名func_t _func; // 函数指针pthread_mutex_t* _pmtx; // 互斥锁指针pthread_cond_t* _pcond; // 条件变量指针
};void func1(const string& name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{while(!quit){// wait 需要在加锁和解锁之间pthread_mutex_lock(pmtx); // 加锁//pthread_cond_wait(pcond, pmtx); // 默认该线程在执行时,wait 代码被执行,当前线程会被立即阻塞cout << name << " running <-> 播放" << endl;pthread_mutex_unlock(pmtx); // 解锁}
}void func2(const string& name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{while(!quit){// 加锁 等待 解锁pthread_mutex_lock(pmtx);pthread_cond_wait(pcond, pmtx);cout << name << " running <-> 下载" << endl;pthread_mutex_unlock(pmtx);}
}void func3(const string& name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{while(!quit){// 加锁 等待 解锁pthread_mutex_lock(pmtx);pthread_cond_wait(pcond, pmtx);cout << name << " running <-> 刷新" << endl;pthread_mutex_unlock(pmtx);}
}void func4(const string& name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{while(!quit){// 加锁 等待 解锁pthread_mutex_lock(pmtx);pthread_cond_wait(pcond, pmtx);cout << name << " running <-> 扫码用户信息" << endl;pthread_mutex_unlock(pmtx);}
}// 线程入口函数
void* Entry(void *args)
{ThreadData* td = (ThreadData*)args; // 获取线程所需的数据td->_func(td->_name, td->_pmtx, td->_pcond);delete td;return nullptr;
}int main()
{// 初始化互斥锁mtx 和 条件变量condpthread_mutex_t mtx;pthread_cond_t cond;pthread_mutex_init(&mtx, nullptr);pthread_cond_init(&cond, nullptr);// 创建 TNUM 个线程,并将每个线程相关的函数和共享的互斥锁、条件变量传递给线程的入口函数 Entry。// 每个线程都有一个不同的名称和要执行的函数(func)pthread_t tids[TNUM];func_t funcs[TNUM] = {func1, func2, func3, func4};for (int i = 0; i < TNUM; i++){string name = "Thread ";name += to_string(i+1);ThreadData *td = new ThreadData(name, funcs[i], &mtx, &cond);pthread_create(tids + i, nullptr, Entry, (void*)td); // 创建线程}// 调用 pthread_cond_signal 函数向条件变量发送信号,通知等待该条件的线程可以继续运行int cnt = 20;while(cnt){cout << "resume thread run code ...." << cnt-- << endl << endl; // 打印输出当前计数器的值,并将计数器减一pthread_cond_signal(&cond); // 恢复线程sleep(1);}// 代码设置 quit 标志为 true,// 调用 pthread_cond_broadcast 函数向所有等待该条件的线程广播信号cout << "ctrl done" << endl;quit = true;pthread_cond_broadcast(&cond); // 唤醒所有等待在条件变量 cond 上的线程// 使用 pthread_join 等待所有线程的完成,然后销毁互斥锁和条件变量for(int i = 0; i < TNUM; i++){pthread_join(tids[i], nullptr);cout << "thread: " << tids[i] << "quit" << endl;}pthread_mutex_destroy(&mtx);pthread_cond_destroy(&cond);return 0;
}
  1. 定义了4个线程函数 func1、func2、func3、func4,分别代表4个线程的执行逻辑。
  2. 定义了一个ThreadData类,用于封装线程相关的信息和共享资源
  3. 主函数中,创建了4个线程,并将每个线程的名称、函数指针、互斥锁和条件变量传递给ThreadData对象,然后通过pthread_create函数创建线程
  4. 主线程通过循环调用pthread_cond_signal函数向条件变量发送信号,唤醒一个等待该条件的线程,然后休眠1秒钟。
  5. 当计数器cnt减为0时,主线程设置quit标志为true,并通过pthread_cond_broadcast函数向所有等待该条件的线程广播信号,通知它们可以退出。
  6. 使用pthread_join函数等待所有线程的完成,然后销毁互斥锁和条件变量

其中,在整段代码中,func1、func2、func3和func4函数分别代表消费者,而主函数中通过循环调用pthread_cond_signal函数唤醒等待条件变量的线程部分代表生产者

具体来说:

  • func1函数代表一个消费者,它的执行逻辑是"播放"。
  • func2函数代表另一个消费者,它的执行逻辑是"下载"。
  • func3函数代表第三个消费者,它的执行逻辑是"刷新"。
  • func4函数代表第四个消费者,它的执行逻辑是"扫描用户信息"。

而在主函数中的循环调用pthread_cond_signal函数,将信号发送给条件变量cond,可以唤醒等待该条件的线程。这里的循环调用部分代表生产者,通过不断唤醒等待的消费者线程来模拟生产者产生了数据(信号)。

执行逻辑 / 思路

  1. 首先,主函数开始执行。在主函数中,初始化了互斥锁mtx条件变量cond

  2. 接下来,使用循环创建了4个线程,并将每个线程对应的名称、函数指针、互斥锁和条件变量传递给ThreadData对象,然后通过pthread_create函数创建线程。这样就创建了4个消费者线程。

  3. 主线程进入一个循环,循环执行20次。在每次循环中,输出当前计数器的值,并将计数器减一。然后通过pthread_cond_signal函数向条件变量发送信号唤醒一个等待该条件的线程。主线程休眠1秒钟,再进行下一次循环。这部分模拟了生产者产生数据的过程。

  4. 当计数器cnt减为0时,主线程quit标志设置为true,表示停止生产数据

  5. 主线程调用pthread_cond_broadcast函数向所有等待条件变量的线程广播信号,通知它们可以退出。这部分模拟了生产者通知消费者停止消费的过程

  6. 最后,主线程通过pthread_join函数等待所有线程的完成。每个消费者线程会不断地等在条件变量上,在接收到信号后执行相应的操作,直到收到停止信号。

  7. 当所有线程完成后,主线程销毁互斥锁和条件变量,程序结束。

总结起来,这段代码的逻辑是创建了4个消费者线程,每个线程都等待条件变量的信号,然后执行相应的操作。主线程作为生产者,通过发送信号唤醒消费者线程来模拟生产数据的过程。最后,当需要停止生产数据时,主线程发送停止信号给消费者线程,消费者线程收到信号后执行完当前操作后退出。整个过程实现了一个简单的生产者消费者模型。


局部具体分析

model.cc

正常编写代码时,为了不污染命名空间,避免命名冲突,一般不会直接进行 using namespcade std; 这里为了方便,直接进行引用。

#define TNUM 4 // 定义将使用的线程数
typedef void (*func_t)(const string& name, pthread_mutex_t* pmtx, pthread_cond_t* pcond);
volatile bool quit = false; // 退出信号,默认为false// 定义一个具有名称、函数和同步机制(互斥锁和条件变量)的线程数据结构
// 用于传递线程相关的信息和共享资源给不同的线程,实现线程间的通信和同步
class ThreadData
{
public:ThreadData(const string& name, func_t func, pthread_mutex_t* pmtx, pthread_cond_t* pcond): _name(name), _func(func), _pmtx(pmtx), _pcond(pcond) {}public:// 成员变量string _name; // 线程名func_t _func; // 函数指针pthread_mutex_t* _pmtx; // 互斥锁指针pthread_cond_t* _pcond; // 条件变量指针
};

解释:

  • func_t 是一个函数指针类型,可以指向一个接受 const string& 类型参数、 pthread_mutex_t* 类型参数和 pthread_cond_t* 类型参数的函数,返回类型为 void用于后续对接线程的功能函数
  • ThreadData 是 一个具有名称、函数和同步机制(互斥锁和条件变量)的线程数据结构。用于传递线程相关的信息和共享资源给不同的线程,实现线程间的通信和同步

func(消费者线程)

void func1(const string& name, pthread_mutex_t* pmtx, pthread_cond_t* pcond)
{while(!quit){// wait 需要在加锁和解锁之间pthread_mutex_lock(pmtx); // 加锁//pthread_cond_wait(pcond, pmtx); // 默认该线程在执行时,wait 代码被执行,当前线程会被立即阻塞cout << name << " running <-> 播放" << endl;pthread_mutex_unlock(pmtx); // 解锁}
}
  • func1 为例:
  1. 进入一个无限循环,直到全局变量quittrue才退出。
  2. 在循环内部,首先使用pthread_mutex_lock加锁,保证线程独占互斥锁
  3. 调用pthread_cond_wait等待条件变量,当前线程会被阻塞并释放互斥锁,直到其他线程调用pthread_cond_signalpthread_cond_broadcast来发送信号唤醒该线程。
  4. 线程被唤醒后,输出名称和"running <-> 播放"的信息
  5. 最后使用pthread_mutex_unlock解锁互斥锁

执行结果

在linux下,可以看出来:

当我们执行程序后,四个线程会不断地执行四种操作,并且在一个线程结束当前任务之前,其他线程会进行等待,最后输出线程退出信息。

在这里插入图片描述

http://www.lryc.cn/news/130920.html

相关文章:

  • USB隔离器电路分析,SA8338矽塔sytatek电机驱动,源特科技VPS8701,开关电源,电源 大师
  • TPC-DS 测试是否支持 Glue Data Catalog?
  • 网络编程(8.14)TCP并发服务器模型
  • 认识负载均衡||WEBSHELL
  • Chapter 15: Object-Oriented Programming | Python for Everybody 讲义笔记_En
  • 模板编程-成员特化
  • 信安通用基础知识
  • 网上购物系统的设计与实现/在线商城/基于spring boot的电商平台/基于Java的商品销售系统
  • uniapp项目-配置store文件夹
  • element表格多选实现
  • 宠物智能自动喂食器方案设计
  • 学习笔记230818---对于promise失败状态处理的重要性
  • 【Redis】什么是缓存击穿,如何预防缓存击穿?
  • Android 13.0 强制app横屏显示
  • 平方数之和(力扣)双指针 JAVA
  • 深入浅出Pytorch函数——torch.nn.init.sparse_
  • OpenCV实现BGR2BayerGB/BG格式的转换
  • Gateway网关路由以及predicates用法(项目中使用场景)
  • 深入浅出Pytorch函数——torch.nn.init.constant_
  • centos mysql8解决Access denied for user ‘root‘@‘localhost‘ (using password: YES)
  • Docker实战:Docker常用命令
  • 基于51单片机直流电机转速数码管显示控制系统
  • 小程序商品如何指定打印机
  • LLaMA-7B微调记录
  • 域名子目录发布问题(nginx、vue-element-admin、uni-app)
  • 【环境配置】Windows 10 安装 PyTorch 开发环境,以及验证 YOLOv8
  • 数学建模之“层次分析法”原理和代码详解
  • 使用IText导出复杂pdf
  • 多线程并发服务器(TCP)
  • uni-app的Vue.js实现微信小程序的紧急事件登记页面功能