当前位置: 首页 > news >正文

multiprocessing.Queue 多个进程生产和多个进程消费怎么处理

在这个示例中,我们创建了一个队列 q,并通过 multiprocessing.Manager().Queue() 来确保队列可以在多个进程之间共享。我们定义了 consumerproducer 函数,分别用于从队列中获取数据和向队列中放入数据。

在主进程中,我们创建了多个消费者和生产者进程,并将它们启动。生产者进程将数据放入队列,消费者进程从队列中取出数据并处理。生产者进程完成后,我们向队列发送 None 作为结束信号,告知消费者没有更多数据。每个消费者在接收到 None 后会停止工作。

注意,我们在 consumer 函数中使用了 queue.task_done() 来标记任务完成。这是可选的,但在使用 join() 方法等待队列中的所有任务完成时很有用。

这个模式允许多个生产者并发地向队列中放入数据,同时多个消费者并发地从队列中取出并处理数据,直到所有生产者完成生产,消费者接收到结束信号。

当使用 multiprocessing.Queue 进行多个生产者和多个消费者的场景时,队列可以很好地协调这些进程。以下是一个示例,展示了如何创建多个生产者和多个消费者,它们共享同一个队列:

# encoding:utf-8
import multiprocessing
import time
import randomdef consumer(queue):"""作者:阙辉"""while True:item = queue.get()  # 从队列中获取数据if item is None:print(f"Consumer {multiprocessing.current_process().name} received end signal.")queue.task_done()  # 标记任务完成breakprint(f"Consumer {multiprocessing.current_process().name} received {item}")time.sleep(random.uniform(0.5, 1.5))  # 模拟处理时间queue.task_done()  # 标记任务完成def producer(queue, items):"""作者:阙辉"""for item in items:print(f"Producer {multiprocessing.current_process().name} sent {item}")queue.put(item)time.sleep(random.uniform(0.5, 1.5))  # 模拟生产时间if __name__ == '__main__':manager = multiprocessing.Manager()q = manager.Queue()  # 使用 Manager.Queue 来支持多个生产者和消费者模式# 创建多个消费者进程consumers = [multiprocessing.Process(target=consumer, args=(q,)) for _ in range(4)]# 创建多个生产者进程producers = [multiprocessing.Process(target=producer, args=(q, range(20))) for _ in range(4)]# 启动所有消费者进程for c in consumers:c.start()# 启动所有生产者进程for p in producers:p.start()# 等待所有生产者完成for p in producers:p.join()# 发送结束信号,告知所有消费者没有更多数据for _ in consumers:q.put(None)# 等待所有消费者完成for c in consumers:c.join()print("All tasks completed.")

http://www.lryc.cn/news/386966.html

相关文章:

  • 配置 Python 解释器及虚拟环境
  • JeecgBoot中如何对敏感信息进行脱敏处理?
  • 【Docker】存储数据卷
  • 《昇思25天学习打卡营第12天 | 昇思MindSpore基于MindSpore的GPT2文本摘要》
  • 深入解析npm unpublish命令:使用场景与实践指南
  • 有趣的仿神经猫html5圈小猫游戏源码
  • Redis 7.x 系列【10】数据类型之有序集合(ZSet)
  • 操作系统-文件的物理结构(文件分配方式)
  • Spring Boot集成jsoup实现html解析
  • [240629] 阿里云揭秘其数据中心设计和自研网络,用于大语言模型训练 | Jina AI 发布最新的神经网络重排序模型
  • 【Docker0】网络更改
  • IDEA中导入Maven项目
  • px、em、rem、rpx 作用和用法详解
  • Linux 常用命令 - dd 【复制及转换文件内容】
  • 全网唯一免费无水印AI视频工具!
  • kafka(四)消息类型
  • Emacs之显示blame插件:blamer、git-messenger(一百四十四)
  • 【10分钟速通webpack,全流程打包,编译,发包,全干货,附代码 】
  • 设计模式深入解析与实例应用
  • 服务器数据恢复—异常断电导致RAID6阵列中磁盘出现坏扇区的数据恢复案例
  • 前端工程化08-新的包管理工具pnpm
  • 章十九、JavaVUE —— 框架、指令、声明周期、Vue-cli、组件路由、Element
  • 正则表达式阅读理解
  • Apache Calcite Linq4j学习
  • FPGA SATA高速存储设计
  • MySQL----为什么选择使用MySQL
  • 01.音视频小白系统入门(新专栏)
  • C++:enum枚举共用体union
  • 动手学深度学习(Pytorch版)代码实践 -计算机视觉-47转置卷积
  • LinkedIn被封原因和解封方法