当前位置: 首页 > news >正文

Python 生产者消费者模型是什么?

本文首发自「慕课网」,想了解更多IT干货内容,程序员圈内热闻,欢迎关注!

作者| 慕课网精英讲师 朱广蔚

1. 简介

生产者和消费者问题是线程模型中的经典问题:

  • 生产者和消费者共享同一个存储空间
  • 生产者往存储空间中添加产品,消费者从存储空间中取走产品
  • 当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞

Python 的内置模块 queue 提供了对生产者和消费者模型的支持,模块 queue 定义了类 Queue,类 Queue 表示一个被生产者和消费者共享的队列,类 Queue 提供如下常用方法:

方法

功能

get()

从队列中取走数据,如果队列为空,则阻塞

put(item)

向队列中放置数据,如果队列为慢,则阻塞

join()

如果队列不为空,则等待队列变为空

task_done()

消费者从队列中取走一项数据,当队列变为空时,唤醒调用 join() 的线程

2. 实现生产者消费者模型

创建生产者线程和消费者线程,使用一个共享队列连接这两个线程,代码如下:

import threading
import queueq = queue.Queue()
代码块1234
  • 导入 threading 模块和 queue 模块
  • 创建共享队列 q
def produce():for item in ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']:q.put(item)print('produce %s' % item)
代码块1234
  • 创建生产者线程的入口函数 produce
  • 生产者生产 8 个数据
  • 调用 q.put(item) 将生产的数据放入到共享队列 q 中
def consume():for i in range(8):item = q.get()print('  consume %s' % item)
代码块1234
  • 创建消费者线程的入口函数 consume
  • 消费者消费 8 个数据
  • 调用 q.get() 从共享队列 q 中取走数据
producer = threading.Thread(target=produce, args=())
consumer = threading.Thread(target=consume, args=())
producer.start()
consumer.start()
producer.join()
consumer.join()
代码块123456
  • 创建生产者线程 producer,线程入口为 produce
  • 创建消费者线程 consumer,线程入口为 consume
  • 启动生产者线程和消费者线程,并等待它们结束

运行程序,输出结果如下:

produce a
produce bconsume a
produce cconsume bconsume c
produce dconsume d
produce econsume e
produce fconsume f
produce gconsume g
produce hconsume h
代码块12345678910111213141516
  • 生产者生产了 8 个数据:a、b、c、d、e、f、g、h
  • 消费者取走了 8 个数据:a、b、c、d、e、f、g、h

3. 实现生产者、计算者、消费者模型

创建生产者、计算者、消费者线程:

  • 生产者生产 8 个数据
  • 计算者对生产者输出的数据进行加工,将加工后的数据送往消费者
  • 消费者取走计算者输出的数据
import threading
import queueq0 = queue.Queue()
q1 = queue.Queue()
代码块12345
  • 导入模块 threading 和模块 queue
  • 使用两个共享队列连接这三个线程共享队列 q0 连接生产者和计算者共享队列 q1 连接计算者和消费者
def produce():for item in ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h']:q0.put(item)print('produce %s' % item)
代码块1234
  • 创建生产者线程的入口函数 produce
  • 生产者生产 8 个数据
  • 调用 q0.put(item) 将生产的数据放入到共享队列 q0 中
def compute():for i in range(8):item = q0.get()item = item.upper() q1.put(item)
代码块12345
  • 创建计算者线程的入口函数 compute
  • 调用 q0.get() 读取生产者输出数据,并进行加工
  • 调用 q1.put(item) 将加工后的数据放入到共享队列 q1 中
def consume():for i in range(8):item = q1.get()print('  consume %s' % item)
代码块1234
  • 创建消费者线程的入口函数 consume
  • 消费者消费 8 个数据
  • 调用 q1.get() 从共享队列 q1 中取走数据
producer = threading.Thread(target=produce, args=())
computer = threading.Thread(target=compute, args=())
consumer = threading.Thread(target=consume, args=())
producer.start()
computer.start()
consumer.start()producer.join()
computer.join()
consumer.join()
代码块12345678910
  • 创建生产者线程 producer,线程入口为 produce
  • 创建计算者线程 computer,线程入口为 compute
  • 创建消费者线程 consumer,线程入口为 consume
  • 启动生产者线程、计算者线程、消费者线程,并等待它们结束

运行程序,输出结果如下:

produce a
produce b
produce cconsume A
produce d
produce econsume B
produce fconsume C
produce gconsume D
produce hconsume Econsume Fconsume Gconsume H
代码块12345678910111213141516
  • 生产者生产了 8 个数据:a、b、c、d、e、f、g、h
  • 计算者将数据加工为:A、B、C、D、E、F、G、H
  • 消费者取走了 8 个数据:A、B、C、D、E、F、G、H

4. 同步生产者与消费者的推进速度

在生产者、消费者模型中,可能会存在两者推进速度不匹配的问题:生产者生产数据的速度较快,但是,消费者取走数据的速度较慢。

可以使用 queue 的 task_done() 方法和 join() 方法同步生产者与消费者的推进速度:

  • 生产者调用 join() 方法,等待队列中所有的数据被取走
  • 消费者调用 task_done() 方法,表示取走了队列中的一项数据,当队列为空时,唤醒阻塞在 join() 方法中的生产者
import threading
import queueq = queue.Queue()
代码块1234
  • 导入 threading 模块和 queue 模块
  • 创建共享队列 q
def produce():for item in ['A', 'B', 'C', 'D']:q.put(item)print('produce %s' % item)q.join()print('------------ q is empty')for item in ['E', 'F', 'G', 'H']:q.put(item)            print('produce %s' % item)q.join()        print('------------ q is empty')
代码块123456789101112
  • 创建生产者线程的入口函数 produce
  • 首先,生产 4 个数据:A、B、C、D调用 q.put(item) 将它们放入到队列 q 中调用 q.join() 等待消费者将它们全部取走
  • 然后,生产 4 个数据:E、F、G、G调用 q.put(item) 将它们放入到队列 q 中调用 q.join() 等待消费者将它们全部取走
def consume():for i in range(8):item = q.get()print('  consume %s' % item)q.task_done()
代码块12345
  • 创建消费者线程的入口函数 consume
  • 调用 q.get() 从队列 q 中取走一个数据
  • 调用 q.task_done(),表示已经从队列 q 中取走了一个数据,当队列为空时,唤醒生产者
producer = threading.Thread(target=produce, args=())
consumer = threading.Thread(target=consume, args=())
producer.start()
consumer.start()
代码块1234
  • 创建生产者线程 producer,线程入口为 produce
  • 创建消费者线程 consumer,线程入口为 consume
  • 启动生产者线程和消费者线程,并等待它们结束

运行程序,输出结果如下:

produce A
produce Bconsume Aconsume B
produce Cconsume C
produce Dconsume D
------------ q is empty
produce Econsume E
produce Fconsume F
produce G
produce Hconsume Gconsume H
------------ q is empty
代码块123456789101112131415161718
  • 生产者生产第一批数据 A、B、C、D,消费者将其取走
  • 当第一批数据完全被消费者取走后,生产者才开始生产第二批数据
  • 生产者生产第二批数据 E、F、G、H,消费者将其取走

欢迎关注「慕课网」,发现更多IT圈优质内容,分享干货知识,帮助你成为更好的程序员!

http://www.lryc.cn/news/44620.html

相关文章:

  • 手机银行评测系列:北京银行“京彩生活”7.0从用户视角出发,实现沉浸式体验重塑
  • ZJYC2023 浙江省大学生程序设计竞赛校内选拔赛部分题解 C J B L
  • 百科创建:7种有效的百科词条创建技巧
  • ThreeJS-dat.gui界面控制颜色、隐藏、位置(六)
  • 接口自动化测试,完整入门篇
  • 利用ControlNet重新定义你的AI姿势
  • 中医药NER命名实体识别基于SPANNER方式
  • Vue必掌握
  • SSM部分
  • 【Springboot系列】Springboot接管所有Controller,magic-api源码阅读
  • 二、LED子系统数据结构详解
  • Kubernetes(11):数据存储详解
  • 随想录Day43--动态规划: 1049. 最后一块石头的重量 II , 494. 目标和 , 474.一和零
  • Qt中对TCP粘包的处理
  • 贪心-单调递增的数字
  • 你真的会用搜索引擎吗?
  • KDCJ-20kV冲击耐压测试仪
  • 【Mybatis源码分析】TypeAliasRegistry源码分析
  • 节点高负载
  • 动态规划(一) part1
  • Ubuntu显卡报错:Failed to initialize NVML Driver/library version mismatch
  • JAVA企业电子采购系统源码:采购过程更规范,更透明
  • 5.5G产业再提速!高通5GAdvanced-ready芯片商用终端下半年面世
  • 基于B站王阿华的视频——为什么当下自媒体都在制造焦虑以及如何摆脱
  • 一、Docker介绍:
  • Vue进阶(一篇进入Vue3的世界)
  • 功能测试的分类,分别有什么作用?
  • 51单片机学习笔记_14 红外遥控
  • 【我是土堆 - Pytorch教程】 知识点 学习总结笔记(五)
  • JUC篇:CopyOnWriteArrayList的应用与原理