Chapter 4: Detailed Python Implementation of the Core Modules

4.1 Data Ingestion Module: A Generic Receiver Based on FastAPI + Kafka
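The ingestion layer pairs FastAPI with a confluent-kafka producer: incoming HTTP requests are validated against a Pydantic schema, acknowledged with 202 Accepted, and handed to Kafka in the background so that slow brokers never block the request path. The full listing follows; the broker addresses and topic name are placeholders to adapt to your environment.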
import asyncio
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from confluent_kafka import Producer, KafkaException
import json
from datetime import datetime
import logging
from typing import Optional, Dict, Any
from pydantic import BaseModel, Field
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
KAFKA_BROKERS = "kafka1:9092,kafka2:9092"
KAFKA_TOPIC = "raw_events"
class EventModel(BaseModel):
    event_id: str = Field(..., description="Unique event identifier")
    event_type: str = Field(..., description="Type of the event")
    source: str = Field(..., description="Source system of the event")
    timestamp: Optional[str] = Field(None, description="Event timestamp (ISO 8601)")
    payload: Dict[str, Any] = Field(..., description="Event data payload")
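As a quick illustration of the schema (all field values below are invented for the example), an event can be constructed and inspected directly:

# Illustrative only: a hand-built event that passes EventModel validation.
sample = EventModel(
    event_id="evt-0001",
    event_type="page_view",
    source="web-frontend",
    payload={"url": "/home", "user_id": 42},
)
print(sample.dict())  # timestamp stays None until the endpoint fills it in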
# Producer configuration; in production the broker list would normally be
# injected via environment variables rather than hard-coded.
conf = {
    'bootstrap.servers': KAFKA_BROKERS,
    'client.id': 'fastapi_ingestor',
}

def delivery_report(err, msg):
    """Callback invoked once Kafka confirms (or rejects) delivery of a message."""
    if err is not None:
        logger.error(f'Message delivery failed: {err}')
    else:
        logger.info(f'Message delivered to {msg.topic()} [{msg.partition()}]')

# Create the producer once, with the delivery callback attached.
producer = Producer({**conf, 'on_delivery': delivery_report})

app = FastAPI(
    title="Real-Time Event Ingestion API",
    description="API for ingesting events into Kafka",
    version="1.0.0",
)

async def produce_event(topic: str, key: str, value: dict):
    """Serialize the event and hand it to the Kafka producer.

    Note: when this runs as a background task, the HTTPExceptions below are
    raised after the 202 response has already been sent, so they surface in
    the server log rather than in the client response.
    """
    loop = asyncio.get_running_loop()
    try:
        await loop.run_in_executor(
            None,
            lambda: producer.produce(topic, key=key, value=json.dumps(value).encode('utf-8')),
        )
        producer.poll(0)  # serve delivery callbacks without blocking
        logger.info(f"Event with key '{key}' sent to topic '{topic}'")
    except BufferError:
        logger.error(f"Kafka producer buffer full for key '{key}'")
        raise HTTPException(status_code=503, detail="Service temporarily unavailable (Kafka buffer full)")
    except KafkaException as e:
        logger.error(f"Kafka error for key '{key}': {e}")
        raise HTTPException(status_code=500, detail=f"Internal server error (Kafka: {e})")
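The listing never drains the producer's local queue on shutdown, so events that are buffered but not yet delivered could be lost when the process exits. A minimal sketch of a flush hook, assuming a 10-second drain timeout is acceptable (newer FastAPI versions would phrase this as a lifespan handler):

@app.on_event("shutdown")
def flush_producer():
    # Block for up to 10 seconds while outstanding messages are delivered;
    # flush() returns the number of messages still undelivered.
    remaining = producer.flush(10)
    if remaining:
        logger.warning(f"{remaining} event(s) still undelivered at shutdown")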
@app.post("/events/", response_model=Dict[str, str], status_code=202)
async def ingest_event(event: EventModel, background_tasks: BackgroundTasks):"""Ingest a single event into the Kafka topic."""try:if not event.timestamp:from datetime import datetimeevent.timestamp = datetime.utcnow().isoformat() + "Z"kafka_message = event.dict()kafka_key = event.event_id background_tasks.add_task(produce_event, KAFKA_TOPIC, kafka_key, kafka_message)return {"status": "accepted", "event_id": event.event_id}except HTTPException:raiseexcept Exception as e:logger.error(f"Unexpected error ingesting event {event.event_id}: {e}")raise HTTPException(status_code=500, detail="Internal server error")
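For a quick local check, FastAPI's TestClient can drive the endpoint in-process; saving the listing as ingestor.py is an assumption made for the import below. With no broker reachable, the producer merely buffers the message and logs a delivery failure later, so the API itself still answers 202:

# Smoke test with FastAPI's TestClient (requires httpx). The module name
# "ingestor" is an assumption about how the file above is saved.
from fastapi.testclient import TestClient
from ingestor import app

client = TestClient(app)
response = client.post("/events/", json={
    "event_id": "evt-0002",
    "event_type": "click",
    "source": "mobile-app",
    "payload": {"button": "signup"},
})
assert response.status_code == 202
print(response.json())  # {'status': 'accepted', 'event_id': 'evt-0002'}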
@app.post("/events/batch/", response_model=Dict[str, Any], status_code=207)
async def ingest_events_batch(events: list[EventModel], background_tasks: BackgroundTasks):"""Ingest a batch of events into the Kafka topic.Returns a multi-status response indicating success/failure per event."""results = []success_count = 0failure_count = 0for event in events:try:if not event.timestamp:from datetime import datetimeevent.timestamp = datetime.utcnow().isoformat() + "Z"kafka_message = event.dict()kafka_key = event.event_idbackground_tasks.add_task(produce_event, KAFKA_TOPIC, kafka_key, kafka_message)results.append({"event_id": event.event_id, "status": "accepted"}
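Finally, the service can be launched with uvicorn. The module name ingestor and port 8000 below are illustrative; any ASGI server works:

# Entry point for local runs; equivalent to `uvicorn ingestor:app --port 8000`.
import uvicorn

if __name__ == "__main__":
    uvicorn.run("ingestor:app", host="0.0.0.0", port=8000)

Once running, a POST to /events/ with a JSON body matching EventModel returns 202 Accepted, and /events/batch/ accepts a JSON array of such objects and reports per-event status.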