当前位置: 首页 > news >正文

[量化交易](1获取加密货币的交易数据)

使用rest接口调用历史数据

  • 环境准备的阶段
  • 初始化数据获取器
  • 获取历史数据的时候
  • 🚀 加密货币合约数据获取与保存完整教程
    • 📋 主要流程概览
      • 1️⃣ **环境准备阶段**
      • 2️⃣ **初始化数据获取器**
      • 3️⃣ **核心数据获取流程**
        • **步骤A: 参数验证**
        • **步骤B: API数据请求**
        • **步骤C: 数据处理与转换**
        • **步骤D: 数据保存**
    • 🔧 关键参数说明
      • **必需参数**
      • **可选参数**
    • 📊 数据结构说明
      • **输出的CSV文件包含以下列:**
    • 💡 实际使用示例
      • **示例1: 获取单个合约数据**
      • **示例2: 批量获取多个合约**
      • **示例3: 获取历史特定时间段数据**
    • ⚡ 性能优化建议
    • 📁 文件组织结构
    • 🎯 博客教程建议结构
  • 优化版本的代码

获取历史数据不需要使用api密钥。

环境准备的阶段

# 必需的依赖库
import os
import time
import pandas as pd
from datetime import datetime, timedelta
from binance.client import Client

初始化数据获取器

fetcher = BatchKlineFetcher()
# 可选参数:api_key, api_secret, use_testnet
# 开发测试阶段
client = BinanceAPIClient(api_key='test_key',api_secret='test_secret', use_testnet=True  # 使用测试网络
)# 生产部署阶段
client = BinanceAPIClient(api_key='real_key',api_secret='real_secret',use_testnet=False  # 使用主网络
)
  • Testnet(测试网络)

    • 用途: 开发和测试环境
    • 资金: 使用虚拟资金,不是真实货币
    • 风险: 零风险,可以随意测试
    • 数据: 模拟真实市场数据,但不影响真实账户
  • Mainnet(主网络)

    • 用途: 生产环境,真实交易
    • 资金: 真实的加密货币资金
    • 风险: 真实的资金风险
    • 数据: 真实市场数据和交易
      对于历史数据的研究没有区别。

获取历史数据的时候

相同点

主要的历史K线数据(价格、成交量等)在两个网络上基本一致
数据结构和格式完全相同
获取速度和稳定性差不多

细微差别

数据完整性: 主网的历史数据更完整,覆盖时间更长
数据精度: 主网数据是真实交易数据,测试网可能有微小的模拟差异
API限制: 速率限制可能略有不同

# 对于纯历史数据分析,可以这样设置
client = BinanceAPIClient(api_key='',  # 可以为空api_secret='',  # 可以为空  use_testnet=False  # 建议用主网获取最完整的历史数据
)# 获取历史K线不需要API密钥权限
klines = client.get_historical_klines('BTCUSDT', '1m', start_time, end_time)

如获取某个交易对1年的数据

# 一年前的时间
end_time = datetime.now()
start_time = end_time - timedelta(days=365)klines = client.get_historical_klines(symbol='BTCUSDT', interval='1m',start_time=start_time.strftime('%Y-%m-%d %H:%M:%S'),end_time=end_time.strftime('%Y-%m-%d %H:%M:%S')
)

对应的类文件和函数

class BinanceAPIClient:"""Binance API客户端类负责与Binance API的所有交互"""def __init__(self, api_key='', api_secret='', use_testnet=False):"""初始化API客户端参数:- api_key: Binance API密钥- api_secret: Binance API密钥- use_testnet: 是否使用测试网络"""self.api_key = api_keyself.api_secret = api_secretself.use_testnet = use_testnet# 初始化Binance客户端self.client = Client(api_key, api_secret, testnet=use_testnet)# 配置参数self.config = API_CONFIGdef get_historical_klines(self, symbol, interval, start_time, end_time, limit=1000):"""获取历史K线数据,带有重试机制和时区处理参数:- symbol: 交易对符号 (如 'BTCUSDT')- interval: 时间间隔 (如 '1h', '1d')- start_time: 开始时间 (字符串格式)- end_time: 结束时间 (字符串格式)- limit: 每次请求的最大数量返回:- K线数据列表"""all_klines = []current_start_time = start_timeretry_count = 0while retry_count < self.config["max_retries"]:try:# 添加小延迟避免频率限制time.sleep(self.config["request_delay"])while True:# 请求历史K线数据,每次最多1000条klines = self.client.get_historical_klines(symbol, interval, current_start_time, end_time, limit=limit)if not klines:logger.warning(f"没有获取到K线数据,尝试调整时间范围")breakall_klines.extend(klines)logger.info(f"获取了 {len(klines)} 条K线数据")# 如果返回的数据少于limit,说明已经获取完所有数据if len(klines) < limit:break# 更新开始时间为最后一根K线的收盘时间 + 1mslast_close_time = klines[-1][6]current_start_time = str(int(last_close_time) + 1)# 如果新的开始时间超过结束时间,跳出循环if current_start_time >= end_time:break# 如果到这里没有异常,跳出重试循环breakexcept Exception as e:retry_count += 1logger.error(f"获取K线数据时出错 (尝试 {retry_count}/{self.config['max_retries']}): {e}")if retry_count >= self.config["max_retries"]:logger.error("达到最大重试次数,放弃获取数据")breaktime.sleep(self.config["retry_delay"])  # 等待后重试return all_klines

🚀 加密货币合约数据获取与保存完整教程

梳理完整的数据获取流程。

📋 主要流程概览

1️⃣ 环境准备阶段

# 必需的依赖库
import os
import time
import pandas as pd
from datetime import datetime, timedelta
from binance.client import Client

2️⃣ 初始化数据获取器

fetcher = BatchKlineFetcher()
# 可选参数:api_key, api_secret, use_testnet

3️⃣ 核心数据获取流程

步骤A: 参数验证
  • 验证交易对符号(如 ‘BTCUSDT’)
  • 验证时间间隔(支持:1m, 3m, 5m, 15m, 30m, 1h, 2h, 4h, 1d)
  • 计算时间范围(开始时间 ← 结束时间 - 天数)
步骤B: API数据请求
  • 调用 Binance API 获取历史K线数据
  • 自动分批处理(每次最多1000条记录)
  • 内置重试机制和频率限制保护
步骤C: 数据处理与转换
  • 原始数据转换为 pandas DataFrame
  • 数据类型转换(价格、成交量等转为数值型)
  • 时间戳转换(毫秒 → 标准时间格式)
  • 设置时间戳为索引
步骤D: 数据保存
  • 自动生成文件名:{交易对}_{时间间隔}_{天数}days_{时间戳}.csv
  • 保存到指定目录:batch_kline_data/
  • CSV格式,包含完整的K线数据

🔧 关键参数说明

必需参数

参数类型说明示例
symbolstring交易对符号‘BTCUSDT’, ‘ETHUSDT’
intervalstring时间间隔‘1m’, ‘5m’, ‘1h’, ‘1d’
days_backint向前追溯天数7, 30, 365

可选参数

参数类型默认值说明
end_datestring/NoneNone结束日期,默认当前时间
save_to_fileboolTrue是否保存到文件
api_keystring‘’Binance API密钥(公开数据不需要)
api_secretstring‘’Binance API密钥

📊 数据结构说明

输出的CSV文件包含以下列:

  • timestamp: 时间戳(索引)
  • open: 开盘价
  • high: 最高价
  • low: 最低价
  • close: 收盘价
  • volume: 成交量
  • close_time: 收盘时间
  • quote_asset_volume: 计价资产成交量
  • number_of_trades: 成交笔数
  • taker_buy_base_asset_volume: 主动买入成交量
  • taker_buy_quote_asset_volume: 主动买入计价资产成交量

💡 实际使用示例

示例1: 获取单个合约数据

# 获取BTCUSDT最近7天的1分钟数据
df = fetcher.fetch_kline_data('BTCUSDT', '1m', 7)
print(f"获取了 {len(df)} 条记录")

示例2: 批量获取多个合约

# 批量获取多个交易对的5分钟数据
symbols = ['BTCUSDT', 'ETHUSDT', 'BNBUSDT']
results = fetcher.fetch_multiple_symbols(symbols, '5m', 3)

示例3: 获取历史特定时间段数据

# 获取2024年1月的数据
df = fetcher.fetch_kline_data('BTCUSDT', '1h', 31, end_date='2024-02-01')

⚡ 性能优化建议

  1. 时间间隔选择:分钟级数据建议不超过30天,小时级可获取更长时间
  2. 批量处理:使用 fetch_multiple_symbols() 批量获取多个交易对
  3. 内存管理:大数据集建议分批处理,避免内存溢出
  4. API限制:内置延迟机制,避免触发API频率限制

📁 文件组织结构

项目目录/
├── batch_kline_fetcher.py    # 主要获取器
├── api_client.py             # API客户端
├── config.py                 # 配置文件
└── batch_kline_data/         # 数据保存目录├── BTCUSDT_1m_7days_20250813_211425.csv├── ETHUSDT_5m_3days_20250813_211429.csv└── ...

🎯 博客教程建议结构

  1. 引言:为什么需要获取加密货币数据
  2. 环境搭建:Python环境和依赖安装
  3. 核心概念:K线数据、时间间隔、交易对
  4. 代码实现:逐步讲解每个函数
  5. 实战案例:3-5个不同场景的使用示例
  6. 数据分析:如何使用获取的数据进行分析
  7. 注意事项:API限制、错误处理、性能优化
  8. 扩展应用:结合技术指标、可视化等

优化版本的代码

因为原来的版本,我们使用的时候,是单线程,对于较长的时间段获取,时间会很长。使用多线程,自动切片任务。就可以加快获取数据的速度。


# -*- coding: utf-8 -*-
"""
优化版API客户端模块支持多线程并发获取和智能时间切片,大幅提升长时间段数据获取速度
"""import os
import time
import logging
import threading
import pandas as pd
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from binance.client import Client
from config import API_CONFIG, TIMEFRAMES, KLINE_COLUMNS
from queue import Queue
import mathlogger = logging.getLogger(__name__)class OptimizedBinanceAPIClient:"""优化版Binance API客户端类支持多线程并发获取和智能时间切片,提升长时间段数据获取效率"""def __init__(self, api_key='', api_secret='', use_testnet=False, max_workers=5):"""初始化优化版API客户端参数:- api_key: Binance API密钥- api_secret: Binance API密钥- use_testnet: 是否使用测试网络- max_workers: 最大并发线程数"""self.api_key = api_keyself.api_secret = api_secretself.use_testnet = use_testnetself.max_workers = max_workers# 初始化Binance客户端self.client = Client(api_key, api_secret, testnet=use_testnet)# 配置参数self.config = API_CONFIG.copy()# 优化并发配置self.config["concurrent_request_delay"] = 0.2  # 并发请求间隔更短self.config["chunk_size_hours"] = 24  # 每个时间切片的小时数# 线程锁self._lock = threading.Lock()self._request_times = Queue()def _rate_limit_check(self):"""检查API请求频率限制"""current_time = time.time()# 清理1分钟前的请求记录while not self._request_times.empty():if current_time - self._request_times.queue[0] > 60:self._request_times.get()else:break# 如果1分钟内请求过多,等待if self._request_times.qsize() >= 1000:  # Binance限制sleep_time = 60 - (current_time - self._request_times.queue[0])if sleep_time > 0:logger.warning(f"API频率限制,等待 {sleep_time:.2f} 秒")time.sleep(sleep_time)self._request_times.put(current_time)def _calculate_time_chunks(self, start_time, end_time, interval):"""计算时间切片参数:- start_time: 开始时间戳(毫秒)- end_time: 结束时间戳(毫秒)- interval: 时间间隔返回:- list: 时间切片列表 [(start, end), ...]"""start_ms = int(start_time)end_ms = int(end_time)# 根据时间间隔调整切片大小interval_minutes = {'1m': 1, '3m': 3, '5m': 5, '15m': 15, '30m': 30,'1h': 60, '2h': 120, '4h': 240, '1d': 1440}minutes_per_interval = interval_minutes.get(interval, 60)# 动态调整切片大小:确保每个切片不超过1000条记录max_records_per_chunk = 1000chunk_minutes = min(self.config["chunk_size_hours"] * 60,max_records_per_chunk * minutes_per_interval)chunk_ms = chunk_minutes * 60 * 1000  # 转换为毫秒chunks = []current_start = start_mswhile current_start < end_ms:current_end = min(current_start + chunk_ms, end_ms)chunks.append((str(current_start), str(current_end)))current_start = current_end + 1  # 避免重复logger.info(f"时间范围切分为 {len(chunks)} 个切片,每切片约 {chunk_minutes/60:.1f} 小时")return chunksdef _fetch_chunk_data(self, symbol, interval, start_time, end_time, chunk_index):"""获取单个时间切片的数据参数:- symbol: 交易对符号- interval: 时间间隔- start_time: 开始时间戳- end_time: 结束时间戳- chunk_index: 切片索引返回:- tuple: (chunk_index, klines_data)"""retry_count = 0while retry_count < self.config["max_retries"]:try:# 频率限制检查with self._lock:self._rate_limit_check()time.sleep(self.config["concurrent_request_delay"])# 获取数据klines = self.client.get_historical_klines(symbol, interval, start_time, end_time, limit=1000)logger.info(f"切片 {chunk_index}: 获取了 {len(klines)} 条记录")return (chunk_index, klines)except Exception as e:retry_count += 1logger.error(f"切片 {chunk_index} 获取失败 (尝试 {retry_count}/{self.config['max_retries']}): {e}")if retry_count >= self.config["max_retries"]:logger.error(f"切片 {chunk_index} 达到最大重试次数")return (chunk_index, [])time.sleep(self.config["retry_delay"])return (chunk_index, [])def get_historical_klines_optimized(self, symbol, interval, start_time, end_time, use_threading=True):"""优化版历史K线数据获取,支持多线程并发参数:- symbol: 交易对符号 (如 'BTCUSDT')- interval: 时间间隔 (如 '1h', '1d')- start_time: 开始时间 (字符串格式)- end_time: 结束时间 (字符串格式)- use_threading: 是否使用多线程返回:- K线数据列表"""logger.info(f"开始优化获取 {symbol} {interval} 数据")start_fetch_time = time.time()# 计算时间切片chunks = self._calculate_time_chunks(start_time, end_time, interval)if not use_threading or len(chunks) == 1:# 单线程模式logger.info("使用单线程模式")all_klines = []for i, (chunk_start, chunk_end) in enumerate(chunks):_, klines = self._fetch_chunk_data(symbol, interval, chunk_start, chunk_end, i)all_klines.extend(klines)else:# 多线程模式logger.info(f"使用多线程模式,{self.max_workers} 个工作线程")all_klines = []chunk_results = {}with ThreadPoolExecutor(max_workers=self.max_workers) as executor:# 提交所有任务future_to_chunk = {executor.submit(self._fetch_chunk_data, symbol, interval, chunk_start, chunk_end, i): ifor i, (chunk_start, chunk_end) in enumerate(chunks)}# 收集结果for future in as_completed(future_to_chunk):chunk_index, klines = future.result()chunk_results[chunk_index] = klineslogger.info(f"完成切片 {chunk_index}/{len(chunks)-1}")# 按顺序合并结果for i in range(len(chunks)):if i in chunk_results:all_klines.extend(chunk_results[i])else:logger.warning(f"切片 {i} 数据缺失")fetch_duration = time.time() - start_fetch_timelogger.info(f"优化获取完成,共 {len(all_klines)} 条记录,耗时 {fetch_duration:.2f} 秒")return all_klinesdef get_historical_klines_with_progress(self, symbol, interval, start_time, end_time, progress_callback=None, save_intermediate=False, output_file=None):"""带进度回调和中间保存的历史K线数据获取参数:- symbol: 交易对符号- interval: 时间间隔- start_time: 开始时间- end_time: 结束时间- progress_callback: 进度回调函数 callback(current, total, chunk_data)- save_intermediate: 是否保存中间结果- output_file: 输出文件路径返回:- K线数据列表"""logger.info(f"开始带进度获取 {symbol} {interval} 数据")chunks = self._calculate_time_chunks(start_time, end_time, interval)all_klines = []# 如果需要保存中间结果,准备文件if save_intermediate and output_file:# 创建临时文件头temp_df = pd.DataFrame(columns=KLINE_COLUMNS)temp_df.to_csv(output_file, index=False)logger.info(f"创建输出文件: {output_file}")for i, (chunk_start, chunk_end) in enumerate(chunks):logger.info(f"处理切片 {i+1}/{len(chunks)}")_, klines = self._fetch_chunk_data(symbol, interval, chunk_start, chunk_end, i)if klines:all_klines.extend(klines)# 保存中间结果if save_intermediate and output_file:chunk_df = pd.DataFrame(klines, columns=KLINE_COLUMNS)chunk_df.to_csv(output_file, mode='a', header=False, index=False)logger.info(f"切片 {i+1} 数据已保存到文件")# 进度回调if progress_callback:progress_callback(i+1, len(chunks), klines)logger.info(f"带进度获取完成,共 {len(all_klines)} 条记录")return all_klinesdef estimate_fetch_time(self, symbol, interval, days_back):"""估算数据获取时间参数:- symbol: 交易对符号- interval: 时间间隔- days_back: 天数返回:- dict: 估算信息"""# 计算大概的记录数interval_minutes = {'1m': 1, '3m': 3, '5m': 5, '15m': 15, '30m': 30,'1h': 60, '2h': 120, '4h': 240, '1d': 1440}minutes_per_interval = interval_minutes.get(interval, 60)total_minutes = days_back * 24 * 60estimated_records = total_minutes // minutes_per_interval# 估算请求次数requests_needed = math.ceil(estimated_records / 1000)# 估算时间(考虑并发)single_thread_time = requests_needed * (self.config["request_delay"] + 0.5)  # 0.5s per requestmulti_thread_time = single_thread_time / min(self.max_workers, requests_needed)return {'estimated_records': estimated_records,'requests_needed': requests_needed,'single_thread_time_seconds': single_thread_time,'multi_thread_time_seconds': multi_thread_time,'speedup_ratio': single_thread_time / multi_thread_time if multi_thread_time > 0 else 1}def get_symbol_data_optimized(self, symbol, timeframe, days=7, use_threading=True, save_to_file=True, output_dir='optimized_data'):"""优化版交易对数据获取参数:- symbol: 交易对符号- timeframe: 时间框架- days: 天数- use_threading: 是否使用多线程- save_to_file: 是否保存到文件- output_dir: 输出目录返回:- DataFrame: 处理后的数据"""# 创建输出目录if save_to_file and not os.path.exists(output_dir):os.makedirs(output_dir)# 计算时间范围end_time = datetime.now()start_time = end_time - timedelta(days=days)start_timestamp = str(int(start_time.timestamp() * 1000))end_timestamp = str(int(end_time.timestamp() * 1000))# 估算获取时间estimate = self.estimate_fetch_time(symbol, timeframe, days)logger.info(f"估算: {estimate['estimated_records']} 条记录, "f"单线程 {estimate['single_thread_time_seconds']:.1f}s, "f"多线程 {estimate['multi_thread_time_seconds']:.1f}s, "f"加速比 {estimate['speedup_ratio']:.1f}x")# 获取数据klines = self.get_historical_klines_optimized(symbol, TIMEFRAMES[timeframe], start_timestamp, end_timestamp, use_threading)if not klines:logger.warning("未获取到任何数据")return pd.DataFrame()# 转换为DataFramedf = pd.DataFrame(klines, columns=KLINE_COLUMNS)# 数据类型转换numeric_columns = ['open', 'high', 'low', 'close', 'volume', 'quote_asset_volume', 'number_of_trades','taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume']for col in numeric_columns:df[col] = pd.to_numeric(df[col], errors='coerce')# 时间戳转换df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')df['close_time'] = pd.to_datetime(df['close_time'], unit='ms')# 设置时间戳为索引df.set_index('timestamp', inplace=True)# 保存到文件if save_to_file:timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')filename = f"{symbol}_{timeframe}_{days}days_optimized_{timestamp}.csv"filepath = os.path.join(output_dir, filename)df.to_csv(filepath)logger.info(f"优化数据已保存到: {filepath}")return dfdef compare_performance():"""性能对比测试"""from api_client import BinanceAPIClient# 创建客户端original_client = BinanceAPIClient()optimized_client = OptimizedBinanceAPIClient(max_workers=5)symbol = 'BTCUSDT'timeframe = '1m'days = 7print(f"\n性能对比测试: {symbol} {timeframe} {days}天数据")print("=" * 50)# 估算时间estimate = optimized_client.estimate_fetch_time(symbol, timeframe, days)print(f"估算记录数: {estimate['estimated_records']}")print(f"估算单线程时间: {estimate['single_thread_time_seconds']:.1f}秒")print(f"估算多线程时间: {estimate['multi_thread_time_seconds']:.1f}秒")print(f"预期加速比: {estimate['speedup_ratio']:.1f}x")# 测试优化版(多线程)print("\n测试优化版(多线程)...")start_time = time.time()df_optimized = optimized_client.get_symbol_data_optimized(symbol, timeframe, days, use_threading=True, save_to_file=False)optimized_time = time.time() - start_timeprint(f"优化版完成: {len(df_optimized)} 条记录,耗时 {optimized_time:.2f} 秒")# 测试优化版(单线程)print("\n测试优化版(单线程)...")start_time = time.time()df_single = optimized_client.get_symbol_data_optimized(symbol, timeframe, days, use_threading=False, save_to_file=False)single_time = time.time() - start_timeprint(f"单线程版完成: {len(df_single)} 条记录,耗时 {single_time:.2f} 秒")# 计算实际加速比if optimized_time > 0:actual_speedup = single_time / optimized_timeprint(f"\n实际加速比: {actual_speedup:.1f}x")print(f"效率提升: {(1 - optimized_time/single_time)*100:.1f}%")if __name__ == "__main__":# 运行性能对比compare_performance()

调用优化的apiclient,实现任务调度,可以取多个交易对,执行任务。


# -*- coding: utf-8 -*-
"""
批量数据管理器支持断点续传、数据验证、自动重试和智能调度的高级数据获取工具
"""import os
import json
import time
import logging
import hashlib
import pandas as pd
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from api_client_optimized import OptimizedBinanceAPIClient
from config import KLINE_COLUMNSlogger = logging.getLogger(__name__)class BatchDataManager:"""批量数据管理器提供高级数据获取功能:- 断点续传- 数据验证- 自动重试- 进度保存- 智能调度"""def __init__(self, output_dir='batch_data', max_workers=8, enable_resume=True):"""初始化批量数据管理器参数:- output_dir: 输出目录- max_workers: 最大工作线程数- enable_resume: 是否启用断点续传"""self.output_dir = Path(output_dir)self.output_dir.mkdir(exist_ok=True)self.max_workers = max_workersself.enable_resume = enable_resume# 创建子目录self.data_dir = self.output_dir / 'data'self.progress_dir = self.output_dir / 'progress'self.logs_dir = self.output_dir / 'logs'for dir_path in [self.data_dir, self.progress_dir, self.logs_dir]:dir_path.mkdir(exist_ok=True)# 初始化API客户端self.client = OptimizedBinanceAPIClient(max_workers=max_workers)# 任务状态self.tasks = {}self.completed_tasks = set()self.failed_tasks = set()# 配置日志self._setup_logging()def _setup_logging(self):"""设置日志配置"""log_file = self.logs_dir / f'batch_manager_{datetime.now().strftime("%Y%m%d")}.log'file_handler = logging.FileHandler(log_file, encoding='utf-8')file_handler.setLevel(logging.INFO)formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')file_handler.setFormatter(formatter)logger.addHandler(file_handler)logger.setLevel(logging.INFO)def _generate_task_id(self, symbol: str, interval: str, start_time: str, end_time: str) -> str:"""生成任务ID"""task_str = f"{symbol}_{interval}_{start_time}_{end_time}"return hashlib.md5(task_str.encode()).hexdigest()[:12]def _get_progress_file(self, task_id: str) -> Path:"""获取进度文件路径"""return self.progress_dir / f"{task_id}_progress.json"def _save_progress(self, task_id: str, progress_data: Dict):"""保存进度信息"""progress_file = self._get_progress_file(task_id)progress_data['last_update'] = datetime.now().isoformat()with open(progress_file, 'w', encoding='utf-8') as f:json.dump(progress_data, f, indent=2, ensure_ascii=False)def _load_progress(self, task_id: str) -> Optional[Dict]:"""加载进度信息"""progress_file = self._get_progress_file(task_id)if not progress_file.exists():return Nonetry:with open(progress_file, 'r', encoding='utf-8') as f:return json.load(f)except Exception as e:logger.error(f"加载进度文件失败: {e}")return Nonedef _validate_data_integrity(self, file_path: Path, expected_records: int = None) -> bool:"""验证数据完整性参数:- file_path: 数据文件路径- expected_records: 预期记录数返回:- bool: 数据是否完整"""try:if not file_path.exists():return False# 读取数据df = pd.read_csv(file_path)# 检查基本结构if df.empty:logger.warning(f"数据文件为空: {file_path}")return False# 检查必要列required_columns = ['timestamp', 'open', 'high', 'low', 'close', 'volume']missing_columns = [col for col in required_columns if col not in df.columns]if missing_columns:logger.error(f"数据文件缺少必要列 {missing_columns}: {file_path}")return False# 检查记录数if expected_records and len(df) < expected_records * 0.95:  # 允许5%的误差logger.warning(f"数据记录数不足,期望 {expected_records},实际 {len(df)}: {file_path}")return False# 检查时间序列连续性df['timestamp'] = pd.to_datetime(df['timestamp'])df = df.sort_values('timestamp')# 检查是否有重复时间戳if df['timestamp'].duplicated().any():logger.warning(f"发现重复时间戳: {file_path}")return Falselogger.info(f"数据验证通过: {file_path} ({len(df)} 条记录)")return Trueexcept Exception as e:logger.error(f"数据验证失败: {file_path}, 错误: {e}")return Falsedef add_task(self, symbol: str, interval: str, days: int, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None,priority: int = 1) -> str:"""添加数据获取任务参数:- symbol: 交易对符号- interval: 时间间隔- days: 天数(如果未指定start_date和end_date)- start_date: 开始日期- end_date: 结束日期- priority: 优先级(数字越小优先级越高)返回:- str: 任务ID"""# 计算时间范围if start_date and end_date:start_time = start_dateend_time = end_dateelse:end_time = datetime.now()start_time = end_time - timedelta(days=days)start_timestamp = str(int(start_time.timestamp() * 1000))end_timestamp = str(int(end_time.timestamp() * 1000))# 生成任务IDtask_id = self._generate_task_id(symbol, interval, start_timestamp, end_timestamp)# 创建任务信息task_info = {'task_id': task_id,'symbol': symbol,'interval': interval,'start_time': start_timestamp,'end_time': end_timestamp,'start_date': start_time.isoformat(),'end_date': end_time.isoformat(),'days': (end_time - start_time).days,'priority': priority,'status': 'pending','created_at': datetime.now().isoformat(),'attempts': 0,'max_attempts': 3}self.tasks[task_id] = task_infologger.info(f"添加任务: {symbol} {interval} {task_info['days']}天 (ID: {task_id})")return task_iddef _execute_task(self, task_id: str) -> bool:"""执行单个任务参数:- task_id: 任务ID返回:- bool: 是否成功"""task = self.tasks[task_id]try:# 更新任务状态task['status'] = 'running'task['attempts'] += 1task['start_time_actual'] = datetime.now().isoformat()logger.info(f"开始执行任务 {task_id}: {task['symbol']} {task['interval']} (尝试 {task['attempts']}/{task['max_attempts']})")# 检查是否可以断点续传output_file = self.data_dir / f"{task['symbol']}_{task['interval']}_{task['days']}days_{task_id}.csv"if self.enable_resume and output_file.exists():# 验证现有数据if self._validate_data_integrity(output_file):logger.info(f"任务 {task_id} 数据已存在且完整,跳过")task['status'] = 'completed'task['end_time_actual'] = datetime.now().isoformat()self.completed_tasks.add(task_id)return Trueelse:logger.warning(f"任务 {task_id} 现有数据不完整,重新获取")output_file.unlink()  # 删除损坏的文件# 进度回调def progress_callback(current, total, chunk_data):progress = (current / total) * 100progress_data = {'task_id': task_id,'current_chunk': current,'total_chunks': total,'progress_percent': progress,'records_in_chunk': len(chunk_data),'status': 'running'}self._save_progress(task_id, progress_data)logger.info(f"任务 {task_id} 进度: {current}/{total} ({progress:.1f}%)")# 获取数据klines = self.client.get_historical_klines_with_progress(symbol=task['symbol'],interval=task['interval'],start_time=task['start_time'],end_time=task['end_time'],progress_callback=progress_callback,save_intermediate=True,output_file=str(output_file))# 验证结果if not klines:raise ValueError("未获取到任何数据")# 验证文件完整性if not self._validate_data_integrity(output_file, len(klines)):raise ValueError("数据完整性验证失败")# 更新任务状态task['status'] = 'completed'task['end_time_actual'] = datetime.now().isoformat()task['records_count'] = len(klines)task['output_file'] = str(output_file)# 保存最终进度final_progress = {'task_id': task_id,'status': 'completed','records_count': len(klines),'output_file': str(output_file)}self._save_progress(task_id, final_progress)self.completed_tasks.add(task_id)logger.info(f"任务 {task_id} 完成: {len(klines):,} 条记录")return Trueexcept Exception as e:# 任务失败task['status'] = 'failed'task['error'] = str(e)task['end_time_actual'] = datetime.now().isoformat()logger.error(f"任务 {task_id} 失败 (尝试 {task['attempts']}/{task['max_attempts']}): {e}")# 保存错误进度error_progress = {'task_id': task_id,'status': 'failed','error': str(e),'attempts': task['attempts']}self._save_progress(task_id, error_progress)if task['attempts'] >= task['max_attempts']:self.failed_tasks.add(task_id)logger.error(f"任务 {task_id} 达到最大重试次数,标记为失败")else:task['status'] = 'pending'  # 重新排队logger.info(f"任务 {task_id} 将重新尝试")return Falsedef execute_all_tasks(self, max_concurrent=3):"""执行所有任务参数:- max_concurrent: 最大并发任务数"""logger.info(f"开始执行批量任务,共 {len(self.tasks)} 个任务")# 按优先级排序任务sorted_tasks = sorted(self.tasks.items(),key=lambda x: (x[1]['priority'], x[1]['created_at']))total_tasks = len(sorted_tasks)completed_count = 0failed_count = 0start_time = time.time()for task_id, task in sorted_tasks:if task_id in self.completed_tasks:completed_count += 1continueif task_id in self.failed_tasks:failed_count += 1continue# 执行任务success = self._execute_task(task_id)if success:completed_count += 1else:if task['attempts'] >= task['max_attempts']:failed_count += 1# 显示总体进度progress = ((completed_count + failed_count) / total_tasks) * 100elapsed = time.time() - start_timelogger.info(f"总体进度: {completed_count + failed_count}/{total_tasks} ({progress:.1f}%) - "f"成功: {completed_count}, 失败: {failed_count}, 耗时: {elapsed:.1f}s")total_time = time.time() - start_timelogger.info(f"批量任务执行完成!")logger.info(f"总任务数: {total_tasks}")logger.info(f"成功: {completed_count}")logger.info(f"失败: {failed_count}")logger.info(f"总耗时: {total_time:.1f} 秒 ({total_time/60:.1f} 分钟)")return {'total': total_tasks,'completed': completed_count,'failed': failed_count,'duration': total_time}def get_task_status(self, task_id: str = None) -> Dict:"""获取任务状态参数:- task_id: 任务ID,如果为None则返回所有任务状态返回:- Dict: 任务状态信息"""if task_id:if task_id in self.tasks:task = self.tasks[task_id].copy()progress = self._load_progress(task_id)if progress:task['progress'] = progressreturn taskelse:return {'error': f'任务 {task_id} 不存在'}else:# 返回所有任务的摘要summary = {'total_tasks': len(self.tasks),'completed': len(self.completed_tasks),'failed': len(self.failed_tasks),'pending': len(self.tasks) - len(self.completed_tasks) - len(self.failed_tasks),'tasks': {}}for tid, task in self.tasks.items():summary['tasks'][tid] = {'symbol': task['symbol'],'interval': task['interval'],'days': task['days'],'status': task['status'],'attempts': task.get('attempts', 0)}return summarydef cleanup_failed_tasks(self):"""清理失败任务的临时文件"""cleaned_count = 0for task_id in self.failed_tasks:task = self.tasks[task_id]# 删除可能的部分文件output_file = self.data_dir / f"{task['symbol']}_{task['interval']}_{task['days']}days_{task_id}.csv"if output_file.exists():output_file.unlink()cleaned_count += 1logger.info(f"清理失败任务文件: {output_file}")# 删除进度文件progress_file = self._get_progress_file(task_id)if progress_file.exists():progress_file.unlink()logger.info(f"清理完成,删除了 {cleaned_count} 个失败任务的文件")def export_summary_report(self) -> str:"""导出摘要报告返回:- str: 报告文件路径"""report_file = self.output_dir / f'batch_report_{datetime.now().strftime("%Y%m%d_%H%M%S")}.json'report = {'generated_at': datetime.now().isoformat(),'summary': self.get_task_status(),'detailed_tasks': {}}for task_id, task in self.tasks.items():detailed_task = task.copy()progress = self._load_progress(task_id)if progress:detailed_task['progress'] = progressreport['detailed_tasks'][task_id] = detailed_taskwith open(report_file, 'w', encoding='utf-8') as f:json.dump(report, f, indent=2, ensure_ascii=False)logger.info(f"摘要报告已导出: {report_file}")return str(report_file)def create_preset_tasks(manager: BatchDataManager, preset_name: str = 'popular_pairs'):"""创建预设任务参数:- manager: 批量数据管理器- preset_name: 预设名称"""presets = {'popular_pairs': [('BTCUSDT', '1m', 30),('ETHUSDT', '1m', 30),('BNBUSDT', '1m', 30),('ADAUSDT', '5m', 60),('SOLUSDT', '5m', 60),('DOTUSDT', '15m', 90),('LINKUSDT', '15m', 90),],'major_pairs_hourly': [('BTCUSDT', '1h', 365),('ETHUSDT', '1h', 365),('BNBUSDT', '1h', 180),('ADAUSDT', '1h', 180),],'quick_test': [('BTCUSDT', '1h', 7),('ETHUSDT', '1h', 7),]}if preset_name not in presets:logger.error(f"未知预设: {preset_name}")returntasks = presets[preset_name]task_ids = []for symbol, interval, days in tasks:task_id = manager.add_task(symbol, interval, days)task_ids.append(task_id)logger.info(f"创建预设任务 '{preset_name}': {len(task_ids)} 个任务")return task_idsif __name__ == "__main__":# 示例使用manager = BatchDataManager(output_dir='advanced_batch_data', max_workers=6)# 创建预设任务print("创建预设任务...")create_preset_tasks(manager, 'quick_test')# 显示任务状态print("\n任务状态:")status = manager.get_task_status()print(f"总任务数: {status['total_tasks']}")print(f"待执行: {status['pending']}")# 执行所有任务print("\n开始执行任务...")result = manager.execute_all_tasks()# 导出报告report_file = manager.export_summary_report()print(f"\n报告已导出: {report_file}")
http://www.lryc.cn/news/619888.html

相关文章:

  • 01数据结构-Prim算法
  • Unity、C#常用的时间处理类
  • Gradle(三)创建一个 SpringBoot 项目
  • C++ 中构造函数参数对父对象的影响:父子控件管理机制解析
  • 【完整源码+数据集+部署教程】火柴实例分割系统源码和数据集:改进yolo11-rmt
  • 学习语言的一个阶段性总结
  • Linux操作系统应用编程——文件IO
  • Nginx的SSL通配符证书自动续期
  • 精准阻断内网渗透:联软科技终端接入方案如何“锁死”横向移动?
  • MySQL中的查询、索引与事务
  • MySQL三大存储引擎对比:InnoDB vs MyISAM vs MEMORY
  • RuoYi-Cloud 接入 Sentinel 的 3 种限流方式
  • Android 双屏异显技术全解析:从原理到实战的多屏交互方案
  • Ubuntu 20.04 虚拟机安装完整教程:从 VMware 到 VMware Tools
  • 基于.Net Framework4.5 Web API 引用Swagger
  • nginx高性能web服务器实验
  • INTERSPEECH 2025 | 数据堂诚邀您参加MLC-SLM挑战赛暨研讨会
  • JVM安全点轮询汇编函数解析
  • 【个人简单记录】PLT,GOT函数加载机制
  • 海康视觉平台VM创建项目
  • FxSound:为你的音频体验注入专业级享受
  • Android图片加载库Glide深度解析与实践指南
  • 4 种方法将联系人从 iPhone 传输到 realme
  • 用了Cursor AI之后,我的编程效率翻倍了?——一位程序员的真实体验分享
  • 小迪23年-32~40——java简单回顾
  • Dots.ocr:告别复杂多模块架构,1.7B参数单一模型统一处理所有OCR任务22
  • 直播预告|鸿蒙生态中的AI新玩法
  • 09--解密栈与队列:数据结构核心原理
  • 图像分割-动手学计算机视觉9
  • 算法提升-树上问题之(dfs序)