商品数据仓库构建指南:TB 级淘宝 API 历史详情数据归档方案
引言
随着电子商务的蓬勃发展,淘宝平台积累了海量的商品历史数据,这些数据包含了丰富的商业信息,对数据分析、市场预测、用户行为研究等具有重要价值。构建一个能够高效存储、管理和分析 TB 级淘宝 API 历史详情数据的商品数据仓库,成为电商企业数据战略的核心组成部分。
本文将详细介绍商品数据仓库的构建方案,重点讲解针对 TB 级淘宝 API 历史详情数据的归档策略,包括数据模型设计、ETL 流程实现、存储优化及查询性能提升等关键技术点,并提供可落地的代码实现。
数据仓库架构设计
淘宝商品数据仓库采用分层架构设计,主要包含以下层次:
- ODS 层(操作数据存储层):直接存储从淘宝 API 获取的原始数据,保持数据原貌
- DWD 层(数据明细层):对 ODS 层数据进行清洗、转换,形成结构化的明细数据
- DWS 层(数据汇总层):基于 DWD 层数据进行汇总统计,形成各类汇总指标
- ADS 层(应用数据层):为具体业务场景提供数据服务,如报表、分析等
架构图如下:
plaintext
淘宝API → 数据采集 → ODS层 → DWD层 → DWS层 → ADS层 → 业务应用↑ ↑ ↑ ↑└────────┴────────┴────────┘数据治理与监控
数据模型设计
针对淘宝商品历史详情数据,我们设计以下核心数据模型:
1. 商品基础信息表
存储商品的基本属性信息,如商品 ID、名称、分类、价格等
2. 商品属性表
存储商品的扩展属性,采用键值对形式,支持灵活扩展
3. 商品价格历史表
记录商品价格的历史变动情况,支持追踪价格变化趋势
4. 商品库存历史表
记录商品库存的历史变动情况
5. 商品类目表
存储商品分类体系,支持多级分类
数据采集与同步方案
从淘宝 API 采集历史商品数据是数据仓库的入口,需要考虑数据量巨大(TB 级)的特点,设计高效的采集方案。
1. 淘宝 API 数据采集工具
import requests
import json
import time
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta
import os
import csv
from queue import Queue
import threading# 配置日志
logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s',handlers=[logging.FileHandler('taobao_api_crawler.log'),logging.StreamHandler()]
)class TaobaoApiCrawler:def __init__(self, app_key, app_secret, max_workers=10, batch_size=100):self.app_key = app_keyself.app_secret = app_secretself.base_url = "https://eco.taobao.com/router/rest"self.max_workers = max_workersself.batch_size = batch_sizeself.data_queue = Queue(maxsize=1000)self.writer_thread = Noneself.stop_event = threading.Event()def generate_sign(self, params):"""生成API签名,实际实现需根据淘宝API要求"""# 此处简化实现,实际项目中需按照淘宝API签名规则实现import hashlibsorted_params = sorted(params.items(), key=lambda x: x[0])sign_str = self.app_secret + ''.join([f"{k}{v}" for k, v in sorted_params]) + self.app_secretreturn hashlib.md5(sign_str.encode()).hexdigest().upper()def fetch_product_batch(self, product_ids):"""批量获取商品详情"""params = {"method": "taobao.item.get","app_key": self.app_key,"format": "json","v": "2.0","timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),"fields": "num_iid,title,price,stock,cid,category,name,desc,pics,props,created,modified","num_iids": ",".join(map(str, product_ids))}# 生成签名params["sign"] = self.generate_sign(params)try:response = requests.get(self.base_url, params=params, timeout=30)result = response.json()if "error_response" in result:logging.error(f"API调用错误: {result['error_response']}")return Nonereturn result.get("items_get_response", {}).get("items", [])except Exception as e:logging.error(f"获取商品数据失败: {str(e)}", exc_info=True)return Nonedef data_writer(self, output_dir):"""数据写入线程,从队列中取数据并写入文件"""while not self.stop_event.is_set() or not self.data_queue.empty():try:# 阻塞等待数据,超时1秒用于检查stop_eventdata = self.data_queue.get(timeout=1)if not data:continue# 按日期分目录存储date_str = datetime.now().strftime("%Y%m%d")date_dir = os.path.join(output_dir, date_str)os.makedirs(date_dir, exist_ok=True)# 按商品ID范围分文件first_id = data[0].get('num_iid', 0)file_id = first_id // 10000 # 每10000个商品一个文件file_name = f"products_{file_id}.jsonl"file_path = os.path.join(date_dir, file_name)# 写入JSON Lines格式文件with open(file_path, 'a', encoding='utf-8') as f:for item in data:f.write(json.dumps(item, ensure_ascii=False) + '\n')self.data_queue.task_done()except Exception as e:logging.error(f"数据写入失败: {str(e)}", exc_info=True)def crawl_product_range(self, start_id, end_id, output_dir):"""爬取指定ID范围的商品数据"""# 启动数据写入线程self.writer_thread = threading.Thread(target=self.data_writer, args=(output_dir,))self.writer_thread.start()try:total = end_id - start_id + 1batches = [(i, min(i + self.batch_size - 1, end_id)) for i in range(start_id, end_id + 1, self.batch_size)]logging.info(f"开始爬取商品ID范围: {start_id}-{end_id}, 共{total}个商品, {len(batches)}批")with ThreadPoolExecutor(max_workers=self.max_workers) as executor:futures = []for batch_start, batch_end in batches:product_ids = list(range(batch_start, batch_end + 1))# 控制API调用频率,避免触发限流time.sleep(0.1)futures.append(executor.submit(self.fetch_product_batch, product_ids))# 处理结果for future in as_completed(futures):result = future.result()if result:self.data_queue.put(result)logging.info(f"成功获取{len(result)}个商品数据")finally:# 等待队列处理完毕self.data_queue.join()# 停止写入线程self.stop_event.set()if self.writer_thread:self.writer_thread.join()def crawl_historical_data(self, output_dir, start_date, end_date):"""爬取指定日期范围内的历史数据变更"""# 按日期范围获取商品变更历史current_date = start_datewhile current_date <= end_date:date_str = current_date.strftime("%Y-%m-%d")logging.info(f"开始爬取{date_str}的商品历史数据")# 实际应用中,需根据淘宝API的历史数据查询接口调整params = {"method": "taobao.item.history.get","app_key": self.app_key,"format": "json","v": "2.0","timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),"start_date": date_str,"end_date": date_str,"fields": "num_iid,title,price,stock,modified"}params["sign"] = self.generate_sign(params)try:response = requests.get(self.base_url, params=params, timeout=30)result = response.json()if "error_response" in result:logging.error(f"API调用错误: {result['error_response']}")current_date += timedelta(days=1)continueitems = result.get("item_history_get_response", {}).get("items", [])if items:# 保存历史数据date_dir = os.path.join(output_dir, "history", current_date.strftime("%Y%m%d"))os.makedirs(date_dir, exist_ok=True)file_path = os.path.join(date_dir, "product_history.jsonl")with open(file_path, 'a', encoding='utf-8') as f:for item in items:f.write(json.dumps(item, ensure_ascii=False) + '\n')logging.info(f"成功获取{date_str}的{len(items)}条商品历史数据")except Exception as e:logging.error(f"获取{date_str}历史数据失败: {str(e)}", exc_info=True)current_date += timedelta(days=1)# 控制API调用频率time.sleep(1)if __name__ == "__main__":# 配置参数APP_KEY = "your_app_key"APP_SECRET = "your_app_secret"OUTPUT_DIR = "/data/taobao/ods"# 初始化爬虫crawler = TaobaoApiCrawler(APP_KEY, APP_SECRET, max_workers=10)# 示例1: 爬取指定ID范围的商品数据# crawler.crawl_product_range(1000000, 2000000, OUTPUT_DIR)# 示例2: 爬取指定日期范围的历史数据start_date = datetime(2023, 1, 1)end_date = datetime(2023, 12, 31)crawler.crawl_historical_data(OUTPUT_DIR, start_date, end_date)
2. 数据同步到 HDFS
对于 TB 级数据,我们采用 HDFS 作为底层存储系统,使用 Sqoop 工具将本地文件同步到 HDFS:
# 创建HDFS目录
hadoop fs -mkdir -p /user/hive/warehouse/taobao_ods.db/product_details_raw# 使用Sqoop导入数据到HDFS(按日期分区)
sqoop import \--connect jdbc:mysql://localhost:3306/taobao_api_log \--username hive \--password hive \--query "SELECT * FROM api_data WHERE dt='2023-10-01' AND \$CONDITIONS" \--target-dir /user/hive/warehouse/taobao_ods.db/product_details_raw/dt=2023-10-01 \--split-by id \--fields-terminated-by '\001' \--lines-terminated-by '\n' \--num-mappers 10# 对于JSON文件,可以直接使用Hadoop命令上传
hadoop fs -put /local/data/taobao/20231001/*.jsonl /user/hive/warehouse/taobao_ods.db/product_details_raw/dt=2023-10-01/
数据仓库表设计与实现
1. ODS 层表设计
ODS 层直接存储从 API 获取的原始数据,采用分区表按日期存储:
-- 创建商品详情原始表
CREATE EXTERNAL TABLE IF NOT EXISTS taobao_ods.product_details_raw (num_iid BIGINT COMMENT '商品ID',title STRING COMMENT '商品标题',price DECIMAL(10,2) COMMENT '商品价格',stock INT COMMENT '库存数量',cid BIGINT COMMENT '分类ID',category STRING COMMENT '分类名称',name STRING COMMENT '商品名称',desc STRING COMMENT '商品描述',pics ARRAY<STRING> COMMENT '商品图片URL',props MAP<STRING, STRING> COMMENT '商品属性',created STRING COMMENT '创建时间',modified STRING COMMENT '修改时间'
)
COMMENT '淘宝商品详情原始数据'
PARTITIONED BY (dt STRING COMMENT '数据日期')
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION '/user/hive/warehouse/taobao_ods.db/product_details_raw';-- 创建商品历史变更原始表
CREATE EXTERNAL TABLE IF NOT EXISTS taobao_ods.product_history_raw (num_iid BIGINT COMMENT '商品ID',title STRING COMMENT '商品标题',price DECIMAL(10,2) COMMENT '商品价格',stock INT COMMENT '库存数量',modified STRING COMMENT '修改时间'
)
COMMENT '淘宝商品历史变更原始数据'
PARTITIONED BY (dt STRING COMMENT '数据日期')
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION '/user/hive/warehouse/taobao_ods.db/product_history_raw';-- 添加分区的脚本
ALTER TABLE taobao_ods.product_details_raw ADD IF NOT EXISTS PARTITION (dt='2023-10-01')
LOCATION '/user/hive/warehouse/taobao_ods.db/product_details_raw/dt=2023-10-01';
2. DWD 层表设计
DWD 层对原始数据进行清洗和转换,形成结构化的明细数据:
-- 创建商品详情明细表
CREATE TABLE IF NOT EXISTS taobao_dwd.product_details (product_id BIGINT COMMENT '商品ID',product_title STRING COMMENT '商品标题',price DECIMAL(10,2) COMMENT '商品价格',stock INT COMMENT '库存数量',category_id BIGINT COMMENT '分类ID',category_name STRING COMMENT '分类名称',product_name STRING COMMENT '商品名称',description STRING COMMENT '商品描述',image_urls ARRAY<STRING> COMMENT '商品图片URL',attributes MAP<STRING, STRING> COMMENT '商品属性',create_time TIMESTAMP COMMENT '创建时间',modify_time TIMESTAMP COMMENT '修改时间',etl_time TIMESTAMP COMMENT 'ETL处理时间'
)
COMMENT '淘宝商品详情明细数据'
PARTITIONED BY (dt STRING COMMENT '数据日期')
STORED AS ORC
TBLPROPERTIES ('orc.compress' = 'SNAPPY');-- 创建商品价格历史明细表
CREATE TABLE IF NOT EXISTS taobao_dwd.product_price_history (product_id BIGINT COMMENT '商品ID',price DECIMAL(10,2) COMMENT '商品价格',modify_time TIMESTAMP COMMENT '修改时间',etl_time TIMESTAMP COMMENT 'ETL处理时间'
)
COMMENT '淘宝商品价格历史明细数据'
PARTITIONED BY (dt STRING COMMENT '数据日期')
STORED AS ORC
TBLPROPERTIES ('orc.compress' = 'SNAPPY');
3. DWD 层 ETL 实现
使用 Spark 实现 DWD 层的 ETL 处理,将 ODS 层数据清洗转换后加载到 DWD 层:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import java.sql.Timestamp
import java.text.SimpleDateFormatobject ProductDetailETL {def main(args: Array[String]): Unit = {// 解析命令行参数,获取日期val dt = if (args.length > 0) args(0) else {val sdf = new SimpleDateFormat("yyyy-MM-dd")sdf.format(new java.util.Date(System.currentTimeMillis() - 86400000)) // 默认为前一天}// 初始化SparkSessionval spark = SparkSession.builder().appName(s"ProductDetailETL-$dt").enableHiveSupport().getOrCreate()import spark.implicits._// 1. 处理商品详情数据,从ODS到DWDval odsProductDF = spark.sql(s"""|SELECT * FROM taobao_ods.product_details_raw|WHERE dt = '$dt'""".stripMargin)// 数据清洗和转换val dwdProductDF = odsProductDF// 过滤无效数据.filter($"num_iid".isNotNull && $"title".isNotNull)// 重命名列并转换数据类型.withColumnRenamed("num_iid", "product_id").withColumnRenamed("title", "product_title").withColumnRenamed("cid", "category_id").withColumnRenamed("category", "category_name").withColumnRenamed("name", "product_name").withColumnRenamed("desc", "description").withColumnRenamed("pics", "image_urls").withColumnRenamed("props", "attributes").withColumnRenamed("created", "create_time_str").withColumnRenamed("modified", "modify_time_str")// 转换字符串时间为Timestamp类型.withColumn("create_time", to_timestamp($"create_time_str", "yyyy-MM-dd HH:mm:ss")).withColumn("modify_time", to_timestamp($"modify_time_str", "yyyy-MM-dd HH:mm:ss"))// 添加ETL处理时间.withColumn("etl_time", current_timestamp())// 选择需要的列.select($"product_id",$"product_title",$"price",$"stock",$"category_id",$"category_name",$"product_name",$"description",$"image_urls",$"attributes",$"create_time",$"modify_time",$"etl_time")// 将数据写入DWD层dwdProductDF.write.mode("overwrite").partitionBy("dt").insertInto("taobao_dwd.product_details")// 2. 处理商品价格历史数据val odsHistoryDF = spark.sql(s"""|SELECT * FROM taobao_ods.product_history_raw|WHERE dt = '$dt'""".stripMargin)val dwdPriceHistoryDF = odsHistoryDF.filter($"num_iid".isNotNull).withColumnRenamed("num_iid", "product_id").withColumnRenamed("modified", "modify_time_str").withColumn("modify_time", to_timestamp($"modify_time_str", "yyyy-MM-dd HH:mm:ss")).withColumn("etl_time", current_timestamp()).select($"product_id",$"price",$"modify_time",$"etl_time").withColumn("dt", lit(dt))// 写入DWD层价格历史表dwdPriceHistoryDF.write.mode("overwrite").partitionBy("dt").insertInto("taobao_dwd.product_price_history")// 3. 数据质量检查val productCount = dwdProductDF.count()val historyCount = dwdPriceHistoryDF.count()println(s"ETL完成: 商品详情数据 $productCount 条, 价格历史数据 $historyCount 条, 日期: $dt")spark.stop()}
}
4. DWS 层和 ADS 层设计
DWS 层主要进行数据汇总,ADS 层为应用提供数据服务:
-- DWS层:商品分类汇总表
CREATE TABLE IF NOT EXISTS taobao_dws.product_category_summary (category_id BIGINT COMMENT '分类ID',category_name STRING COMMENT '分类名称',product_count BIGINT COMMENT '商品数量',avg_price DECIMAL(10,2) COMMENT '平均价格',total_stock BIGINT COMMENT '总库存',stat_date STRING COMMENT '统计日期'
)
COMMENT '商品分类汇总数据'
STORED AS ORC
TBLPROPERTIES ('orc.compress' = 'SNAPPY');-- ADS层:商品价格趋势表
CREATE TABLE IF NOT EXISTS taobao_ads.product_price_trend (product_id BIGINT COMMENT '商品ID',product_title STRING COMMENT '商品标题',date_str STRING COMMENT '日期',price DECIMAL(10,2) COMMENT '价格',price_change DECIMAL(10,2) COMMENT '价格变动',change_rate DECIMAL(5,2) COMMENT '变动比例(%)'
)
COMMENT '商品价格趋势分析'
STORED AS ORC
TBLPROPERTIES ('orc.compress' = 'SNAPPY');
数据归档策略
针对 TB 级历史数据,需要设计合理的归档策略,平衡存储成本和查询性能:
1. 分层存储策略
- 热数据(最近 3 个月):存储在高性能存储介质,如 SSD,确保快速查询
- 温数据(3-12 个月):存储在普通 HDD
- 冷数据(1 年以上):压缩后存储,可考虑迁移到低成本存储
#!/bin/bash
# 数据归档脚本:将超过1年的历史数据迁移到归档存储# 配置参数
ARCHIVE_THRESHOLD_DAYS=365 # 超过365天的数据将被归档
HIVE_DB="taobao_dwd"
TABLES=("product_details" "product_price_history")
ARCHIVE_HDFS_PATH="/archive/hive/warehouse"
CURRENT_HDFS_PATH="/user/hive/warehouse"# 计算归档阈值日期
THRESHOLD_DATE=$(date -d "$ARCHIVE_THRESHOLD_DAYS days ago" +%Y-%m-%d)
echo "开始归档${THRESHOLD_DATE}之前的数据..."# 遍历所有需要归档的表
for TABLE in "${TABLES[@]}"; doecho "处理表: $HIVE_DB.$TABLE"# 获取需要归档的分区PARTITIONS=$(hive -e "show partitions $HIVE_DB.$TABLE" | grep -v 'dt=' | while read -r part; dodt=$(echo $part | awk -F'=' '{print $2}')if [[ "$dt" < "$THRESHOLD_DATE" ]]; thenecho $dtfidone)# 归档每个分区for dt in $PARTITIONS; doecho "归档分区: dt=$dt"# 1. 压缩数据hadoop jar /path/to/hadoop-streaming.jar \-Dmapred.reduce.tasks=1 \-input "$CURRENT_HDFS_PATH/$HIVE_DB.db/$TABLE/dt=$dt" \-output "$CURRENT_HDFS_PATH/$HIVE_DB.db/$TABLE/dt=${dt}_temp" \-mapper cat \-reducer cat \-outputformat org.apache.hadoop.io.compress.GzipCodec# 2. 移动到归档目录hadoop fs -mkdir -p "$ARCHIVE_HDFS_PATH/$HIVE_DB.db/$TABLE/dt=$dt"hadoop fs -mv "$CURRENT_HDFS_PATH/$HIVE_DB.db/$TABLE/dt=${dt}_temp/*" "$ARCHIVE_HDFS_PATH/$HIVE_DB.db/$TABLE/dt=$dt/"hadoop fs -rm -r "$CURRENT_HDFS_PATH/$HIVE_DB.db/$TABLE/dt=${dt}_temp"# 3. 修改Hive表分区位置hive -e "ALTER TABLE $HIVE_DB.$TABLE PARTITION (dt='$dt') SET LOCATION '$ARCHIVE_HDFS_PATH/$HIVE_DB.db/$TABLE/dt=$dt';"echo "完成分区dt=$dt的归档"done
doneecho "所有符合条件的数据归档完成"
结论与展望
本文详细介绍了 TB 级淘宝 API 历史详情数据的商品数据仓库构建方案,从架构设计、数据模型、ETL 实现到归档策略和性能优化,提供了一套完整的解决方案。通过分层架构设计和合理的存储策略,能够高效管理海量商品历史数据,为数据分析和业务决策提供支持。
未来,我们将在以下方面进行优化:
- 实时数据仓库:集成 Flink 等流处理技术,构建实时 + 离线一体化数据仓库
- 智能数据生命周期管理:基于 AI 算法预测数据访问频率,动态调整存储策略
- 自助分析平台:构建基于数据仓库的自助分析平台,降低数据分析门槛
- 数据湖集成:将数据仓库与数据湖结合,支持更灵活的数据存储和分析模式
该方案不仅适用于淘宝商品数据,也可推广到其他电商平台或具有类似特点的大数据归档场景。