当前位置: 首页 > news >正文

pyspark 检测任务输出目录是否空,避免读取报错

前言

在跑调度任务时候,有时候子任务需要依赖前置任务的输出,但类似读取 Parquet 或者 Orc 文件时,如果不判断目录是否为空,在输出为空时会报错,所以需要 check 一下,此外Hadoop通常在写入数据时会在目录中生成一个名为_SUCCESS的文件来表示写入操作已成功完成,我们在检测时要排除这个文件

HDFS API 判断

from py4j.java_gateway import java_import
from pyspark.sql import SparkSession# 初始化SparkSession
spark = SparkSession.builder.appName("Example").getOrCreate()# 导入Hadoop FileSystem类
java_import(spark._jvm, 'org.apache.hadoop.fs.Path')
java_import(spark._jvm, 'org.apache.hadoop.fs.FileSystem')# 定义要检查的路径
FEATURE_OUTPUT_PATH = "your_path_here"# 获取Hadoop Configuration
hadoop_conf = spark._jsc.hadoopConfiguration()# 获取FileSystem对象
fs = spark._jvm.FileSystem.get(hadoop_conf)# 检查路径是否存在
path = spark._jvm.Path(FEATURE_OUTPUT_PATH)if fs.exists(path):# 获取目录下所有的文件和子目录status_list = fs.listStatus(path)non_success_files = [file_status.getPath().getName() for file_status in status_list iffile_status.getPath().getName() != "_SUCCESS"]# 检查除_SUCCESS文件外是否还有其他文件if non_success_files:# 读取Parquet文件table = spark.read.format('parquet').option('header', 'true').load(FEATURE_OUTPUT_PATH)else:print("The directory is empty or only contains a _SUCCESS file.")
else:print("The path does not exist.")

本地 Shell 判断

注意这段脚本能使用的前提是,执行的机器上已经安装和配置了 HDFS 的 shell 命令

import subprocessout=subprocess.check_output("hadoop fs -ls /tmp/file.txt",shell=True)out=out.strip()out=out.split("\n")for l in out:if l.endswith(".txt"):print "file exit"else:print "file not exit"
http://www.lryc.cn/news/179012.html

相关文章:

  • 「网页开发|前端开发|Vue」10 vuex模块化:将数据划分成不同modules分别管理
  • 苹果CMS插件-苹果CMS全套插件免费
  • 域环境介绍
  • 地球同步静止轨道上的中国卫星
  • HAProxy代理TCP(使用HAProxy 为TiDB-Server 做负载均衡)
  • 全新自适应导航网模板 导航网系统源码 网址导航系统源码 网址目录网系统源码
  • 无人直播间
  • Linux 服务器防止 ssh 暴力密码登录破解之使用 fail2ban
  • 第十四届蓝桥杯大赛软件赛决赛 C/C++ 大学 B 组 试题 D: 合并数列
  • ChatGPT必应联网功能正式上线
  • DETR中的问题汇总(代码)
  • 华为云云耀云服务器L实例评测|使用华为云耀云服务器L实例的CentOS部署Docker并运行Tomcat应用
  • Java基础---第八篇
  • (附源码)springboot体检预约APP 计算机毕设16370
  • Spring的注解开发-@Component的三个衍生注解
  • 无线WIFI工业路由器可用于楼宇自动化
  • 基于长短期神经网络铜期货价格预测,基于LSTM的铜期货价格预测,LSTM的详细原理
  • 300元开放式耳机推荐哪个、最值得入手的开放式耳机推荐
  • 嵌入式学习笔记(37) S5PV210的PWM定时器
  • python工具-base64-zip-json
  • Centos 7安装pm2 , 操作等常用命令
  • vue 实现弹出菜单,解决鼠标点击其他区域的检测问题
  • 经典网络解(三) 生成模型VAE | 自编码器、变分自编码器|有监督,无监督
  • gif怎么转换成视频MP4?
  • 标准化、逻辑回归、随机梯度参数估计
  • 【数据结构】【C++】封装哈希表模拟实现unordered_map和unordered_set容器
  • 26967-2011 一般用喷油单螺杆空气压缩机
  • Opengl之模板测试
  • iPhone苹果手机复制粘贴内容提示弹窗如何取消关闭提醒?
  • 释放潜力:人工智能对个性化学习的影响