当前位置: 首页 > news >正文

交通 | python网络爬虫:“多线程并行 + 多线程异步协程

推文作者:Amiee

编者按:

常规爬虫都是爬完一个网页接着爬下一个网页,不适应数据量大的网页,本文介绍了多线程处理同时爬取多个网页的内容,提升爬虫效率。

1.引言​


一般而言,常规爬虫都是爬完一个网页接着爬下一个网页。如果当爬取的数据量非常庞大时,爬虫程序的时间开销往往很大,这个时候可以通过多线程或者多进程处理即可完成多个网页内容同时爬取的效果,数据获取速度大大提升。​


2.基础知识​


简单来说,CPU是进程的父级单位,一个CPU可以控制多个进程;进程是线程的父级单位,一个进程可以控制多个线程,那么到底什么是进程,什么是线程呢?​


对于操作系统来说,一个任务就是一个进程(Process),比如打开一个浏览器就是启动一个浏览器进程;打开一个QQ就启动一个QQ进程;打开一个Word就启动了一个Word进程,打开两个Word就启动两个Word进程。​


那什么叫作线程呢?在一个进程内部往往不止同时干一件事,比如浏览器,它可以同时浏览网页、听音乐、看视频、下载文件等。在一个进程内部这同时运行的多个“子任务”,便称之称为线程(Thread),线程是程序工作的最小单元。​


此外有个注意点,对于单个CPU而言,某一个时点只能执行一个任务,那么如果这样是怎么在现实中同时执行多个任务(进程)的呢?比如一边用浏览器听歌,一边用QQ和好友聊天是如何实现的呢?​


答案是操作系统会通过调度算法,轮流让各个任务(进程)交替执行。以时间片轮转算法为例:有5个正在运行的程序(即5个进程) : QQ、微信、谷歌浏览器、网易云音乐、腾讯会议,操作系统会让CPU轮流来调度运行这些进程,一个进程每次运行0.1ms,因为CPU执行的速度非常快,这样看起来就像多个进程同时在运行。同理,对于多个线程,例如通过谷歌浏览器(进程)可以同时访问网页(线程1)、听在线音乐(线程2)和下载网络文件(线程3)等操作,也是通过类似的时间片轮转算法使得各个子任务(线程)近似同时执行。​


2.1 Thread()版本案例

from threading import Thread
def func():for i in range(10):print('func', i)
if name == '__main__':t = Thread(target=func) # 创建线程t.start() # 多线程状态,可以开始工作了,具体时间有CPU决定for i in range(10):print('main', i)
执行结果如下:func 0func 1func 2func 3func 4main 0main 1main 2main 3main 4mainfunc 5 5func main 66func 7mainfunc 8func 97main 8main 9

2.2 MyTread() 版本案例​


大佬是这个写法

from threading import Thread​
class MyThread(Thread):​def run(self):​for i in range(10):​print('MyThread', i)​
if name == '__main__':​t = MyThread()​# t.run() # 调用run就是单线程​t.start() # 开启线程​for i in range(10):​print('main', i)​
执行结果:​
MyThread 0​
MyThread 1​
MyThread 2​
MyThread 3​
MyThread 4​
MyThread 5main 0​
main 1​
main 2​
main 3​
main 4​
MyThread ​
main 5​
6​
main MyThread 67​
mainMyThread  78​
mainMyThread  89​
main 9

2.3 带参数的多线程版本

from threading import Thread
def func(name):for i in range(10):print(name, i)
if name == '__main__':t1 = Thread(target=func, args=('子线程1',)) # 创建线程t1.start() # 多线程状态,可以开始工作了,具体时间又CPU决定t2 = Thread(target=func, args=('子线程2',))  # 创建线程t2.start()  # 多线程状态,可以开始工作了,具体时间又CPU决定for i in range(10):print('main', i)

2.4 多进程

一般不建议使用,因为开进程比较费资源

from multiprocessing import Process
def func():for i in range(1000000):print('func', i)
if name == '__main__':p = Process(target=func)p.start() # 开启线程for i in range(100000):print('mainn process', i)

3. 线程池和进程池

线程池:一次性开辟一些线程,我们用户直接给线程池提交任务,线程任务的调度由线程池来完成

from concurrent.futures import ThreadPoolExecutor
def func(name):for i in range(10):print(name, i)
if name == '__main__':# 创建线程池with ThreadPoolExecutor(50) as t:for i in range(100):t.submit(func, name=f'Thread{i}=')# 等待线程池中的人物全部执行完成,才继续执行;也称守护进程
print('执行守护线程')

进程池

from concurrent.futures import ProcessPoolExecutor
def func(name):for i in range(10):print(name, i)
if name == '__main__':# 创建线程池with ProcessPoolExecutor(50) as t:for i in range(100):t.submit(func, name=f'Thread{i}=')# 等待线程池中的人物全部执行完成,才继续执行;也称守护进程print('执行守护进程')

4. 爬虫实战-爬取新发地菜价

单个线程怎么办;上线程池,多个页面同时爬取

import requests
from lxml import etree
import csv
from concurrent.futures import ThreadPoolExecutorf = open('xifadi.csv', mode='w', newline='')
csv_writer = csv.writer(f)
def download_one_page(url):resp = requests.get(url)resp.encoding = 'utf-8'html = etree.HTML(resp.text)table = html.xpath(r'/html/body/div[2]/div[4]/div[1]/table')[0]# trs = table.xpath(r'./tr')[1:] # 跳过表头trs = table.xpath(r'./tr[position()>1]')for tr in trs:td = tr.xpath('./td/text()')# 处理数据中的 \\ 或 /txt = (item.replace('\\','').replace('/','') for item in td)csv_writer.writerow(txt)resp.close()print(url, '提取完毕')if name == '__main__':with ThreadPoolExecutor(50) as t:for i in range(1, 200):t.submit(download_one_page,f'http://www.xinfadi.com.cn/marketanalysis/0/list/{i}.shtml')print('全部下载完毕')

5. python网络爬虫:多线程异步协程与实验案例​


5.1 基础理论​


程序处于阻塞状态的情形包含以下几个方面:​
•input():等待用户输入​
•requests.get():网络请求返回数据之前​
•当程序处理IO操作时,线程都处于阻塞状态​
•time.sleep():处于阻塞状态​

协程的逻辑是当程序遇见IO操作时,可以选择性的切换到其他任务上;协程在微观上任务的切换,切换条件一般就是IO操作;在宏观上,我们看到的是多个任务都是一起执行的;上方的一切都是在在单线程的条件下,充分的利用单线程的资源。​

要点梳理:​
•函数被asyn修饰,函数被调用时,它不会被立即执行;该函数被调用后会返回一个协程对象。​
•创建一个协程对象:构建一个asyn修饰的函数,然后调用该函数返回的就是一个协程对象​
•任务对象是一个高级的协程对象,

import  asyncio​
import time​
async def func1():​print('你好呀11')​
if name == '__main__':​g1 = func1() # 此时的函数是异步协程函数,此时函数执行得到的是一个协程对象​asyncio.run(g1) # 协程城西执行需要asyncio模块的支持

5.2 同步/异步睡眠​

普通的time.sleep()是同步操作,会导致异步操作中断

import  asyncio
import time
async def func1():print('你好呀11')time.sleep(3) # 当程序中除了同步操作时,异步就中端了print('你好呀12')
async def func2():print('你好呀21')time.sleep(2)print('你好呀22')
async def func3():print('你好呀31')time.sleep(4)print('你好呀32')
if name == '__main__':g1 = func1() # 此时的函数是异步协程函数,此时函数执行得到的是一个协程对象g2 = func2()g3 = func3()tasks = [g1, g2, g3]t1 = time.time()asyncio.run(asyncio.wait(tasks)) # 协程城西执行需要asyncio模块的支持t2 = time.time()
print(t2 - t1)你好呀21你好呀22你好呀11你好呀12你好呀31你好呀329.003259658813477

使用异步睡眠函数,遇到睡眠时,挂起;

import  asyncio
import time
async def func1():print('你好呀11')await asyncio.sleep(3) # 异步模块的sleepprint('你好呀12')
async def func2():print('你好呀21'))await asyncio.sleep(4)  # 异步模块的sleepprint('你好呀22')
async def func3():print('你好呀31')await asyncio.sleep(4)  # 异步模块的sleepprint('你好呀32')
if name == '__main__':g1 = func1() # 此时的函数是异步协程函数,此时函数执行得到的是一个协程对象g2 = func2()g3 = func3()tasks = [g1, g2, g3]t1 = time.time()asyncio.run(asyncio.wait(tasks)) # 协程城西执行需要asyncio模块的支持t2 = time.time()
print(t2 - t1)你好呀21你好呀11你好呀31你好呀12你好呀22你好呀324.0028839111328125

整体耗时为最长时间 + 切换时间

5.3 官方推荐多线程异步协程写法

import  asyncio
import time
async def func1():print('你好呀11')# time.sleep(3) # 当程序中除了同步操作时,异步就中端了await asyncio.sleep(3) # 异步模块的sleepprint('你好呀12')
async def func2():print('你好呀21')# time.sleep(2)await asyncio.sleep(4)  # 异步模块的sleepprint('你好呀22')
async def func3():print('你好呀31')# time.sleep(4)await asyncio.sleep(4)  # 异步模块的sleepprint('你好呀32')
async def main():# 写法1:不推荐# f1 = func1()# await f1 # await挂起操作,一般放在协程对象前边# 写法2:推荐,但是在3.8废止,3.11会被移除# tasks = [func1(), func2(), func3()]# await asyncio.wait(tasks)# 写法3:python3.8以后使用tasks = [asyncio.create_task(func1()), asyncio.create_task(func2()),asyncio.create_task(func3())]await asyncio.wait(tasks))if name == '__main__':t1 = time.time()asyncio.run(main())t2 = time.time()
print(t2 - t1)你好呀21你好呀31你好呀11你好呀12你好呀32你好呀224.001523017883301

6. 异步协程爬虫实战

安装包:

pip install aiohttp

pip install aiofiles

基本框架:​
•获取所有的url​
•编写每个url的爬取函数​
•每个url建立一个线程任务,爬取数据

6.1 爬取图片实战代码

import asyncio
import aiohttp
import aiofiles
headers = {'User-Agent': 'Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Mobile Safari/537.36 Edg/91.0.864.41'}
urls = {r'http://kr.shanghai-jiuxin.com/file/mm/20210503/xy2edb1kuds.jpg',r'http://kr.shanghai-jiuxin.com/file/mm/20210503/g4ok0hh2utm.jpg',r'http://kr.shanghai-jiuxin.com/file/mm/20210503/sqla2defug0.jpg',r'http://d.zdqx.com/aaneiyi_20190927/001.jpg'}
async def aio_download(url):async with aiohttp.ClientSession() as session:async with session.get(url, headers=headers) as resp:async with aiofiles.open('img/' + url.split('/')[-1],mode='wb') as f:await f.write(await resp.content.read())await f.close() # 异步代码需要关闭文件,否则会输出0字节的空文件# with open(url.split('/')[-1],mode='wb') as f: # 使用with代码不用关闭文件#      f.write(await resp.content.read()) # 等价于resp.content, resp.json(), resp.text()
async def main():tasks = []for url in urls:tasks.append(asyncio.create_task(aio_download(url)))await asyncio.wait(tasks)if name == '__main__':# asyncio.run(main()) # 可能会报错 Event loop is closed 使用下面的代码可以避免asyncio.get_event_loop().run_until_complete(main())
print('over')

知识点总结:​
•在python 3.8以后,建议使用asyncio.create_task()创建人物​
•aiofiles写文件,需要关闭文件,否则会生成0字节空文件​
•aiohttp中生成图片、视频等文件时,使用resp.content.read(),而requests库时,并不需要read()​
•报错 Event loop is closed时, 将 asyncio.run(main()) 更改为 asyncio.get_event_loop().run_until_complete(main())​
•使用with打开文件时,不用手动关闭

6.2 爬取百度小说

注意:以下代码可能会因为 百度阅读 页面改版而无法使用

主题思想:

  • 分析那些请求需要异步,那些不需要异步;在这个案例中,获取目录只需要请求一次,所以不需要异步

  • 下载每个章节的内容,则需要使用异步操作

import requests
import asyncio
import aiohttp
import json
import aiofiles

https://dushu.baidu.com/pc/detail?gid=4306063500

http://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"4306063500"} 获取章节的名称,cid;只请求1次,不需要异步

http://dushu.baidu.com/api/pc/getChapterContent # 涉及多个任务分发,需要异步请求,拿到所有的文章内容

headers = {'User-Agent': 'Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.77 Mobile Safari/537.36 Edg/91.0.864.41'}
async def aio_download(cid, book_id,title):data = {'book_id': book_id,'cid' : f'{book_id}|{cid}','need_bookinfo': 1}data = json.dump(data)url = f'http://dushu.baidu.com/api/pc/getChapterContent?data={data}'async with aiohttp.ClientSession as session:async with session.get(url) as resp:dic = await resp.json()async with aiofiles.open('img/'+title+'.txt',mode='w') as f:await f.write(dic['data']['novel']['content'])await f.close()
async def getCatalog(url):resp = requests.get(url, headers=headers)dic = resp.json()tasks = []for item in dic['data']['novel']['items']:title = item['title']cid = item['cid']tasks.append(asyncio.create_task(aio_download((cid, book_id,title))))await asyncio.wait(tasks)if name == '__main__':book_id = '4306063500'url = r'http://dushu.baidu.com/api/pc/getCatalog?data={"book_id":"'+book_id+'"}'asyncio.run(getCatalog(url))

http://www.lryc.cn/news/191700.html

相关文章:

  • LeetCode:1488. 避免洪水泛滥(2023.10.13 C++)
  • SpringBoot 时 jar 报错 没有主清单属性
  • C/S架构学习之多进程实现TCP并发服务器
  • VSCode 快速移动光标至行尾
  • ACP.复盘方法
  • Springboot 订餐管理系统idea开发mysql数据库web结构java编程计算机网页源码maven项目
  • 判断当前Activity是否有DialogFragment显示
  • 开发一个npm组件包(2)
  • 迅为RK3568开发板Scharr滤波器算子边缘检测
  • HJ86 求最大连续bit数
  • Grafana 10 新特性解读:体验与协作全面提升
  • Django实现音乐网站 ⒆
  • 20基于MATLAB的车牌识别算法,在环境较差的情景下,夜间识别度很差的车牌号码可以精确识别出具体结果,程序已调通,可直接替换自己的数据跑。
  • vue音频制作
  • 好莱坞编剧大罢工终于结束;与OpenAI创始人共进早餐;使用DALL-E 3制作绘本分享;生成式AI的基础设施架构 | ShowMeAI日报
  • buuctf week2-web-ez_sql
  • 实验2.1.2 交换机的常用配置
  • 功率放大器应用场景分析报告
  • 解决 Centos 安装 Python 3.10 的报错: Could not import runpy module
  • HTML5简介-HTML5 新增语义化标签-HTML5 新增多媒体标签
  • pyqt---子线程进行gui操作导致界面崩溃
  • vue-cli 输出的模板 html 文件使用条件语句
  • Spring Boot集成kafka的相关配置
  • Git(11)——Git相关问题解答以及常用命令总结
  • 【LeetCode高频SQL50题-基础版】打卡第7天:第36~40题
  • C++入门1
  • Matlab论文插图绘制模板第118期—进阶气泡图
  • grafana接入OpenTSDB设置大盘语法
  • HarmonyOS 远端状态订阅开发实例
  • 实战一:Http轮询弹幕拦截