当前位置: 首页 > news > 正文

Python 异步切片下载文件（内置 Redis 获取任务、MongoDB 更新任务状态等）

异步切片下载二进制文件，上传到 MinIO 存储桶后删除本地文件

import asyncio
import json
import os
from urllib import parse

import aiohttp
import aioredis
from minio import Minio
from minio.error import S3Error
from motor.motor_asyncio import AsyncIOMotorClient
from retrying import retry
import loguru


class Async_download:
    """Chunked, resumable HTTP downloader fed by a Redis task list.

    Tasks are popped from the Redis list ``file_file_all``; each is a JSON
    object with ``file_link`` and ``file_name`` keys. A file is fetched with
    concurrent HTTP Range requests (at most ``sema_number`` in flight) and
    appended to ``./<file_name>``. The file is then uploaded to MinIO, flagged
    in MongoDB and deleted locally. The connection settings below are
    placeholders.
    """

    # Attempts per chunk request before the last error is propagated.
    _CHUNK_ATTEMPTS = 3

    def __init__(self, sema_number=20, redis_address='redis://ip:6379/db', redis_password='passwd'):
        self.__sema_number = sema_number
        # Created per download in __async_download_main, i.e. inside the running loop.
        self.__sema = None
        self.redis_address = redis_address
        self.redis_password = redis_password
        self.minio_config = {
            "endpoint": "ip:port",
            "access_key": "******",
            "secret_key": "******",
            "secure": False,
        }
        self.mongo_config = {
            "uri": "mongodb://{0}:{1}@ip:27017".format(parse.quote_plus("user"), parse.quote_plus("passwd")),
            "db": "******",
            "collection": "******",
        }

    @classmethod
    def _mkdir(cls, file_path, full_path):
        """Report whether *file_path* exists; if not, ensure its directory exists.

        Returns True when the file is already present (a download can be resumed),
        otherwise creates *full_path* (if non-empty) and returns False.
        Raises OSError if the directory cannot be created.
        """
        if os.path.exists(file_path):
            return True
        if full_path:
            # exist_ok: the directory may exist even though the file does not.
            os.makedirs(full_path, exist_ok=True)
        return False

    def __init_check(self, file_path: str):
        """Return ``(file_name, bytes_already_on_disk)`` for *file_path*."""
        full_path, file_name = os.path.split(file_path)
        file_size = os.path.getsize(file_path) if self._mkdir(file_path, full_path) else 0
        return file_name, file_size

    @classmethod
    def __sync_save_local(cls, r_headers, results, file_path):
        """Append the downloaded chunks to *file_path* in byte order.

        *results* is the ``(done, pending)`` pair returned by ``asyncio.wait``;
        each successful task yields ``{index: bytes}``. If any chunk failed,
        its exception is re-raised and nothing is written.
        """
        done, _pending = results
        first_error = None
        for task in done:
            error = task.exception()  # also marks the exception as retrieved
            if error is None:
                r_headers.update(task.result())
            elif first_error is None:
                first_error = error
        if first_error is not None:
            raise first_error
        with open(file_path, 'ab') as f:
            # Keys were inserted as 0..n-1, so dict order is byte order.
            for chunk in r_headers.values():
                f.write(chunk)
        return True

    @classmethod
    def __generate_headers(cls, headers, file_size, first_byte):
        """Split the byte span ``[file_size, first_byte)`` into Range requests.

        :param headers: base headers merged into every chunk's headers.
        :param file_size: resume offset (bytes already on disk).
        :param first_byte: total Content-Length of the remote file.
        :return: ``{index: headers_with_Range}`` with ascending indices/offsets.
        """
        # Files above 51,200,000 bytes use 2,048,000-byte chunks, others 1,024,000.
        chunk = 2048000 if first_byte > 51200000 else 1024000
        r_headers = {}
        start = file_size
        index = 0
        while True:
            end = start + chunk
            if end >= first_byte:
                # Range ends are inclusive: the final byte index is first_byte - 1.
                r_headers[index] = {"Range": f"bytes={start}-{first_byte - 1}"}
                break
            r_headers[index] = {"Range": f"bytes={start}-{end - 1}"}
            index += 1
            start = end
        for value in r_headers.values():
            value.update(headers)
        return r_headers

    async def __download_one(self, session, method, url, r_headers, **kwargs):
        """Fetch one chunk and return ``{index: bytes}``.

        *r_headers* is an ``(index, headers)`` pair whose headers carry a
        ``Range: bytes=start-end`` entry. The request is attempted up to
        ``_CHUNK_ATTEMPTS`` times and the last error is re-raised.

        ``retrying.retry`` is not used: applied to an ``async def`` it only
        wraps creation of the coroutine, so errors raised while awaiting were
        never retried.
        """
        index, headers = r_headers
        start, end = headers["Range"].split("=", 1)[1].split("-", 1)
        expected = int(end) - int(start) + 1
        for attempt in range(1, self._CHUNK_ATTEMPTS + 1):
            try:
                async with self.__sema:
                    async with session.request(method, url, headers=headers, **kwargs) as response:
                        response.raise_for_status()
                        binary = await response.content.read()
                if len(binary) != expected:
                    # E.g. a server that ignored Range and sent the whole file,
                    # or a truncated body: either would corrupt the output.
                    raise ValueError(
                        f"chunk {index}: expected {expected} bytes, got {len(binary)} "
                        f"(HTTP {response.status})"
                    )
                return {index: binary}
            except (aiohttp.ClientError, asyncio.TimeoutError, ValueError):
                if attempt == self._CHUNK_ATTEMPTS:
                    raise

    async def __async_section_download(self, session, method, url, r_headers, **kwargs):
        """Start one task per chunk and wait for all; returns ``(done, pending)``."""
        tasks = [
            asyncio.create_task(self.__download_one(session, method, url, item, **kwargs))
            for item in r_headers.items()
        ]
        return await asyncio.wait(tasks)

    @classmethod
    async def __get_content_length(cls, session, method, url, headers, **kwargs):
        """Return the response's Content-Length header value (a str), or 0 if absent.

        The body is not read. aiohttp response headers are case-insensitive.
        """
        async with session.request(method, url, headers=headers, **kwargs) as response:
            return response.headers.get('Content-Length') or 0

    @classmethod
    async def __sync_download(cls, session, method, url, headers, file_path, **kwargs):
        """Download the whole body in one request and overwrite *file_path*.

        Raises ``aiohttp.ClientResponseError`` on a non-2xx status, so an error
        page is never saved as the file. The body is read before the file is
        opened, so a failed read does not truncate an existing file.
        """
        async with session.request(method, url, headers=headers, **kwargs) as response:
            response.raise_for_status()
            binary = await response.content.read()
        with open(file_path, 'wb') as f:
            f.write(binary)

    async def __async_download_main(self, method, url, headers, file_path, **kwargs):
        """Download *url* to *file_path* and hand it off to MinIO and MongoDB.

        A partially downloaded file is resumed by requesting only the missing
        byte ranges. Without a usable Content-Length the body is fetched in one
        request.

        :return: ``(True, file_path)`` once uploaded, flagged and deleted
            locally; ``(False, file_path)`` otherwise (the local file is kept).
        """
        file_name, file_size = self.__init_check(file_path)
        # Semaphore must belong to the running event loop.
        self.__sema = asyncio.Semaphore(self.__sema_number)
        async with aiohttp.ClientSession() as session:
            raw_length = await self.__get_content_length(session, method, url, headers, **kwargs)
            raw_length = str(raw_length)
            content_length = int(raw_length) if raw_length.isdecimal() else 0
            if content_length > 0:
                if file_size < content_length:
                    r_headers = self.__generate_headers(headers, file_size, content_length)
                    results = await self.__async_section_download(session, method, url, r_headers, **kwargs)
                    self.__sync_save_local(r_headers, results, file_path)
            else:
                await self.__sync_download(session, method, url, headers, file_path, **kwargs)
        if content_length > 0 and os.path.getsize(file_path) < content_length:
            return False, file_path
        return await self.__finish(file_path, file_name)

    async def __finish(self, file_path, file_name):
        """Upload, flag in MongoDB and delete the local file, but only if the upload succeeded."""
        if not await self.__upload_to_minio(file_path, file_name):
            return False, file_path
        await self.__update_mongo_status(file_name, True)
        os.remove(file_path)
        return True, file_path

    async def __get_task_from_redis(self):
        """Pop the next raw task from the ``file_file_all`` list (None when empty)."""
        async with aioredis.from_url(self.redis_address, password=self.redis_password) as redis:
            return await redis.lpop('file_file_all')

    @staticmethod
    def __normalize_url(url):
        """Keep only the last embedded ``http://``/``https://`` URL in *url*.

        Only applied to links starting with ``http``. Unlike splitting on the
        bare substring "http", a path such as ``/httpdocs/`` is left intact.
        NOTE(review): presumably meant to strip a wrapper prefix in front of
        the real link; confirm against the task producer.
        """
        if not url.startswith('http'):
            return url
        cut = max(url.rfind('http://'), url.rfind('https://'))
        return url[cut:] if cut > 0 else url

    async def __process_redis_tasks(self):
        """Process tasks until the Redis list is empty.

        A failing task is logged with its traceback and skipped.
        """
        method = 'get'
        headers = {
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.93 Safari/537.36'
        }
        while True:
            task_info = await self.__get_task_from_redis()
            if task_info is None:
                break
            try:
                # Inside the try: one malformed task must not stop the worker.
                task = json.loads(task_info)
                url = self.__normalize_url(task["file_link"])
                file_path = './{}'.format(task["file_name"])
                ok, _ = await self.__async_download_main(method, url, headers, file_path)
                if not ok:
                    loguru.logger.warning("Download not completed, local file kept: {}", file_path)
            except Exception:
                loguru.logger.exception("Error processing Redis task: {}", task_info)

    async def __upload_to_minio(self, file_path, object_name):
        """Upload *file_path* to the MinIO bucket as *object_name*.

        Returns True on success and False on a logged FileNotFoundError or S3Error.
        NOTE: the minio client is synchronous, so this blocks the event loop
        while uploading.
        """
        try:
            minio_client = Minio(**self.minio_config)
            if not minio_client.bucket_exists("******"):
                minio_client.make_bucket("******")
            loguru.logger.info("start upload file to MinIO")
            minio_client.fput_object(bucket_name="******", object_name=object_name, file_path=file_path)
            loguru.logger.info("file {0} is successfully uploaded to MinIO".format(object_name))
            return True
        except FileNotFoundError as err:
            loguru.logger.info('*' * 10)
            loguru.logger.error('MinIO upload failed: ' + str(err))
        except S3Error as err:
            loguru.logger.error("MinIO upload failed: {}", err)
        return False

    async def __update_mongo_status(self, file_name, status):
        """Set ``status`` on the MongoDB document whose ``file_name`` matches.

        Failures are logged, not raised. The client is always closed.
        """
        client = None
        try:
            client = AsyncIOMotorClient(self.mongo_config["uri"])
            collection = client.get_database(self.mongo_config["db"]).get_collection(
                self.mongo_config["collection"]
            )
            result = await collection.update_one({"file_name": file_name}, {"$set": {"status": status}})
            if result.matched_count == 0:
                loguru.logger.warning("MongoDB update matched no document for {}", file_name)
        except Exception as e:
            loguru.logger.error("MongoDB update failed: {}", e)
        finally:
            if client is not None:
                client.close()

    async def start(self):
        """Run the worker until the Redis task list is drained."""
        await self.__process_redis_tasks()


# Module-level sink: records at DEBUG and above also go to a rotating log file.
loguru.logger.add("download_file_output.log", rotation="500 MB", level="DEBUG")
if __name__ == '__main__':
    # Drain the Redis task list, allowing up to 20 concurrent chunk requests.
    downloader = Async_download(sema_number=20)
    asyncio.run(downloader.start())

部分代码来源于y小白的笔记

http://www.lryc.cn/news/283574.html

相关文章:

  • 《吐血整理》进阶系列教程-拿捏Fiddler抓包教程(10)-Fiddler如何设置捕获Firefox浏览器的Https会话
  • 阿里云云原生弹性方案:用弹性解决集群资源利用率难题
  • Spring-BeanPostProcessor PostConstruct init InitializingBean 执行顺序
  • 【算法】递归
  • DC-1靶机刷题记录
  • rust跟我学七:获取外网IP地址
  • 华为:交换机忘记console密码重置
  • 2024年甘肃省职业院校技能大赛信息安全管理与评估 样题三 模块一
  • Go 中 slice 的 In 功能实现探索
  • pyDAL一个python的ORM(终) pyDAL的一些性能优化
  • springboot log4j配置xml实例说明
  • VsCode重新安装需要配机的ESLint和 Prettier - Code formatter 配置
  • 录屏功能怎么打开?简单操作,一学就会!
  • 小程序显示兼容处理,home键处理
  • 【java八股文】之JVM基础篇
  • 2024美赛数学建模思路 - 案例:异常检测
  • 【EI会议征稿通知】2024年通信技术与软件工程国际学术会议 (CTSE 2024)
  • Js面试之作用域与闭包
  • Go 爬虫之 colly 从入门到不放弃指南
  • Ceph分布式存储(1)
  • 制造业工厂为什么要实施MES系统呢?
  • Python 一行命令部署http、ftp服务
  • DBA技术栈(三):MySQL 性能影响因素
  • SpringCloud GateWay 在全局过滤器中注入OpenFeign网关后无法启动
  • web前端项目-贪吃蛇小游戏【附源码】
  • ICCV2023 | PTUnifier+:通过Soft Prompts(软提示)统一医学视觉语言预训练
  • 代码随想录 Leetcode459. 重复的子字符串(KMP算法)
  • Rust之构建命令行程序(三):重构改进模块化和错误处理
  • 广和通AI解决方案“智”赋室外机器人迈向新天地!
  • C++I/O流——(4)格式化输入/输出(第二节)