当前位置: 首页 > news >正文

pyspark大规模数据加解密优化实践

假如有1亿行数据

方法1 spark udf解密

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyDes import *
import binasciispark=SparkSession.builder.getOrCreate()def dec_fun(text):key = triple_des(b"HHHHHHHHHHHHHHHHHHHHHHHH", CBC, b"XXXXXXXX", padmode=PAD_PKCS5)if not text:return Nonereturn k.decrypt(base64.b64decode(text)).decode('utf-8')
spark.udf.register("dec_fun",dec_fun)df.withColumn("dec_col",F.expr("dec_fun(en_col)")).write.mode("overwrite").save("OOOO")
  • 密钥初始化1亿次
  • 每条数据触发一次JVM → Python进程的数据传输
  • 每次传输需序列化/反序列化数据(性能杀手)
  • 高频进程间通信(IPC)产生巨大开销

方法2 repartition+mapPartition

为了提高效率,我们可以利用mapPartitions在每个分区内部只初始化一次解密对象,避免重复初始化。

def dec_fun(text):if not text:return Noneelse:result = base64.b64decode(text)k = triple_des(b"HHHHHHHHHHHHHHHHHHHHHHHH", CBC, "XXXXXXXX", pad=None, padmode=PAD_PKCS5)d = k.decrypt(result)return d.decode("utf-8")spark.udf.register("dec_fun", dec_fun)# 分区解密
def rdd_decrypt(partitionData):for row in partitionData:try:yield [dec_fun(row.zj_no)]except:passdf.select("en_col").repartition(30).rdd.mapPartitions(rdd_decrypt).toDF(["dec_col"]).write.mode("overwrite").save("OOOO")

密钥初始化依然是1亿次,这个代码写的不好,应该每个分区初始化1次而不是每行。但相对方法1依然有答复性能提升,

  • mapPartitions
    • 分区级批量传输:每个分区一次性从JVM发送到Python进程(例如1个分区10万条数据,仅1次传输)
    • 几个分区就调用几次函数(对比1亿次的UDF调用)

方法3 repartition+mapPartition+分区1次初始化

def dec_fun(partitionData):k = triple_des(b"HHHHHHHHHHHHHHHHHHHHHHHH", CBC, "XXXXXXXX", pad=None, padmode=PAD_PKCS5)for row in partitionData:try:if row.zj_no:result = base64.b64decode(row.zj_no)d = k.decrypt(result)yield [d.decode("utf-8")]else:continueexcept:pass# 如果要保留原始一大堆列,更麻烦
df.select("en_col").repartition(20).rdd.mapPartitions(dec_fun).toDF(["dec_col"]).write.mode("overwrite").save("OOOO")
  • 密钥初始化数=分区数
  • 通过repartition(20)合理调整分区
  • 利用RDD底层优化

方法4 scalar pandas_udf

import pyspark.sql.functions as F
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") @F.pandas_udf("string")
def dec_fun(series: pd.Series) -> pd.Series:k = triple_des(b"HHHHHHHHHHHHHHHHHHHHHHHH", CBC, "XXXXXXXX", pad=None, padmode=PAD_PKCS5)def _decrypt(text):try:if text:result = base64.b64decode(text)d = k.decrypt(result)return d.decode("utf-8")except:passreturn series.apply(_decrypt)df.select("en_col").repartition(20).withColumn("dec_col",F.expr("dec_fun(en_col)")).write.mode("overwrite").save("OOOO")

🌟 优势

  • 利用Apache Arrow高效内存传输
  • Pandas向量化操作潜力
  • 与DataFrame API无缝集成

⚠️ 局限

  • 密钥初始化数=batch数
  • 大分区内存压力大

方法5 迭代器型pandas_udf

我们可以使用迭代器类型的Pandas UDF,在每次处理一个迭代器(一个迭代器对应一个batch)时只初始化一次密钥对象:

import pyspark.sql.functions as F
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") @F.pandas_udf("string")
def dec_fun(series_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:k = triple_des(b"HHHHHHHHHHHHHHHHHHHHHHHH", CBC, "XXXXXXXX", pad=None, padmode=PAD_PKCS5)def _decrypt(text):try:if text:result = base64.b64decode(text)d = k.decrypt(result)return d.decode("utf-8")except:passfor series in series_iter:de_series = series.apply(_decrypt)yield de_seriesdf.select("en_col").repartition(20).withColumn("dec_col",F.expr("dec_fun(en_col)")).write.mode("overwrite").save("OOOO")
  • 密钥初始化数=分区数
  • 内存友好的批处理流
  • 向量化
  • Arrow优化零拷贝传输
  • 完美平衡初始化开销和并行效率

综合

方法5>(方法4,方法3)>方法2>方法1

方法4,方法3这两者,我也倾向方法4

http://www.lryc.cn/news/580354.html

相关文章:

  • Python小工具之PDF合并
  • 数据结构:多维数组在内存中的映射(Address Mapping of Multi-dimensional Arrays)
  • IDEA中application.yml配置文件不自动提示解决办法
  • 如何在IntelliJ IDEA中设置数据库连接全局共享
  • 从“电话催维修“到“手机看进度“——售后服务系统开发如何重构客户体验
  • CppCon 2018 学习:Surprises In Object Lifetime
  • Kotlin 协程:Channel 与 Flow 深度对比及 Channel 使用指南
  • 【ES6】Latex总结笔记生成器(网页版)
  • Jenkins Pipeline(二)
  • 【Elasticsearch】深度分页及其替代方案
  • 【openp2p】 学习2:源码阅读P2PNetwork和P2PTunnel
  • 【STM32实践篇】:GPIO 详解
  • 网络资源模板--基于Android Studio 实现的极简天气App
  • Excel 数据透视表不够用时,如何处理来自多个数据源的数据?
  • 动手实践OpenHands系列学习笔记1:Docker基础与OpenHands容器结构
  • Softhub软件下载站实战开发(十三):软件管理前端分片上传实现
  • 用户中心Vue3网页开发(1.0版)
  • Java零基础笔记01(JKD及开发工具IDEA安装配置)
  • Linux进程管理:从基础到实战
  • 60天python训练计划----day59
  • 数据结构:数组:插入操作(Insert)与删除操作(Delete)
  • 深度学习4(浅层神经网络)
  • 【深度学习】神经网络剪枝方法的分类
  • 由coalesce(1)OOM引发的coalesce和repartition理解
  • C++ 网络编程(15) 利用asio协程搭建异步服务器
  • Linux——进程(下)
  • android studio 配置硬件加速 haxm
  • spring中 方法上@Transation实现原理
  • C++20中的counting_semaphore的应用
  • C++ 模板参数匹配、特化