当前位置: 首页 > news >正文

Apache Hive用PySpark统计指定表中各字段的空值、空字符串或零值比例

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, coalesce, trim, when, lit, sum
from pyspark.sql.types import StringType, NumericType# 初始化SparkSession
spark = SparkSession.builder \.appName("Hive Data Quality Analysis") \.enableHiveSupport() \.getOrCreate()# 配置参数
database_name = "your_database"  # 替换为实际数据库名
result_list = []# 获取数据库所有表
tables = spark.catalog.listTables(database_name)for table in tables:table_name = table.namefull_table_name = f"{database_name}.{table_name}"try:# 读取Hive表df = spark.table(full_table_name)df.cache()  # 缓存以提高性能total_count = df.count()if total_count == 0:continue# 为每个字段生成统计指标for field in df.schema.fields:column_name = field.namecolumn_type = str(field.dataType)# 根据字段类型处理数据if isinstance(field.dataType, StringType):# 字符串类型处理modified_col = trim(coalesce(col(column_name), lit("")))condition = (modified_col == lit(""))count_expr = sum(when(condition, 1).otherwise(0)).alias("cnt")elif isinstance(field.dataType, NumericType):# 数值类型处理modified_col = coalesce(col(column_name), lit(0))condition = (modified_col == lit(0))count_expr = sum(when(condition, 1).otherwise(0)).alias("cnt")else:# 其他类型处理condition = col(column_name).isNull()count_expr = sum(when(condition, 1).otherwise(0)).alias("cnt")# 执行计算stats = df.agg(count_expr).collect()[0]["cnt"]percentage = round((stats / total_count) * 100, 2) if total_count > 0 else 0.0# 收集结果result_list.append((database_name,table_name,column_name,column_type,stats,total_count,float(percentage)))df.unpersist()  # 释放缓存except Exception as e:print(f"Error processing table {table_name}: {str(e)}")continue# 创建结果DataFrame
result_columns = ["database_name","table_name","column_name","column_type","stat_count","total_rows","percentage"
]result_df = spark.createDataFrame(result_list, result_columns)# 输出结果(可根据需要保存到HDFS或Hive表)
result_df.show(truncate=False)# 停止SparkSession
spark.stop()

代码说明:

  1. 初始化配置:创建SparkSession并启用Hive支持
  2. 元数据获取:通过Spark Catalog获取指定数据库的所有表
  3. 数据缓存:对每个表进行缓存以提高后续多次操作的性能
  4. 字段类型判断
    • 字符串类型:NULL转空字符串并去除空格
    • 数值类型:NULL转0
    • 其他类型:直接统计NULL值
  5. 统计计算:使用PySpark的表达式进行条件统计
  6. 结果收集:将统计结果组织成结构化数据格式
  7. 结果输出:将最终结果以表格形式展示

注意事项:

  1. 需要替换代码中的your_database为实际数据库名称
  2. 该代码会处理数据库中所有表,如需指定特定表,可修改tables的获取逻辑
  3. 结果展示方式可根据需要修改为写入Hive表或文件系统
  4. 处理大型表时建议增加分区处理逻辑以提高性能
  5. 需要确保Spark集群有足够内存来处理目标表的数据量

输出示例:

+-------------+-----------+-----------+-----------+---------+----------+----------+
|database_name|table_name |column_name|column_type|stat_count|total_rows|percentage|
+-------------+-----------+-----------+-----------+---------+----------+----------+
|your_database|customers  |name       |StringType |125      |10000     |1.25      |
|your_database|customers  |age        |IntegerType|324       |10000     |3.24      |
|your_database|orders     |order_date |DateType   |56        |5000      |1.12      |
+-------------+-----------+-----------+-----------+---------+----------+----------+
http://www.lryc.cn/news/535970.html

相关文章:

  • 高校元宇宙实训室解决方案:以技术驱动教育,用数字人链接未来
  • 提升编程效率,体验智能编程助手—豆包MarsCode一键Apply功能测评
  • 【前端开发】query参数和params参数的区别
  • 推荐系统召回算法
  • Python基础(上)
  • 【DuodooBMS】给PDF附件加“受控”水印的完整Python实现
  • 【虚幻引擎UE】UE4.23到UE5.5的核心功能变化
  • 阿里云《AI 剧本生成与动画创作》解决方案技术评测
  • commons-io 包 IOUtils、FileUtils、FilenameUtils
  • JavaScript 加密技术全面指南
  • 【笔记】deep-seek wechat项目
  • FloodFill算法——搜索算法
  • H5接入支付宝手机网站支付并实现
  • 基于SpringBoot+uniapp的在线办公小程序+LW示例参考
  • 文章精读篇——OMG-LLaVA
  • 两个同一对象targetList和 sourceList 去重
  • 软件开发 | GitHub企业版常见问题解读
  • Docker 网络的配置与管理
  • 新手自学:如何用gromacs对简单分子复合物进行伞形采样
  • 力扣第一题 哈希解法 O(n)时间复杂度
  • elementui: el-dialog的header设置样式不生效
  • libpcap 的使用
  • ArcGISPro AA表O_Name字段 内容 复制到BB表BB字段里
  • 2.5 使用注解进行单元测试详解
  • 当没有OpenGL时,Skia如何绘制?
  • SaaS+AI应用架构:业务场景、智能体、大模型、知识库、传统工具系统
  • Go 语言中如何高效地处理集合
  • 布隆过滤器到底是什么东西?它有什么用
  • 【数据结构初阶第十节】队列(详解+附源码)
  • 沪深300股指期权能对股指期货进行完全套保吗?