当前位置: 首页 > news >正文

pyspark统计指标计算

下面介绍如何使用pyspark处理计算超大数据的统计指标,主要为:最大值、最小值、均值、方差、标准差、中位数、众数、非重复值等。

# 加载稽核数据
rd_sql = f"select * from database.table"
spark_data = spark.sql(rd_sql)
# 计算众数 由于spark 2.4版本未内置相关函数 需要自定义
import pyspark.sql.functions as F
# 自定义mode的计算
def sparkdf_mode(df, cols):# 构建一个空数据框mode_df = pd.DataFrame()# 循环每一列for col in cols:# 先过滤空值filtered_df = df.filter(F.col(col).isNotNull())# 加个判断 防止数据全空置时报错if filtered_df.count()>0:# 统计出现次数 排序grouped_counts = filtered_df.groupBy(col).count().orderBy(F.col("count").desc())# 获取计数值最大的第一行first_row = grouped_counts.first()# 转sparkdfpdf = spark.createDataFrame([first_row], grouped_counts.columns).toPandas()[col]else:# 数据全空置 赋值Nonepdf = pd.DataFrame({col: [None]}) # 拼接mode_df = pd.concat([mode_df, pdf], axis=1)return mode_df
from pyspark.sql.functions import col, count, when, approx_count_distinct
# 分开统计 先统计字符类型
# 统计指标
string_stats = spark_data.select(string_cols+date_cols).summary("max","min").toPandas()
# 非空值数量
string_nonull = spark_data.select([count(when(col(c).isNotNull(), c)).alias(c) for c in (string_cols+date_cols)]).toPandas()
# 非重复值
string_unique = spark_data.agg(*[approx_count_distinct(col(c)).alias(c) for c in (string_cols+date_cols)]).toPandas()
# 众数
string_mode = sparkdf_mode(spark_data, (string_cols+date_cols))
# 添加空值占位
null_rows = pd.DataFrame(None, index=np.arange(len(string_stats), len(string_stats) + 3), columns=string_stats.columns)
string_stats = string_stats.append(null_rows)
# 上下拼接
string_data = pd.concat([string_stats.iloc[:, 1:], string_nonull, string_unique, string_mode])
print(f"string_data稽核完成")
# 统计数值类型
# 统计指标
float_stats = spark_data.select(float_cols).summary("max","min","mean","50%","stddev").toPandas()
print(f"float_stats稽核完成")
# 非空值
float_nonull = spark_data.select([count(when(col(c).isNotNull(), c)).alias(c) for c in float_cols]).toPandas()
# 非重复值
float_unique = spark_data.agg(*[approx_count_distinct(col(c)).alias(c) for c in float_cols]).toPandas()
# 众数
float_mode = sparkdf_mode(spark_data, float_cols)
# 上下拼接
float_data = pd.concat([float_stats.iloc[:, 1:], float_nonull, float_unique, float_mode])
print(f"float_data稽核完成")
# 合并转置
pdf = pd.concat([string_data, float_data], axis=1).T
# 重命名
pdf.columns = ["max", "min", "mean", "median", "std", "nonull_cnt", "unique_cnt", "mode"]
# pdf转为sdf
sdf = spark.createDataFrame(pdf)
# 创建临时视图 用于sqlAPI操作
sdf.createOrReplaceTempView("temp_view")
# 插入库表
spark.sql(f"insert overwrite table database.table select * from temp_view")
# 用完删除临时视图
spark.catalog.dropTempView("temp_view")
# 关闭spark
spark.stop()
http://www.lryc.cn/news/304560.html

相关文章:

  • 2.22号qt
  • $attrs
  • OS X(MACOS) C/C++ 遍历系统所有的IP路由表配置。
  • 人工智能_普通服务器CPU_安装清华开源人工智能AI大模型ChatGlm-6B_003---人工智能工作笔记0098
  • 基于JAVA的实验室耗材管理系统 开源项目
  • NXP实战笔记(七):S32K3xx基于RTD-SDK在S32DS上配置ICU输入捕获
  • 左右联动布局效果
  • 【工具类】vscode ssh 远程免密登录开发
  • 【Antd】Form 表单获取不到 Input 的值
  • Encoder-decoder 与Decoder-only 模型之间的使用区别
  • 【STM32备忘录】【STM32WB系列的BLE低功耗蓝牙】一、测试广播配置搜不到信号的注意事项
  • ChatGPT 是什么
  • 4款好用的ai智能写作软件,为写作排忧解难!
  • js设计模式:计算属性模式
  • 2015-2024年考研数学(一)真题练习和解析——选择题
  • Git合并固定分支的某一部分至当前分支
  • Codeforces Round 928 (Div. 4) (A-E)
  • git远程操控gitee
  • 常见面试题:TCP的四次挥手和TCP的滑动窗口
  • 力扣随笔之两数之和 Ⅱ -输入有序数组(中等167)
  • 最优传输(Optimal Transport)
  • MIT-6.824-Lab2,Raft部分笔记|Use Go
  • 使用openeuler 22.03替代CentOS 7.9,建立虚拟机详细步骤
  • 代理技术引领出海征程
  • 谷粒商城篇章9 ---- P248-P261/P292-P294 ---- 消息队列【分布式高级篇六】
  • 【Spring连载】使用Spring Data访问 MongoDB(五)----生命周期事件
  • JavaSec 之 SQL 注入简单了解
  • 第十一章——期约与异步函数
  • 工具方法合集-utils.js
  • 安卓11-设置HDMI分辨率流程