Python Day23程序、进程、线程及多线程实现全解析 例题分析
一、核心概念辨析
1. 程序、进程与线程
- 程序:静态的指令集合(如安装的软件、APP)
- 进程:程序运行的动态产物,CPU 任务分配的最小单位,处理计算密集型任务,资源消耗大,数据独立
- 线程:进程内的执行单元,CPU 调度的最小单位,处理 IO 密集型任务,资源消耗小,共享进程数据
2. 执行方式对比
- 串行:任务按顺序执行(效率低)
- 并行:多任务同时执行(单核 CPU 为宏观并行)
- 并发:多线程 / 进程争抢同一资源的现象
二、多线程实现(基于 threading 模块)
1. 面向过程实现
import threading
import timedef execute_task(a, b, *, name, age):time.sleep(0.000000000000001) # 模拟任务耗时print(f'子线程任务:a={a}, b={b}, name={name}, age={age}')if __name__ == '__main__':# 创建线程对象thread = threading.Thread(target=execute_task, # 目标任务函数args=(1, 2), # 位置参数kwargs={'name': 'test', 'age': 18}, # 关键字参数name='测试线程01' # 线程名称)# 启动线程(进入就绪状态,等待CPU调度)thread.start()print('主线程代码执行完毕')
2. 面向对象实现
import threadingclass MyThread(threading.Thread):def __init__(self, name):super().__init__(name=name) # 调用父类构造方法# 重写run方法(线程执行的任务)def run(self):print(f'子线程[{self.name}]执行任务')if __name__ == '__main__':# 创建线程对象thread = MyThread('测试线程01')# 启动线程(自动调用run方法)thread.start()print('主线程代码执行完毕')
三、线程安全与锁机制
1. 锁的类型
- Lock:互斥锁,同一时间仅一个线程持有,不可重入
- RLock:可重入锁,允许同一线程多次获取
2. 售票系统线程安全实现(Lock 应用)
from typing import List
import threading
import timeclass Ticket:def __init__(self, number):self.number = number # 调用setter方法@propertydef number(self):return self.__number@number.setterdef number(self, number):self.__number = f"NO.{number:0>8}" # 格式化票号def __repr__(self):return self.__numberclass TicketWindow(threading.Thread):def __init__(self, name, ticket_list, lock):super().__init__(name=name)self.ticket_list = ticket_listself.lock = lock # 传入锁对象def run(self):while len(self.ticket_list) > 0:# 使用with自动获取和释放锁,确保线程安全with self.lock:if len(self.ticket_list) > 0:ticket = self.ticket_list.pop(0) # 取出第一张票print(f"窗口[{self.name}]售票:{ticket},剩余{len(self.ticket_list)}张")time.sleep(0.25) # 模拟售票耗时else:print(f"窗口[{self.name}]票已售罄")if __name__ == '__main__':# 创建100张票(票号100~199)ticket_list = [Ticket(x) for x in range(100, 200)]lock = threading.Lock() # 创建锁对象threads = []# 创建5个售票窗口(5个线程)for i in range(5):window = TicketWindow(f"窗口-{i}", ticket_list, lock)threads.append(window)window.start()# 等待所有线程执行完毕for thread in threads:thread.join()
四、多线程实践:计算素数
import threadingdef get_primes(numbers, result, lock):"""计算素数并存储结果"""while True:try:number = next(numbers) # 从生成器获取数字# 判断是否为素数for i in range(2, int(number**0.5) + 1):if number % i == 0:breakelse: # 无整除情况,是素数with lock: # 加锁保证结果列表安全print(f"线程[{threading.current_thread().name}]找到素数:{number}")result.append(number)except StopIteration: # 生成器耗尽breakif __name__ == '__main__':# 生成1~100000000的数字(生成器节省内存)numbers = (x for x in range(2, 100000000))result = [] # 存储素数结果lock = threading.Lock() # 结果列表的锁threads = []# 创建200个线程for i in range(200):thread = threading.Thread(target=get_primes,args=(numbers, result, lock),name=f"计算线程-{i}")threads.append(thread)thread.start()# 等待所有线程完成for thread in threads:thread.join()print(f"找到的素数总数:{len(result)}")# print(result) # 如需查看具体素数可取消注释
关键说明
- 线程启动后进入就绪状态,需等待 CPU 调度才会执行
join()
方法用于阻塞主线程,等待子线程完成- 锁机制用于解决线程安全问题,保护共享资源
- 线程数量建议控制在 50~100 个(避免资源过度消耗)
- 生成器适合处理大量数据(减少内存占用)
一、多线程计算 10000 以内的所有素数
问题描述
使用多线程并行计算 1-10000 范围内的所有素数,通过线程协作提高计算效率,并用锁机制保证结果数据的线程安全。
实现代码
import threadingdef get_primes(numbers_generator, result_list, lock):"""计算素数的线程任务函数:param numbers_generator: 生成待计算数字的生成器:param result_list: 存储素数结果的列表:param lock: 保证结果列表操作安全的锁对象"""while True:try:# 从生成器获取下一个待计算数字num = next(numbers_generator)# 素数判断(1不是素数,直接跳过)if num < 2:continueis_prime = Truefor i in range(2, int(num**0.5) + 1):if num % i == 0:is_prime = Falsebreak# 如果是素数,加锁后存入结果列表if is_prime:with lock:result_list.append(num)print(f"线程:{threading.current_thread().name}计算得出素数{num}")except StopIteration:# 生成器耗尽,线程任务结束breakif __name__ == '__main__':# 生成1-10000的数字生成器(节省内存)numbers = (x for x in range(1, 10000))# 创建锁对象(保证多线程操作结果列表安全)lock = threading.Lock()# 存储素数结果的列表primes_result = []# 线程列表(用于管理所有线程)threads_list = []# 创建50个线程for i in range(50):thread = threading.Thread(target=get_primes,args=(numbers, primes_result, lock),name=f"素数计算线程-{i}")threads_list.append(thread)thread.start()# 等待所有线程执行完毕for thread in threads_list:thread.join()# 输出最终结果print(f"\n10000以内的素数共有{len(primes_result)}个")# 排序后输出(可选)# print(sorted(primes_result))
关键说明
- 素数判断逻辑修正:原代码中素数判断存在逻辑错误(循环内提前添加结果),修正后先完整判断是否为素数,再统一处理结果
- 线程安全保证:通过
threading.Lock()
实现对结果列表的互斥访问,避免多线程同时修改导致的数据混乱 - 任务分配方式:使用生成器
(x for x in range(1, 10000))
动态提供待计算数字,实现多线程间的任务自动分配 - 线程管理:通过
join()
方法确保主线程等待所有计算线程完成后再输出结果
二、多线程实现多窗口售票系统
问题描述
模拟多窗口同时售票场景,使用多线程技术实现票池资源的共享访问,通过锁机制防止超售、重复售票等线程安全问题。
实现代码
import threading
import timeclass Ticket:"""票券类,负责票号格式化和展示"""def __init__(self, number):self.number = number # 调用setter方法设置票号@propertydef number(self):return self.__number@number.setterdef number(self, number):self.__number = f"NO.{number:>06}" # 格式化票号为6位数字(如NO.000100)def __repr__(self):return self.__number # 打印对象时显示票号class TicketWindow(threading.Thread):"""售票窗口类,继承Thread实现多线程售票"""def __init__(self, name, ticket_list, lock):super().__init__(name=name) # 调用父类构造方法,设置线程名称self.ticket_list = ticket_list # 共享的票池列表self.lock = lock # 锁对象,保证售票操作安全def run(self):"""线程执行的核心方法,实现售票逻辑"""while len(self.ticket_list) > 0:# 使用with语句自动获取和释放锁,简化锁管理with self.lock:if len(self.ticket_list) > 0:# 取出第一张票(模拟售票)ticket = self.ticket_list.pop(0)print(f"{self.name}正在售卖票号为:{ticket}的票、剩余的张数是{len(self.ticket_list)}")else:print(f"{self.name}:票已售罄")# 模拟售票间隔时间time.sleep(0.1)if __name__ == '__main__':# 创建票池(200张票,票号100-299)ticket_list = [Ticket(x) for x in range(100, 300)]# 创建锁对象lock = threading.Lock()# 窗口(线程)列表window_list = []# 创建5个售票窗口for i in range(5):window = TicketWindow(f"窗口-{i}", ticket_list, lock)window_list.append(window)window.start() # 启动窗口线程
关键说明
- 面向对象设计:通过
Ticket
类封装票券信息,TicketWindow
类继承Thread
实现多线程售票 - 锁机制应用:使用
with self.lock
将售票核心操作(取票、打印)变为原子操作,防止多窗口同时操作票池导致的数据不一致 - 共享资源管理:所有窗口共享同一个票池列表
ticket_list
,通过锁实现对共享资源的安全访问 - 线程任务逻辑:
run()
方法中通过while
循环持续售票,直到票池为空
三、多线程模拟龟兔赛跑
问题描述
模拟龟兔赛跑场景:兔子每秒跑 90 毫米,跑 5 秒后休息 3 分钟;乌龟每秒跑 10 毫米,匀速前进;赛道长 2000 毫米。每 0.1 秒输出两者的距离,最终判断胜负。
实现代码
import threading
import timeclass RaceParticipant(threading.Thread):"""赛跑参与者类,继承Thread实现多线程赛跑模拟"""def __init__(self, name, speed, rest_time=0):super().__init__()self.name = name # 参与者名称(兔子/乌龟)self.speed = speed # 速度(毫米/秒)self.rest_time = rest_time # 休息时间(秒)self.distance = 0.0 # 已跑距离(毫米)def run(self):"""线程执行方法,实现赛跑逻辑"""global race_over # 全局变量,标记比赛是否结束while self.distance < 2000 and race_over:# 兔子特殊逻辑:跑5秒(450毫米)后休息if self.name == "兔子" and self.distance >= 450 and self.rest_time > 0:print(f"兔子开始休息...当前距离: {self.distance:.1f}mm")time.sleep(self.rest_time) # 休息3分钟(180秒)self.rest_time = 0 # 休息后重置,避免重复休息# 每0.1秒前进的距离(速度×时间)self.distance += self.speed * 0.1# 控制输出频率(每0.1秒一次)time.sleep(0.1)# 打印当前进度(由乌龟线程统一打印,避免输出混乱)if self.name == "乌龟":print(f"兔子跑了 {rabbit.distance:.1f}mm, 乌龟跑了 {self.distance:.1f}mm")# 判断是否到达终点if self.distance >= 2000:race_over = False # 标记比赛结束breakif __name__ == '__main__':race_over = True # 全局变量:控制比赛状态# 创建兔子线程(速度90mm/s,休息180秒)rabbit = RaceParticipant("兔子", speed=90, rest_time=180)# 创建乌龟线程(速度10mm/s,不休息)turtle = RaceParticipant("乌龟", speed=10)# 启动线程rabbit.start()turtle.start()# 等待比赛结束rabbit.join()turtle.join()# 判断并输出结果print("\n比赛结束!")if rabbit.distance >= 2000 and turtle.distance >= 2000:print("平局!")elif rabbit.distance >= 2000:print("兔子赢了!")else:print("乌龟赢了!")
关键说明
- 线程协作:通过全局变量
race_over
实现两个线程的状态同步,一方到达终点后立即结束比赛 - 行为模拟:
- 兔子:实现 "跑 5 秒(450mm)后休息 3 分钟" 的逻辑
- 乌龟:保持匀速前进(10mm/s)
- 输出控制:由乌龟线程统一打印两者距离,避免多线程同时输出导致的信息混乱
- 时间控制:通过
time.sleep(0.1)
实现每 0.1 秒更新一次距离,符合题目要求的输出频率
总结:多线程核心知识点应用
- 线程创建:两种方式(面向过程:
Thread(target=函数)
;面向对象:继承Thread
并重写run()
) - 线程安全:通过
threading.Lock()
实现共享资源的互斥访问,防止数据竞争 - 线程同步:使用
join()
方法控制线程执行顺序,确保主线程等待子线程完成 - 任务分配:通过生成器、共享列表等方式实现多线程间的任务分配
- 线程通信:通过全局变量实现线程间状态共享(如龟兔赛跑中的
race_over
)