当前位置: 首页 > news >正文

pyspark中的kafka的读和写案例操作

下面将详细讲解 PySpark 中操作 Kafka 进行数据读写的案例,包括必要的配置、代码实现和关键参数说明。

PySpark 与 Kafka 集成基础

PySpark 通过 Spark Streaming 或 Structured Streaming 与 Kafka 集成,需要引入特定的依赖包。通常使用spark-sql-kafka-0-10_2.12包,版本需要与 Spark 版本匹配。

读取 Kafka 数据(消费消息)

从 Kafka 读取数据可以分为批处理和流处理两种方式:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType, IntegerType# 初始化SparkSession
spark = SparkSession.builder \.appName("KafkaReaderExample") \.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \.getOrCreate()# 1. 流处理方式读取Kafka数据
def stream_read_kafka():# 配置Kafka连接参数kafka_df = spark.readStream \.format("kafka") \.option("kafka.bootstrap.servers", "localhost:9092") \.option("subscribe", "test_topic")  # 订阅的主题,可以是多个用逗号分隔.option("startingOffsets", "earliest")  # 从最早的偏移量开始消费.load()# Kafka返回的数据包含多个字段,我们主要关注value字段(实际消息内容)# 将二进制的value转换为字符串kafka_df = kafka_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")# 如果消息是JSON格式,可以进一步解析schema = StructType() \.add("id", IntegerType()) \.add("name", StringType()) \.add("age", IntegerType())parsed_df = kafka_df.select(from_json(col("value"), schema).alias("data")).select("data.*")# 输出到控制台(调试用)query = parsed_df.writeStream \.outputMode("append") \.format("console") \.start()query.awaitTermination()# 2. 批处理方式读取Kafka数据
def batch_read_kafka():kafka_df = spark.read \.format("kafka") \.option("kafka.bootstrap.servers", "localhost:9092") \.option("subscribe", "test_topic") \.option("startingOffsets", """{"test_topic":{"0":0}}""")  # 指定分区和偏移量.option("endingOffsets", """{"test_topic":{"0":100}}""") \.load()# 转换为字符串并展示result_df = kafka_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")result_df.show(truncate=False)if __name__ == "__main__":# 选择运行流处理或批处理# stream_read_kafka()batch_read_kafka()

写入 Kafka 数据(生产消息)

同样,写入 Kafka 也支持流处理和批处理两种方式:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_json, struct# 初始化SparkSession
spark = SparkSession.builder \.appName("KafkaWriterExample") \.config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0") \.getOrCreate()# 1. 流处理方式写入Kafka
def stream_write_kafka():# 创建测试数据data = [("1", "Alice", 25), ("2", "Bob", 30), ("3", "Charlie", 35)]df = spark.createDataFrame(data, ["id", "name", "age"])# 转换为Kafka所需的格式(必须包含key和value字段)kafka_df = df.select(col("id").alias("key"),  # key字段to_json(struct("id", "name", "age")).alias("value")  # value字段转为JSON)# 写入Kafkaquery = kafka_df.writeStream \.format("kafka") \.option("kafka.bootstrap.servers", "localhost:9092") \.option("topic", "test_topic") \.option("checkpointLocation", "/tmp/kafka_checkpoint")  # 流处理必须设置检查点.start()query.awaitTermination()# 2. 批处理方式写入Kafka
def batch_write_kafka():# 创建测试数据data = [("4", "David", 40), ("5", "Eve", 45)]df = spark.createDataFrame(data, ["id", "name", "age"])# 转换为Kafka所需格式kafka_df = df.select(col("id").cast("string").alias("key"),to_json(struct("id", "name", "age")).alias("value"))# 写入Kafkakafka_df.write \.format("kafka") \.option("kafka.bootstrap.servers", "localhost:9092") \.option("topic", "test_topic") \.save()if __name__ == "__main__":# 选择运行流处理或批处理# stream_write_kafka()batch_write_kafka()

关键参数说明

  1. 连接参数

    • kafka.bootstrap.servers:Kafka 集群的地址列表,格式为host:port
    • subscribe:要订阅的主题,多个主题用逗号分隔
    • topic:写入时指定的目标主题
  2. 偏移量设置

    • startingOffsets:读取的起始偏移量,earliest(最早)或latest(最新)
    • endingOffsets:批处理时的结束偏移量
  3. 数据格式

    • Kafka 中的数据以二进制形式存储,需要转换为字符串:CAST(key AS STRING)CAST(value AS STRING)
    • 写入时需要将数据转换为包含keyvalue字段的 DataFrame
  4. 流处理特殊参数

    • checkpointLocation:必须设置,用于保存流处理的状态信息
    • outputMode:输出模式,常用append(追加)

运行注意事项

  1. 确保 Kafka 服务已启动并正常运行
  2. 主题需要提前创建:kafka-topics.sh --create --topic test_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
  3. 依赖包版本需要与 Spark 版本匹配,例如 Spark 3.3.0 对应spark-sql-kafka-0-10_2.12:3.3.0
  4. 流处理程序需要手动停止,可通过query.stop()或 Ctrl+C 终止

通过以上示例,你可以实现 PySpark 与 Kafka 之间的数据交互,根据实际需求选择批处理或流处理方式。

http://www.lryc.cn/news/610647.html

相关文章:

  • RocketMq如何保证消息的顺序性
  • 基于deepSeek的流式数据自动化规则清洗案例【数据治理领域AI带来的改变】
  • SpringBoot3.x入门到精通系列:4.2 整合 Kafka 详解
  • NLP——BERT模型全面解析:从基础架构到优化演进
  • 家常菜点餐|基于java和小程序的家庭大厨家常菜点餐系统设计与实现(源码+数据库+文档)
  • 一次“无告警”的服务器宕机分析:从无迹可寻到精准定位
  • 一文掌握Bard机器翻译,以及用python调用的4种方式(现已升级为 Gemini)
  • vue3通过按钮实现横向滚动或鼠标滚动横坐标滚动
  • 用 Python 构建高质量的中文 Wikipedia 语料库:从原始 XML 到干净段落
  • 【taro react】 ---- useModel 数据双向绑定 hook 实现
  • 【乐企板式文件生成工程】关于乐企板式文件(PDF/OFD/XML)生成工程介绍
  • Taro Hooks 完整分类详解
  • wps创建编辑excel customHeight 属性不是标准 Excel Open XML导致比对异常
  • 云计算一阶段Ⅱ——11. Linux 防火墙管理
  • 《Node.js与 Elasticsearch的全文搜索架构解析》
  • Sentinel全面实战指南
  • 剑指offer第2版:字符串
  • Day34 GPU训练及类的call方法
  • Android audio之 AudioDeviceInventory
  • PCBA电子产品复制全攻略:从入门到精通
  • 【音视频】WebRTC 一对一通话-信令服
  • 强化学习_Paper_1991_Reinforcement learning is direct adaptive optimal control
  • 自然语言处理×第三卷:文本数据分析——她不再只是贴着你听,而开始学会分析你语言的结构
  • python+MySQL组合实现生成销售财务报告
  • 游戏画面总是卡顿怎么办 告别延迟畅玩游戏
  • 电脑搜索不到公司无线网络
  • 基于ARM+FPGA多通道超声信号采集与传输系统设计
  • NuGet03-私有仓库搭建
  • mac前端环境安装
  • 【ARM】CMSIS6 介绍