当前位置: 首页 > news >正文

用pyspark把kafka主题数据经过etl导入另一个主题中的有关报错

首先看一下我们的示例代码

import os
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
"""
------------------------------------------Description : TODO:SourceFile : etl_stream_kafkaAuthor  : zxxDate  : 2024/11/14
-------------------------------------------
"""
if __name__ == '__main__':os.environ['JAVA_HOME'] = 'D:/bigdata/03-java/java-8/jdk'# 配置Hadoop的路径,就是前面解压的那个路径os.environ['HADOOP_HOME'] = 'D:/bigdata/04-Hadoop/hadoop/hadoop-3.3.1/hadoop-3.3.1'# 配置base环境Python解析器的路径os.environ['PYSPARK_PYTHON'] = 'D:/bigdata/22-spark/Miniconda3/python.exe'  # 配置base环境Python解析器的路径os.environ['PYSPARK_DRIVER_PYTHON'] = 'D:/bigdata/22-spark/Miniconda3/python.exe'spark = SparkSession.builder.master("local[2]").appName("etl_stream_kafka").config("spark.sql.shuffle.partitions", 2).getOrCreate()# 连接kafkareadDF = spark.readStream.format("kafka") \.option("kafka.bootstrap.servers", "bigdata01:9092") \.option("subscribe", "topicA") \.load()# 使用DSL语句etlDF = readDF.selectExpr("cast(value as STRING)").filter(F.col("value").contains("success"))etlDF.writeStream \.format("kafka") \.option("kafka.bootstrap.servers", "bigdata01:9092") \.option("topic", "etlTopic") \.option("checkpointLocation", "../../datas/kafka_stream") \.start().awaitTermination()# 关闭spark.stop()

运行发现报错

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Traceback (most recent call last):File "D:\bigdata\18-python\pyspark_project\pythonProject1\main\streamingkafka\etl_stream_kafka.py", line 22, in <module>readDF = spark.readStream.format("kafka") \File "D:\bigdata\22-spark\Miniconda3\lib\site-packages\pyspark\sql\streaming.py", line 482, in loadreturn self._df(self._jreader.load())File "D:\bigdata\22-spark\Miniconda3\lib\site-packages\py4j\java_gateway.py", line 1304, in __call__return_value = get_return_value(File "D:\bigdata\22-spark\Miniconda3\lib\site-packages\pyspark\sql\utils.py", line 117, in decoraise converted from None
pyspark.sql.utils.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".

报错 : org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;

解决:这个是因为缺少了Kafka和Spark的集成包,前往https://mvnrepository.com/artifact/org.apache.spark

下载对应的jar包即可,比如我是SparkSql写入的Kafka,那么我就需要下载Spark-Sql-Kafka.x.x.x.jar

 进入网站(已打包放入文章末尾)

找到对应有关spark 和kafka的模块

找到对应的版本 ,这里我用的kafka是3.0版本,下载的是3.1.2版本

 点进去,下载jar包

 再次运行会发现仍然报错,这是因为jar包之间的依赖关系,从刚才下载的界面下面再下载有关的jar包

 

 

 

 再次运行即可

 jar包下载链接

【免费】用pyspark把数据从kafka的一个主题用流处理后再导入kafka的另一个主题的有关报错资源-CSDN文库

http://www.lryc.cn/news/488325.html

相关文章:

  • Redis的过期删除策略和内存淘汰机制以及如何保证双写的一致性
  • 异常处理:import cv2时候报错No module named ‘numpy.core.multiarray‘
  • C++手写PCD文件
  • 优选算法(双指针)
  • 【保姆级】Mac上IDEA卡顿优化
  • python实战案例----使用 PyQt5 构建简单的 HTTP 接口测试工具
  • pytest 接口串联场景
  • Springboot项目搭建(2)-用户详细信息查询
  • Stable Diffusion的加噪和去噪详解
  • 解决 Gradle 报错:`Plugin with id ‘maven‘ not found` 在 SDK 开发中的问题
  • EMD-KPCA-Transformer多变量回归预测!分解+降维+预测!多重创新!直接写核心!
  • 前端 px、rpx、em、rem、vh、vw计量单位的区别
  • OceanBase数据库产品与工具介绍
  • 学习threejs,对模型多个动画切换展示
  • 【Bug合集】——Java大小写引起传参失败,获取值为null的解决方案
  • Python爬虫:如何从1688阿里巴巴获取公司信息
  • 单片机学习笔记 2. LED灯闪烁
  • 折叠光腔衰荡高反射率测量技术的matlab模拟理论分析
  • ubuntu 16.04 中 VS2019 跨平台开发环境配置
  • C语言第13节:指针(3)
  • java:简单小练习,面积
  • @Autowired 和 @Resource思考(注入redisTemplate时发现一些奇怪的现象)
  • PostgreSQL提取JSON格式的数据(包含提取list指定索引数据)
  • 如何利用谷歌浏览器提高网络安全
  • go-zero(四) 错误处理(统一响应信息)
  • 1.1 爬虫的一些知识(大模型提供语料)
  • Linux开发工具:Vim 与 gcc,打造高效编程的魔法双剑
  • cesium for unity的使用
  • Android AOSP 架构和各层次开发内容介绍
  • Kafka 到 Kafka 数据同步