
Stream Processing, Real-Time Analytics, and RAG-Driven Python ETL Frameworks: Building Intelligent Data Pipelines (Part 2)

Chapter 4: Python Implementation of the Core Modules in Detail


4.1 Data Ingestion Module: A General-Purpose Receiver Built on FastAPI + Kafka
# fastapi_kafka_ingestor.py
import asyncio
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from confluent_kafka import KafkaException, Producer
from fastapi import BackgroundTasks, FastAPI, HTTPException
from pydantic import BaseModel, Field

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Kafka configuration (should be read from environment variables or a config center)
KAFKA_BROKERS = "kafka1:9092,kafka2:9092"
KAFKA_TOPIC = "raw_events"


# Pydantic model used for request validation
class EventModel(BaseModel):
    event_id: str = Field(..., description="Unique event identifier")
    event_type: str = Field(..., description="Type of the event")
    source: str = Field(..., description="Source system of the event")
    timestamp: Optional[str] = Field(None, description="Event timestamp (ISO 8601)")
    payload: Dict[str, Any] = Field(..., description="Event data payload")


# Delivery report callback (optional; confirms whether each message was delivered).
# Defined before the producer so it can be registered in the producer config.
def delivery_report(err, msg):
    if err is not None:
        logger.error(f"Message delivery failed: {err}")
    else:
        logger.info(f"Message delivered to {msg.topic()} [{msg.partition()}]")


# Kafka producer configuration
conf = {
    "bootstrap.servers": KAFKA_BROKERS,
    "client.id": "fastapi_ingestor",
    "on_delivery": delivery_report,  # register the delivery report callback
    # Optional: enable compression
    # "compression.codec": "snappy",
    # Optional: tune batching
    # "batch.num.messages": 100,
    # "linger.ms": 10,
    # Optional: require acknowledgement from all in-sync replicas
    # "acks": "all",
    # Optional: retries
    # "retries": 3,
    # "retry.backoff.ms": 100,
}
# A single producer instance shared by all requests
producer = Producer(conf)

# FastAPI application
app = FastAPI(
    title="Real-Time Event Ingestion API",
    description="API for ingesting events into Kafka",
    version="1.0.0",
)


# Send a message to Kafka without blocking the event loop.
# This runs as a background task after the HTTP response has been sent,
# so errors are logged: an HTTPException raised here could never reach the client.
async def produce_event(topic: str, key: str, value: dict) -> None:
    loop = asyncio.get_running_loop()
    try:
        # Run the synchronous producer.produce in the default thread pool
        await loop.run_in_executor(
            None,
            lambda: producer.produce(topic, key=key, value=json.dumps(value).encode("utf-8")),
        )
        producer.poll(0)  # serve pending delivery callbacks
        logger.info(f"Event with key '{key}' queued for topic '{topic}'")
    except BufferError:
        logger.error(f"Kafka producer queue full; event with key '{key}' was not queued")
    except KafkaException as e:
        logger.error(f"Kafka error for key '{key}': {e}")


# API endpoint: ingest a single event
@app.post("/events/", response_model=Dict[str, str], status_code=202)
async def ingest_event(event: EventModel, background_tasks: BackgroundTasks):
    """Ingest a single event into the Kafka topic."""
    try:
        # If no timestamp was provided, use the current UTC time
        if not event.timestamp:
            event.timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
        # Build the Kafka message
        kafka_message = event.dict()
        # Use event_id as the Kafka key. Kafka preserves order only among messages
        # that share a key, so a unique event_id spreads events across partitions.
        kafka_key = event.event_id
        # Send in the background (BackgroundTasks avoids delaying the response)
        background_tasks.add_task(produce_event, KAFKA_TOPIC, kafka_key, kafka_message)
        return {"status": "accepted", "event_id": event.event_id}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Unexpected error ingesting event {event.event_id}: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")


# API endpoint: ingest a batch of events
@app.post("/events/batch/", response_model=Dict[str, Any], status_code=207)
async def ingest_events_batch(events: List[EventModel], background_tasks: BackgroundTasks):
    """Ingest a batch of events into the Kafka topic.
    Returns a multi-status response indicating success/failure per event."""
    results = []
    success_count = 0
    failure_count = 0
    for event in events:
        try:
            if not event.timestamp:
                event.timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
            kafka_message = event.dict()
            kafka_key = event.event_id
            background_tasks.add_task(produce_event, KAFKA_TOPIC, kafka_key, kafka_message)
            results.append({"event_id": event.event_id, "status": "accepted"})
            # Assumed completion (the source listing is truncated here):
            # count outcomes and return a per-event summary with hypothetical keys.
            success_count += 1
        except Exception as e:
            failure_count += 1
            results.append({"event_id": event.event_id, "status": "failed", "error": str(e)})
    return {"accepted": success_count, "failed": failure_count, "results": results}
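The ingestor uses `event_id` as the Kafka message key. Kafka orders messages only within a partition, and the key decides the partition, so ordering holds per key rather than globally. The sketch below is a minimal illustration, assuming a hash-based partitioner that takes CRC32 of the key modulo the partition count (roughly what librdkafka's default `consistent_random` partitioner does for non-empty keys). `pick_partition` and the `order-*` keys are hypothetical.

```python
import zlib


def pick_partition(key: str, num_partitions: int) -> int:
    """Hypothetical helper: map a key to a partition via CRC32 modulo partition count.

    Assumption: this approximates a hash-based Kafka partitioner for
    non-empty keys; it is an illustration, not the client library's code.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Messages sharing a key always land on the same partition, so their relative
# order is preserved; messages with different keys may land anywhere.
keys = ["order-42", "order-42", "order-7", "order-42"]
assignments = [pick_partition(k, 6) for k in keys]
print(assignments)
```

Because each `event_id` is unique, keying by it mostly spreads load across partitions. If per-entity ordering matters (for example, all events from one `source`), keying by that field would be the choice that actually preserves order.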

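When the producer's local queue is full, `producer.produce()` raises `BufferError`. Instead of giving up immediately, a common pattern is to call `poll()` so delivery callbacks are served and queue space is freed, then retry a bounded number of times. The sketch below shows that pattern wrapped in `run_in_executor`, as the ingestor does. It assumes a hypothetical `FakeProducer` stand-in (not the confluent-kafka class) so it runs without a broker; `produce_with_retry` and `produce_async` are likewise illustrative names.

```python
import asyncio


class FakeProducer:
    """Hypothetical stand-in for a Kafka producer: the local queue holds
    `capacity` messages and raises BufferError when full; poll() drains it."""

    def __init__(self, capacity: int):
        self.capacity = capacity
        self.queue = []
        self.delivered = []

    def produce(self, topic, key=None, value=None):
        if len(self.queue) >= self.capacity:
            raise BufferError("Local: Queue full")
        self.queue.append((topic, key, value))

    def poll(self, timeout=0):
        # Pretend every queued message is delivered whenever poll() runs
        # (timeout is accepted for API shape but ignored by the fake).
        drained = len(self.queue)
        self.delivered.extend(self.queue)
        self.queue.clear()
        return drained


def produce_with_retry(producer, topic, key, value, retries=3, poll_timeout=0.5):
    """Blocking helper: on BufferError, poll() to free queue space and retry."""
    for attempt in range(retries + 1):
        try:
            producer.produce(topic, key=key, value=value)
            return True
        except BufferError:
            if attempt == retries:
                return False  # give up after the last allowed attempt
            producer.poll(poll_timeout)
    return False


async def produce_async(producer, topic, key, value):
    loop = asyncio.get_running_loop()
    # Run the blocking helper in the default thread pool to keep the loop free
    return await loop.run_in_executor(None, produce_with_retry, producer, topic, key, value)


async def main():
    producer = FakeProducer(capacity=2)
    results = [await produce_async(producer, "raw_events", f"evt-{i}", b"{}") for i in range(5)]
    return results, len(producer.delivered), len(producer.queue)


results, delivered, pending = asyncio.run(main())
print(results, delivered, pending)  # → [True, True, True, True, True] 4 1
```

Bounding the retries keeps a stalled broker from pinning worker threads indefinitely; once retries are exhausted the caller can log the event or route it to a fallback store.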