当前位置: 首页 > news >正文

aws boto3 下载文件

起因:有下载 aws s3 需求,但只有web 登录账号,有 id 用户名 密码,没有 boto3 的 key ID

经过分析,发现网页版有个地址会返回临时 keyID,playwright 模拟登录,用 page.on 监测返回数据,获取 keyID 后再使用 boto3 抓取相关文件,比构造网页请求方便快捷

import os, json, urllib, base64
import time, re
from datetime import datetime
from playwright.sync_api import Playwright, sync_playwright, expect
from bs4 import BeautifulSoup
from functools import wrapsproxy = 'http://username:password@192.192.14.32:3128'
proxies = {'http': proxy,'https': proxy
}# 缓存目录
CACHE_DIR = (r'D:\code\aws_s3\cache')# 确保缓存目录存在
os.makedirs(CACHE_DIR, exist_ok=True)def timethis(func):'''Decorator that reports the execution time:param func::return:'''@wraps(func)def wrapper(*args, **kwargs):start = time.time()s1 = datetime.now()result = func(*args, **kwargs)end = time.time()s2 = datetime.now()func_name = func.__name__consume = end - startconsume2 = s2 - s1print(f'{func_name} consume time is ---> {consume}')print(f'{func_name} consume minutes is ---> {consume2}')return resultreturn wrapperdef handle_route(route):# 获取请求的 URLurl = route.request.urlresource_type = route.request.resource_typeurl = route.request.urlresource_type = route.request.resource_typeblock_list = [# 'telemetry', "browserCreds", 'module-utils.js',#             'svg', 'gif', 'image',#           'module', 'panoramaroute', 'log', 'tele', 'index', 'util', 'css']if any(x in url for x in block_list):# print(f"---: {url} (包含 'dist')")route.abort()  # 中止该请求return# print(f"处理请求: {url} ({resource_type})")# 生成对应的缓存文件名# 使用安全的 URL 名称file_name = url.replace("https://", "").replace("http://", "").replace("/", "_").replace(":", "_") + ".json"cache_file = os.path.join(CACHE_DIR, file_name)# 检查缓存文件是否存在if os.path.exists(cache_file):# print(f"从缓存加载: {url}")# 从缓存文件加载数据try:with open(cache_file, 'r') as f:cached_response = json.load(f)# 模拟返回缓存的响应route.fulfill(status=cached_response['status'],headers=cached_response['headers'],body=base64.b64decode(cached_response['body'])  # 解码 body)except:passelse:# 继续请求并缓存响应route.continue_()def log_response(response):url = response.urlresource_type = response.request.resource_type# 仅缓存 CSS、JS 和图片文件if resource_type in ['script', 'stylesheet', 'image']:file_name = url.replace("https://", "").replace("http://", "").replace("/", "_").replace(":", "_") + ".json"cache_file = os.path.join(CACHE_DIR, file_name)# 只有在成功状态时才缓存响应if response.status == 200:try:response_body = {'status': response.status,'headers': dict(response.headers),'body': base64.b64encode(response.body()).decode('utf-8')  # 确保调用 body() 方法获取字节}# 将响应写入缓存文件with open(cache_file, 'w') as f:json.dump(response_body, f)# print(f"缓存资源: {url}")except Exception as e:# print('cache error', url)pass
requests_info = {}def log_request(request):# 记录请求的开始时间requests_info[request.url] = {'start_time': time.time()  # 记录当前时间(开始时间)}def on_response(response, response_data):# 检查响应的 URLif 's3/tb/creds' in response.url and response.status == 200:# 解析响应数据并存储到 response_data 中boto3 = response.json()print('boto3', boto3)response_data.append(response.json())# 使用已保存的状态文件跳过登录状态直接访问系统
@timethis
def get_boto3_token():with sync_playwright() as playwright:browser = playwright.chromium.launch(headless=True,proxy={# 'server': 'http://username:password@192.192.13.193:3128','server': 'http://username:password@192.192.14.32:3128',# 'server': 'http://username:password@10.67.9.200:3128',# 'server': 'http://192.192.163.177:5003',"username": "username","password": "password"})# 创建浏览器上下文时加载状态文件context = browser.new_context()page = context.new_page()should_abort = False# 定义一个列表来存储响应数据response_data = []def handle_route(route):nonlocal should_abort# 检查当前页面是否包含 "open"if should_abort or response_data:print("检测到 'open',停止加载其他内容。")route.abort()  # 中止该请求else:route.continue_()  # 继续请求# 注册请求拦截事件# page.on("route", handle_route)# 直接访问登录后的URLurl = 'https://us-west-2.console.aws.amazon.com/s3/buckets/bs?prefix=RESPONSE/'# 注册请求和响应事件page.on("response", log_response)# page.on("route", handle_route)page.route("*", handle_route)page.goto(url, timeout=30000 * 3)# 屏蔽这一段就正常了# if page.locator("input[id=\"root_user_radio_button\"]"):#     print('find')#     page.locator("input[id=\"iam_user_radio_button\"]").click()#     page.locator("input[id=\"resolving_input\"]").fill("1111111")#     page.locator("button[id=\"next_button\"]").click()if page.locator("input[id=\"account\"]"):print('find')page.locator("input[id=\"account\"]").click()page.locator("input[id=\"account\"]").fill("1111111")# page.locator("button[id=\"next_button\"]").click()print('input username')while True:try:page.locator("input[name=\"username\"]").fill("username")page.locator("input[name=\"password\"]").fill("password")page.locator("#signin_button").click()print('break-->')breakexcept:print(datetime.now(), 'error-->')time.sleep(2)print('wait 6 senconds')time.sleep(2)cookies = page.context.cookies()print('cookie', cookies)url = 'https://us-west-2.console.aws.amazon.com/s3/buckets/bs-tai?region=us-west-2&bucketType=general&prefix=RESPONSE/2023/&showversions=false'# 注册请求和响应事件# 注册响应事件处理函数page.on("response", lambda response: on_response(response, response_data))page.goto(url, timeout=30000 * 3)print('page on response')while True:try:cookies = page.context.cookies()breakexcept:time.sleep(2)print('sleep 2 seconds')soup = BeautifulSoup(page.content(), 'lxml')meta_tag = soup.find('meta', {'name': 'tb-data'})# 提取 content 属性的值tb_data = meta_tag.get('content')# 将 JSON 字符串转换为 Python 字典tb_data_dict = json.loads(tb_data)# 提取 CSRF 令牌xsrf_token = tb_data_dict['csrfToken']print('xsrf token', xsrf_token)print('response_data',response_data)# if not response_data:#     get_boto3_token()# else:#     print('return boto3 token')# page.close()# browser.close()# playwright.stop()return response_data[0]if __name__ == '__main__':get_boto3_token()pass
boto3_token = get_boto3_token()info = boto3_tokenprint(arrow.now())print('boto3_token-->', type(boto3_token), boto3_token)id = info.get("accessKeyId")key = info.get("secretAccessKey")aws_session_token = info.get("sessionToken")session = Session(aws_access_key_id=id, aws_secret_access_key=key, aws_session_token=aws_session_token)# session = Session(aws_access_key_id=id, aws_secret_access_key=key,aws_session_token=aws_session_token)# 获取s3连接的session##bucket = 'bs-tai'client_s3 = session.client('s3', config=Config(proxies=proxies))s3 = session.resource('s3', config=Config(proxies=proxies)).Bucket('bs-tai')def get_prefix_for_months(months_shift=0):arrow_month = arrow.now().shift(months=months_shift)year = arrow_month.format('YYYY')month = arrow_month.format('MM')return f'conn/RESPONSE/{year}/{month}/'# 获取上一个月和当前月的前缀prefix_last_month = get_prefix_for_months(months_shift=-1)prefix_this_month = get_prefix_for_months(months_shift=0)# 组合前缀到列表prefix_list = [prefix_last_month, prefix_this_month]for prefix in prefix_list:for obj in s3.objects.filter(Prefix=prefix):# print(obj.key)if obj.key.endswith('.csv'):file_path = obj.key# 使用字符串分割来提取年月日parts = file_path.split('/')year = parts[2]  # 第四部分是年份month = parts[3]  # 第五部分是月份day = parts[4]  # 第六部分是日期# print(year, month, day)key = obj.keylocal_filename = key.split('/')[-1]local_file_path = os.path.join(public_share_path, f'{year}{month}{day}', local_filename)if not os.path.exists(local_file_path):local_file_dir = os.path.dirname(local_file_path)os.makedirs(local_file_dir, exist_ok=True)client_s3.download_file(bucket, key, local_file_path)print(f'Downloaded {local_file_path}')read_csv(local_file_path, day=f'{year}{month}{day}')export_result_source(day=f'{year}{month}{day}')

参考
https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html

https://cuiqingcai.com/36045.html

https://www.cnblogs.com/neozheng/p/13563841.html

https://stackoverflow.com/questions/35803027/retrieving-subfolders-names-in-s3-bucket-from-b-boto3

https://stackoverflow.com/questions/35803027/retrieving-subfolders-names-in-s3-bucket-from-b-boto3

https://stackoverflow.com/questions/29378763/how-to-save-s3-object-to-a-file-using-boto3

http://www.lryc.cn/news/475491.html

相关文章:

  • 3DDFA-V3——基于人脸分割几何信息指导下的三维人脸重建
  • 求串长(不使用任何字符串库函数)
  • 第02章 MySQL环境搭建
  • linux系统编程 man查看manual.stat
  • 从网络到缓存:在Android中高效管理图片加载
  • 【数据结构】链表详解:数据节点的链接原理
  • 使用AWS Redshift从AWS MSK中读取数据
  • 从0开始学统计-数据类别与测量层次
  • 使用AIM对SAP PO核心指标的自动化巡检监控
  • C++——unordered_map和unordered_set的封装
  • 微信小程序scroll-view吸顶css样式化表格的表头及iOS上下滑动表头的颜色覆盖、z-index应用及性能分析
  • 【高中数学】数列
  • 数字媒体技术基础:AMF(ACES 元数据文件 )
  • Apache Dubbo (RPC框架)
  • LeetCode 3226. 使两个整数相等的位更改次数
  • 面试经典 150 题:189、383
  • Python模拟真人动态生成鼠标滑动路径
  • 如何压缩pdf文件的大小?5分钟压缩pdf的方法推荐
  • 【SQL】[2BP01] ERROR: cannot drop table course because other objects depend on it
  • gbase8s之spring框架用druid中间件报语法错误
  • 【网络安全】|nessus使用
  • CSRA2的LINUX操作系统24年11月2日上午上课笔记
  • 通过分解质因数求若干个数的最小公倍数
  • 数据库三范式(1NF、2NF、3NF)
  • C语言_数据结构_顺序表
  • Llama 3.2 Vision Molmo:多模态开源生态系统基础
  • 【数据结构与算法】第6课—数据结构之栈
  • 开源全站第一个Nextron(NextJS+electron)项目--NextTalk:一款集成chatgpt的实时聊天工具
  • 多样化的编程模型:并发与并行策略
  • npm入门教程2:npm历史