使用Python多线程实现生产者消费者模型
“Talk is cheap, show me the code.”
废话不多说,直接上代码:
"""
生产者消费者模型 Python实现
"""
import queue
import threading
import random
import timeclass ConsProd:# 队列参数_que = None # 队列# 生产者参数_producer_num = 0 # 生产者数量_prod_block_time = 0.0 # 生产者阻塞时间_prod_work_time = 0.0 # 生产者生产耗时# 消费者参数_consumer_num = 0 # 消费者数量_cons_block_time = 0.0 # 消费者阻塞时间_cons_work_time = 0.0 # 消费者消费耗时def __init__(self, que_size=4, producer_num=4, consumer_num=3, prod_work_time=0.5, cons_work_time=0.3,prod_block_time=1.0, cons_block_time=1.0):self._que_size = que_sizeself._producer_num = producer_numself._consumer_num = consumer_numself._prod_work_time = prod_work_timeself._cons_work_time = cons_work_timeself._prod_block_time = prod_block_timeself._cons_block_time = cons_block_time# 生产者逻辑def __produce(self):while True:if self._que.full():print(f"生产者线程 {threading.currentThread().getName()}: 队列已满,阻塞中...")time.sleep(self._prod_block_time)else:time.sleep(self._prod_work_time)product = random.randint(0, 100)print(f"生产者线程 {threading.currentThread().getName()}: 生产了一个值为 {product} 的数字")self._que.put(product)# 消费者逻辑def __consume(self):while True:if self._que.empty():print(f"消费者线程 {threading.currentThread().getName()}: 队列已空,阻塞中...")time.sleep(self._cons_block_time)else:time.sleep(self._cons_work_time)consumption = self._que.get()print(f"消费者线程 {threading.currentThread().getName()}: 消费了一个值为 {consumption} 的数字")def start(self):# 初始化队列self._que = queue.Queue(self._que_size)# 生产者线程组pro = [threading.Thread(name="prod-" + str(i), target=self.__produce)for i in range(0, self._producer_num)]# 消费者线程组csm = [threading.Thread(name="cons-" + str(i), target=self.__consume)for i in range(0, self._consumer_num)]# 启动所有线程for c in csm:c.start()for p in pro:p.start()if __name__ == "__main__":demo = ConsProd(que_size=16,producer_num=6,consumer_num=4,cons_work_time=0.2,prod_work_time=0.4)demo.start()