当前位置: 首页 > news >正文

Python实例题:基于 Apache Kafka 的实时数据流处理平台

目录

Python实例题

题目

问题描述

解题思路

关键代码框架

难点分析

扩展方向

Python实例题

题目

基于 Apache Kafka 的实时数据流处理平台

问题描述

开发一个基于 Apache Kafka 的实时数据流处理平台,包含以下功能:

  • 数据生产者:从多个数据源收集数据
  • Kafka 集群:分布式消息队列存储数据流
  • 流处理引擎:实时处理和转换数据流
  • 数据消费者:将处理后的数据写入目标系统
  • 监控与管理:监控 Kafka 集群和数据流处理状态

解题思路

  • 搭建 Kafka 集群实现高可用消息队列
  • 开发数据生产者从不同数据源收集数据
  • 使用 Kafka Streams 或 Apache Flink 实现流处理
  • 设计数据消费者将处理结果写入目标系统
  • 集成监控工具监控集群和流处理状态

关键代码框架

# 数据生产者示例 - 从API获取数据并发送到Kafka
import requests
import json
from kafka import KafkaProducer
import time
import logging# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)# 配置Kafka
KAFKA_BOOTSTRAP_SERVERS = ['localhost:9092']
KAFKA_TOPIC = 'api-data-stream'# 创建Kafka生产者
producer = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,value_serializer=lambda v: json.dumps(v).encode('utf-8'),key_serializer=lambda k: str(k).encode('utf-8')
)# API配置
API_URL = 'https://api.example.com/data'
API_KEY = 'your_api_key'def fetch_data_from_api():"""从API获取数据"""headers = {'Authorization': f'Bearer {API_KEY}'}try:response = requests.get(API_URL, headers=headers)response.raise_for_status()return response.json()except requests.exceptions.RequestException as e:logger.error(f"API请求失败: {e}")return Nonedef send_to_kafka(data):"""将数据发送到Kafka"""if not data:returnfor item in data:# 使用数据中的唯一ID作为键key = item.get('id', None)try:# 发送消息到Kafkafuture = producer.send(KAFKA_TOPIC, value=item, key=key)# 等待确认(可选)record_metadata = future.get(timeout=10)logger.info(f"消息发送成功 - 主题: {record_metadata.topic}, 分区: {record_metadata.partition}, 偏移量: {record_metadata.offset}")except Exception as e:logger.error(f"消息发送失败: {e}")def run_producer():"""运行生产者循环"""logger.info("启动数据生产者...")try:while True:# 从API获取数据data = fetch_data_from_api()# 发送数据到Kafkasend_to_kafka(data)# 等待一段时间再获取下一批数据time.sleep(60)  # 每分钟获取一次数据except KeyboardInterrupt:logger.info("生产者被用户中断")finally:# 关闭生产者连接producer.close()logger.info("生产者已关闭")if __name__ == "__main__":run_producer()
# 流处理示例 - 使用Kafka Streams处理实时数据
from kafka import KafkaConsumer, KafkaProducer
from kafka.streams import KafkaStreams, Processor, Stream
import json
import logging# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)# 配置Kafka
KAFKA_BOOTSTRAP_SERVERS = ['localhost:9092']
INPUT_TOPIC = 'api-data-stream'
OUTPUT_TOPIC = 'processed-data-stream'# 定义处理器
class DataProcessor(Processor):def process(self, key, value):try:# 解析JSON数据data = json.loads(value)# 数据转换示例 - 添加处理时间戳data['processed_at'] = str(int(time.time()))# 数据过滤示例 - 只处理特定类型的数据if data.get('type') == 'important':# 将处理后的数据发送到输出主题self.context.forward(key, json.dumps(data).encode('utf-8'))except json.JSONDecodeError as e:logger.error(f"JSON解析错误: {e}")except Exception as e:logger.error(f"处理数据时出错: {e}")# 定义流处理拓扑
def create_stream():stream_builder = Stream()# 从输入主题读取数据stream = stream_builder.stream(INPUT_TOPIC)# 应用处理器stream.process(DataProcessor)# 将结果写入输出主题stream.to(OUTPUT_TOPIC)return stream_builder# 运行流处理应用
def run_stream_processor():logger.info("启动流处理应用...")# 创建Kafka配置config = {'bootstrap.servers': KAFKA_BOOTSTRAP_SERVERS,'application.id': 'data-processing-app','group.id': 'data-processing-group','auto.offset.reset': 'earliest'}# 创建并启动流处理应用stream_builder = create_stream()kafka_streams = KafkaStreams(stream_builder, config)try:kafka_streams.start()logger.info("流处理应用已启动")# 保持应用运行while True:time.sleep(1)except KeyboardInterrupt:logger.info("流处理应用被用户中断")finally:# 关闭流处理应用kafka_streams.close()logger.info("流处理应用已关闭")if __name__ == "__main__":run_stream_processor()
# 数据消费者示例 - 将处理后的数据写入数据库
import json
from kafka import KafkaConsumer
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import datetime
import logging# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)# 配置Kafka
KAFKA_BOOTSTRAP_SERVERS = ['localhost:9092']
KAFKA_TOPIC = 'processed-data-stream'# 配置数据库
DB_URL = 'postgresql://user:password@localhost:5432/stream_data'
engine = create_engine(DB_URL)
Base = declarative_base()
Session = sessionmaker(bind=engine)# 定义数据模型
class ProcessedData(Base):__tablename__ = 'processed_data'id = Column(Integer, primary_key=True)data_id = Column(String(50), index=True)type = Column(String(50))value = Column(String)processed_at = Column(DateTime)created_at = Column(DateTime, default=datetime.datetime.utcnow)# 创建表(如果不存在)
Base.metadata.create_all(engine)def consume_and_store():"""消费Kafka消息并存储到数据库"""logger.info("启动数据消费者...")# 创建Kafka消费者consumer = KafkaConsumer(KAFKA_TOPIC,bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,auto_offset_reset='earliest',group_id='database-writer-group',value_deserializer=lambda m: json.loads(m.decode('utf-8')))try:# 消费消息for message in consumer:try:# 获取消息数据data = message.value# 创建数据库会话session = Session()try:# 创建数据库记录db_record = ProcessedData(data_id=data.get('id'),type=data.get('type'),value=json.dumps(data),processed_at=datetime.datetime.fromtimestamp(int(data.get('processed_at', 0))))# 添加并提交session.add(db_record)session.commit()logger.info(f"成功存储数据到数据库 - ID: {data.get('id')}")except Exception as e:logger.error(f"存储数据到数据库失败: {e}")session.rollback()finally:session.close()except json.JSONDecodeError as e:logger.error(f"JSON解析错误: {e}")except Exception as e:logger.error(f"处理消息时出错: {e}")except KeyboardInterrupt:logger.info("消费者被用户中断")finally:# 关闭消费者连接consumer.close()logger.info("消费者已关闭")if __name__ == "__main__":consume_and_store()

难点分析

  • Kafka 集群配置:确保高可用性和数据持久性
  • 消息序列化与反序列化:处理不同格式的数据
  • 流处理语义:实现精确一次处理语义
  • 数据一致性:跨多个服务保证数据一致性
  • 监控与调优:监控 Kafka 集群性能并进行调优

扩展方向

  • 添加更多数据源和目标系统支持
  • 实现更复杂的流处理逻辑
  • 添加数据分区和负载均衡策略
  • 集成分布式追踪系统
  • 实现自动扩容和故障恢复机制
http://www.lryc.cn/news/572629.html

相关文章:

  • 腾讯云COS“私有桶”下,App如何安全获得音频调用流程
  • React Native【实战范例】弹跳动画菜单导航
  • 2025-06-20 VLC 查看视频时候是如何知道 RTP 图像包是通过 TCP 还是 UDP 协议传输的呢?
  • cusor资源管理器缩进调整与工具条竖着摆放
  • 【Java学习笔记】线程基础
  • C++实例化对象与初始化的区别:深入解析与最佳实践
  • EfficientVLA:面向视觉-语言-动作模型无训练的加速与压缩
  • 准备开始适配高德Flutter的鸿蒙版了
  • 观远ChatBI:加速零售消费企业数据驱动的敏捷决策
  • 以太坊节点搭建私链(POA)
  • 【秒杀系统设计】
  • Vue3+TypeScript+ Element Plus 从Excel文件导入数据,无后端(点击按钮,选择Excel文件,由前端解析数据)
  • 拓客软件有哪些?
  • AI Agent开发与安全
  • 企业级文档搜索系统架构设计与实践指南
  • 巧用云平台API实现开源模型免费调用的实战教程
  • 数据库从零开始:MySQL 中的 DDL 库操作详解【Linux版】
  • 从生活场景学透 JavaScript 原型与原型链
  • 链接过程使用链接器将该目标文件与其他目标文件、库文件、启动文件等链接起来生成可执行文件。附加的目标文件包括静态连接库和动态连接库。其中的启动文件是什么意思?
  • 【内存】Linux 内核优化实战 - vm.max_map_count
  • Spring AOP @AfterReturning (返回通知)的使用场景
  • MySQL 分页查询列表;Explain ;深度分页 ;管理系统,筛选系统
  • AR 眼镜之-条形码识别-实现方案
  • 【AI时代速通QT】第二节:Qt SDK 的目录介绍和第一个Qt Creator项目
  • AI人工智能与LLM大语言模型有什么区别
  • Node.js 在前端开发中的作用与 npm 的核心理解
  • 1.22Node.js 中操作 Redis
  • Kafka线上集群部署方案:从环境选型到资源规划思考
  • 源易信息:领先GEO供应商的市场布局与服务优势
  • 【生活点滴】车辆过户、新车挂牌