当前位置: 首页 > news >正文

Python 多线程、多进程和协程

一、多线程

threading 模块

threading 模块对象

对象描述
Thread表示一个执行线程的对象
Lock锁原语对象(与 thread 模块中的锁一样)
RLock可重入锁对象,使单一线程可以(再次)获得已持有的锁(递归锁)
Condition条件变量对象,使得一个线程等待另一个线程苗族特定的条件,比如改变状态或某个数据值
Event添加变量的通用版本,任意数量的线程等待某个时间的发生,在该事件发生后所有线程将被激活
Semaphore为线程键共享的有限资源提供一个 计算器(信号量),如果没有可用资源时会被阻塞
BoundedSemaphore与 Semaphore 相似,不过不允许超过初始值
Timer与 Thread 相似,在运行前要等待一段时间
Barrier创建一个障碍,必须达到指定数量的线程后才可以继续

守护线程

1、thread 模块不支持守护线程,当主线程退出后,所有子线程也会退出,不管其是否在工作

2、threading 模块支持守护线程:等待一个客户端请求服务的服务器,如果客户端没有请求,守护线程是空闲的,如果把一个线程设置为守护线程,就表示这个线程不重要的。进程退出时不需要等待这个线程执行完成。

如果主线程退出时,不需要等待某些子线程完成,可以将子线程设置为 守护线程,标记为真时,表示该线程不重要。在启动线程前执行 thread.daemon=True 可以设置守护线程,检查线程的守护状态也可以判断它。

Thread 类

threading 模块的 Thread 类是主要的执行对象,下面是 Thread 对象的属性和方法列表:

属性描述方法描述
name线程名start()开始执行该线程
ident线程的标识符run()定义线程功能的方法,通常在子类中被应用开发者重写
daemon布尔值,表示这个线程是否是守护线程join(timeout=None)直至启动的线程终止之前一直挂起,除非给出 timeout,否则一直阻塞
Thread(group=None, target=None, name=None, agrs=(), kwargs={}, verbose=None, daemon=None)# 实例化一个线程对象,需要一个可调用的 target,一般是函数,及其参数 args(元组)或 kwargs
# 也可以传递 name 或 group 参数,daemon 将会设定 thread.daemon 属性/标志

创建线程的三种方法

创建 Thread 实例,传递给它一个函数

import threading
from time import sleep, ctimeloops = [4, 2]
def loop(nloop, nsec):print('loop 函数开始执行 %(nloop)s,时间: %(ctime)s' % {'nloop': nloop, 'ctime': ctime()})sleep(nsec)print('loop 函数结束执行 %(nloop)s,时间: %(ctime)s' % {'nloop': nloop, 'ctime': ctime()})def main():print('主函数开始执行:', ctime())threads = []nloops = range(len(loops))for i in nloops:t = threading.Thread(target=loop, args=(i, loops[i]))threads.append(t)for i in nloops:threads[i].start()for i in nloops:threads[i].join()print('程序结束:', ctime())if __name__ == '__main__':main()

上述程序将生成两个线程,将其添加到一个列表中,循环启动 start()join() 方法将会程序挂起,会等待所有线程结束或超时。因此要比
等待释放的无限循环更加清晰。

运行结果如下:

主函数开始执行: Sat Sep  7 17:31:40 2019
loop 函数开始执行 0,时间: Sat Sep  7 17:31:40 2019
loop 函数开始执行 1,时间: Sat Sep  7 17:31:40 2019
loop 函数结束执行 1,时间: Sat Sep  7 17:31:42 2019
loop 函数结束执行 0,时间: Sat Sep  7 17:31:44 2019
程序结束: Sat Sep  7 17:31:44 2019

创建 Thread 的实例,传给它一个可调用的类实例

import threading
from time import sleep, ctimeloops = [4, 2]class ThreadFunc:def __init__(self, func, args, name=''):self.name = nameself.func = funcself.args = argsdef __call__(self, *args, **kwargs):"""当调用 ThreadFunc() 时会自动执行 fun()"""self.func(*self.args)def loop(nloop, nsec):print('loop 函数开始执行 %(nloop)s,时间: %(ctime)s' % {'nloop': nloop, 'ctime': ctime()})sleep(nsec)print('loop 函数结束执行 %(nloop)s,时间: %(ctime)s' % {'nloop': nloop, 'ctime': ctime()})def main():print('主函数开始执行:', ctime())threads = []nloops = range(len(loops))for i in nloops:            # 创建 Thread 的实例,传给它一个可调用的类实例t = threading.Thread(target=ThreadFunc(loop, (i, loops[i]), loop.__name__))threads.append(t)for i in nloops:threads[i].start()for i in nloops:threads[i].join()print('程序结束:', ctime())if __name__ == '__main__':main()

派生 Thread 的子类,并创建子类的实例

自定义的类要继承 threading.Thread,构造函数必须先调用其基类的构造函数,__call__() 在子类中必须要写 run()

import threading
from time import sleep, ctimeloops = [4, 2]class MyThread(threading.Thread):def __init__(self, func, args, name=''):threading.Thread.__init__(self)self.name = nameself.func = funcself.args = argsdef run(self, *args, **kwargs):self.func(*self.args)def loop(nloop, nsec):print('loop 函数开始执行 %(nloop)s,时间: %(ctime)s' % {'nloop': nloop, 'ctime': ctime()})sleep(nsec)print('loop 函数结束执行 %(nloop)s,时间: %(ctime)s' % {'nloop': nloop, 'ctime': ctime()})def main():print('主函数开始执行:', ctime())threads = []nloops = range(len(loops))for i in nloops:t = MyThread(loop, (i, loops[i]), loop.__name__)threads.append(t)for i in nloops:threads[i].start()for i in nloops:threads[i].join()print('程序结束:', ctime())if __name__ == '__main__':main()

threading 模块的其他函数

  • active_count():当前活动的 Thread 对象个数
  • current_thread:返回当前的 Thread 对象
  • enumerate():返回当前活动的 Thread 对象列表
  • settrace(func):为所有线程设置一个 trace 函数
  • setprofile(func):为所有线程设置一个 profile 函数
  • stack_size(size=0):返回新建线程的栈大小,或为后续创建的线程设定栈的大小为 size

启动和停止线程

import time
import threadingdef countdown(n):while  n > 0:print('T-minus:%s,time:%s' % (n, time.ctime()))n -= 1time.sleep(1)if t.is_alive():print('Still running...')else:print('Completed')print('主线程...')if __name__ == '__main__':t = threading.Thread(target=countdown, args=(5, ))t.start()t.join()

判断一个线程是否存活,可以调用 is_alive() 方法,解释器会在所有线程都结束后才执行剩余的代码,如果需要长时间运行的线程
或者一直运行的后台任务,可以使用后台线程:

threading.Thread(target=countdown, args=(5, ), daemon=True)

后台线程无法等待,这些线程会在主线程终止时自动销毁。但是你也不能对线程做额外的高级操作,如:发送信号,调整它的调度,终止线程等。

join() 方法

join() 方法将悬挂当前子线程,直至所有子线程结束。

import time, threadingdef loop():print('thread %s is running...' % threading.current_thread().name)n = 0while n < 5:n += 1print('thread %s >>> %s' % (threading.current_thread().name, n))time.sleep(1)print('thread %s ended.' % threading.current_thread().name)if __name__ == '__main__':print('thread %s is running>>>>' % threading.current_thread().name)t = threading.Thread(target=loop, name='LoopThread')t.start()t.join()print('thread %s ended>>>>' % threading.current_thread().name)
  • name:指定子线程名字,不指定默认为 thread-1、thread-2
  • MainThread 为主线程
  • threading.current_thread().name:获取当前线程的实例(名字)

可以看到 join() 它将子线程添加到当前主线程中,并等待子线程终止,才允许它后面的代码:

thread MainThread is running>>>>
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended>>>>

停止线程

如果线程执行一些如 I/O 这样的阻塞操作,通过轮询来终止线程将使得线程间的协调变得非常棘手。如,如果一个线程一直阻塞在一个 I/O
操作上,就永远无法返回检查自己是否已经被结束了。要正确处理这些问题,需要利用 超时循环 来小心操作线程:

class IOTask:def terminate(self):self._running = Falsedef run(self, sock):# sock is a socketsock.settimeout(5)      # set timeout periodwhile self._running:try:data = sock.recv(8192)breakexcept socket.timeout:continue# continued processing...# Terminatedreturn

判断线程是否启动

启动了一个线程,但是你想知道它是否真的已经开始了,由于线程是独立运行的且状态不可预测的。如果程序中其他线程
要通过判断某个线程的状态来确定自己的下一步操作,这就会显得很棘手。

我们可以使用 Event 对象来解决这个问题,Event 对象包含一个可由线程设置信号的标志,允许线程等待某事的发生。

  • 初始时,标志为假
  • 标志未假时,这个线程会被一直阻塞直至标志为真
from threading import Thread, Event
import timedef countdown(n, started_evt):print('countdown 开始...')started_evt.set()           # 设置标志为真while n > 0:print('T-minus', n)n -= 1time.sleep(1)started_evt = Event()       # 创建 Event 对象print('启动 countdown 函数')
t = Thread(target=countdown, args=(5, started_evt))
t.start()# 等待线程开始
started_evt.wait()
print('countdown is running...')

可以看到 countdown is running... 一直在 countdown 开始... 输出之后才打印,这是因为 event 在协调线程。使得主线程要等
countdown() 函数输出启动信息后,才继续执行。

启动 countdown 函数
countdown 开始...
T-minus 5
countdown is running...
T-minus 4
T-minus 3
T-minus 2
T-minus 1

如果你将 started_evt.set() 注销掉,再运行程序,会发现程序一直被阻塞…,这是因为标志为假,线程被阻塞。

三、协程

Python 协程只能运行在时间循环中,但是一旦事件循环运行,又会阻塞当前任务。因此 动态添加任务/协程

需要再开一个线程,这个线程主要任务时运行事件循环,因为是无限循环,会阻塞当前线程:

import asyncio
from threading import Threadasync def production_task():i = 0while True:# 将 consumption 这个协程每秒注册一个运行到线程中的循环,thread_loop 每秒会获取一个一直打印 i 的无限循环任务# run_coroutine_threadsafe 这个方法只能用在运行在线程中的循环事件使用asyncio.run_coroutine_threadsafe(consumption(i), thread_loop)await asyncio.sleep(1)  # 必须加 awaiti += 1async def consumption(i):while True:print('我是第{}任务'.format(i))await asyncio.sleep(1)def start_loop(loop):# 运行事件循环,loop 以参数的形式传递进来运行asyncio.set_event_loop(loop)loop.run_forever()thread_loop = asyncio.new_event_loop()  # 获取一个事件循环
run_loop_thread = Thread(target=start_loop, args=(thread_loop,))  # 将每次事件循环运行在一个线程中,防止阻塞当前主线程
run_loop_thread.start()  # 运行线程,同时协程事件循环也会运行advocate_loop = asyncio.get_event_loop()  # 将生产任务的协程注册到这个循环中
advocate_loop.run_until_complete(production_task())  # 运行次循环
我是第0任务
我是第1任务
我是第0任务
我是第1任务
我是第0任务
我是第2任务
我是第3任务
我是第1任务
我是第0任务

四、五种 unix IO 模型

epoll 并不代表一定比 select 好(效率高)

  • 在并发高的情况下,连接活跃度不是很高的情况下,epoll 比 select 好(比如:Web 连接,用户连接可能随时断开)
  • 在并发性不高,同时连接活跃,select 比 epoll 好(比如:游戏连接,一般要保持持续连接)

select、poll、epoll 本质还是同步阻塞的,之所以能支持高并发,是因为一个进程能同时监听多个文件描述符

非阻塞 IO 不一定比阻塞好,因为它要一直循环检测服务器是否有数据返回,如果后续的程序不依赖前面的连接,其效率要高,要是依赖前面的程序,效率不一定要好。

http://www.lryc.cn/news/22610.html

相关文章:

  • Xml 注解
  • 【CSS文字滚动】CSS实现文字横向循环无缝滚动,鼠标移入暂停移出继续(附实测源码)
  • 不使用implements关键字实现实现类(类似于mapper)
  • antd4里table的滚动是如何实现的?
  • 抓取namenode 50070 jmx的指标信息
  • aspnetcore-browser-refresh.js和Visual Studio Browser Link
  • hadoop 集群常用命令(学习笔记) —— 筑梦之路
  • ARC142D Deterministic Placing
  • 阶段八:服务框架高级(第二章:分布式事务)
  • RPC异步化原理
  • C# 多窗口切换的实现
  • 【深度学习】RNN
  • 招聘岗位,机会难得
  • web打印的几种方法(2023)
  • 代码随想录算法训练营day44 | 动态规划之完全背包 518. 零钱兑换 II 377. 组合总和 Ⅳ
  • IntelliJ IDEA 实用插件推荐(包含使用教程)
  • WideDeep模型
  • nacos集群模式+keepalived搭建高可用服务
  • 吉利「银河」负重突围
  • QT之图形视图框架概述——Graphics View Framework
  • 【SQL开发实战技巧】系列(二十二):数仓报表场景(上) 从分析函数效率一定快吗聊一聊结果集分页和隔行抽样实现方式
  • 小米无线AR眼镜探索版细节汇总
  • Web3中文|Litra:简洁而优美的NFT流动性协议,能给NFT市场带来什么?
  • SSL证书对虚拟主机的用处有哪些?
  • SpringCloud之MQ笔记分享
  • 动态规划背包问题
  • OpenCV4.x图像处理实例-张嘴和闭嘴检测
  • 软考高级系统分析师系列论文之十二:论实时控制系统与企业信息系统集成在工业控制的常规应用
  • 蓝桥杯入门即劝退(二十三)货物摆放问题
  • 经验之谈——指标异常了怎么办?