当前位置: 首页 > news >正文

Spark读取kafka(流式和批数据)

spark读取kafka(批数据处理)

在这里插入图片描述

在这里插入图片描述

# 按照偏移量读取kafka数据
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()# spark读取kafka
options = {# 写kafka配置信息# 指定kafka的连接的broker服务节点信息'kafka.bootstrap.servers': 'node1:9092',# 指定主题'subscribe': 'itcast',# 读取的主题不存在会自动创建# todo 注意一:连接的配置#       主题名称 ,分区编号,偏移量# 指定起始偏移量   {主题名称:{分区编号0:偏移量,分区编号1:偏移量....}}'startingOffsets':""" {"itcast":{"0":0,"1":1}} """,# 指定结束偏移量  {主题名称:{分区编号0:偏移量,分区编号1:偏移量....}}'endingOffsets':""" {"itcast":{"0":3,"1":2}}  """# 注意点  : 偏移量的区间是左闭右开 ,结束偏移的指定按照最大偏移量加一 ,所有分区都要指定
}
# 读取
# format 指定读取kafka
df = ss.read.load(format='kafka',**options)
# todo 注意二:这一步的数据处理(将value转化为字符串类型)是必须做的,不然你看不懂数据。
#       可以用df.的方式,那我后来怎么都没怎么见过了0
df_select = df.select('key',df.value.cast('string'),'topic','partition','offset','timestamp','timestampType')
# 查看df数据
# todo 注意三:这里使用.show()的方式的,是因为它是有界表
df_select.show()

在这里插入图片描述

spark读取kafka(流数据处理)

在这里插入图片描述

# 流式读取kafka数据
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()
# todo 注意一:定义kafka的连接配置
options={# 写kafka配置信息# 指定kafka的连接的broker服务节点信息'kafka.bootstrap.servers': 'node1:9092',# 指定主题'subscribe': 'itheima'  # 读取的主题不存在会自动创建
}
df = ss.readStream.load(format='kafka',**options)
# todo 注意二:必须将value转化为string类型# 计算
df_res = df.select('key',df.value.cast('string'),'topic','partition','offset','timestamp')# 输出
# todo 注意三:输出不是df_res.show,
df_res.writeStream.start(format='console',outputMode='append').awaitTermination()
http://www.lryc.cn/news/284927.html

相关文章:

  • 经典目标检测YOLO系列(二)YOLOV2的复现(1)总体网络架构及前向推理过程
  • 怎样使用崭新的硬盘
  • Kafka-多线程消费及分区设置
  • 计算机导论06-人机交互
  • hot100:07接雨水
  • Docker安装MySQL教程分享(附MySQL基础入门教程)
  • 麒麟V10挂载iso,配置yum源
  • 《Linux C编程实战》笔记:信号的捕捉和处理
  • python算法与数据结构---单调栈与实践
  • 文心一言使用分享
  • 【C++干货铺】C++11新特性——lambda表达式 | 包装器
  • 在 EggJS 中实现 Redis 上锁
  • Unity-场景
  • MATLAB R2023b for Mac 中文
  • 01 MyBatisPlus快速入门
  • HarmonyOS 应用开发入门
  • 【机器学习300问】9、梯度下降是用来干嘛的?
  • 第13章 1 进程和线程
  • 什么是中间件?
  • 汽车售后服务客户满意度调查报告
  • 初始RabbitMQ(入门篇)
  • JVM:Java类加载机制
  • 要经历痛苦,才能在赚钱路上觉醒!
  • LeetCode 第381场周赛个人题解
  • 数据结构之二叉树的性质与存储结构
  • 机器视觉检测设备在连接器外观缺陷检测中的应用
  • ChatGPT vs 文心一言(AI助手全面比较)
  • MSPM0L1306例程学习-UART部分(2)
  • Baichuan2百川模型部署的bug汇总
  • ChatGPT 如何解决 “Something went wrong. lf this issue persists ….” 错误