
Spark 3.1.1: fixing a slow from_json + regexp_replace expression combination

Background

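While upgrading from Spark 2.4.x to 3.1.1, our company ran into a class of SQL queries that became extremely slow. The SQL looks like this (only the key part is shown):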

 select device_personas.*
 from (
   select device_id,
          ads_id,
          from_json(regexp_replace(device_personas, '(?<=(\\{|,))"device_', '"user_device_'), ${device_schema}) as device_personas
   from input
 )

where ${device_schema} has several hundred fields.
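To make the regexp_replace step concrete, here is a minimal sketch of what the pattern does to a raw JSON string. It uses Python's re module as a stand-in for the Java regex engine that Spark uses, and the sample record and its keys are invented for illustration. The lookbehind only matches a "device_ that directly follows { or a comma, so string values after a colon are left alone.

```python
import re

# Same pattern as in the SQL above (the SQL literal doubles the backslash).
# The lookbehind requires the quote to follow "{" or ",".
KEY_PREFIX = re.compile(r'(?<=(\{|,))"device_')


def rename_device_keys(raw_json: str) -> str:
    """Rewrite keys that start with device_ so they start with user_device_."""
    return KEY_PREFIX.sub('"user_device_', raw_json)


# Hypothetical sample record, not taken from the article's data.
sample = '{"device_age":"1","note":"device_x","device_city":"sh"}'
renamed = rename_device_keys(sample)
print(renamed)
```

The value "device_x" stays unchanged because it follows a colon, while both keys gain the user_ prefix.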

Before tuning, with 360 cores and 720 GB of memory, the job took 43 minutes:
[Figure: run time before tuning]

After tuning, with the same resources, it takes only 6 minutes:
[Figure: run time after tuning]

Conclusion

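The conclusion first:
The main cause is the new rule org.apache.spark.sql.catalyst.optimizer.OptimizeJsonExprs introduced in Spark 3.1.x. For this SQL, the rule prunes the columns that are not needed from the from_json schema,
which causes regexp_replace to be invoked many times. The reason is given in the rule's own comment: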

if JsonToStructs(json) is shared among all fields of CreateNamedStruct. prunedSchema contains all accessed fields in original CreateNamedStruct.

So either set spark.sql.optimizer.enableJsonExpressionOptimization to false, or set

spark.sql.adaptive.optimizer.excludedRules    org.apache.spark.sql.catalyst.optimizer.OptimizeJsonExprs
spark.sql.optimizer.excludedRules             org.apache.spark.sql.catalyst.optimizer.OptimizeJsonExprs

to skip the rule.
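As a small sketch of how these settings might be passed to a job, the helper below renders them as repeated --conf key=value arguments for spark-submit. The configuration keys and the rule name are the ones quoted above; the helper name to_conf_args and the dictionary names are hypothetical, and only one of the two groups is needed.

```python
# Rule name and configuration keys as quoted in the article.
RULE = "org.apache.spark.sql.catalyst.optimizer.OptimizeJsonExprs"

# Option 1: switch the JSON expression optimization off.
DISABLE_JSON_OPTIMIZATION = {
    "spark.sql.optimizer.enableJsonExpressionOptimization": "false",
}

# Option 2: exclude the rule by name.
EXCLUDE_RULE = {
    "spark.sql.adaptive.optimizer.excludedRules": RULE,
    "spark.sql.optimizer.excludedRules": RULE,
}


def to_conf_args(conf: dict) -> list:
    """Render settings as repeated `--conf key=value` arguments (hypothetical helper)."""
    args = []
    for key, value in conf.items():
        args += ["--conf", f"{key}={value}"]
    return args


print(" ".join(to_conf_args(DISABLE_JSON_OPTIMIZATION)))
print(" ".join(to_conf_args(EXCLUDE_RULE)))
```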

Analysis

The physical plan of the SQL is as follows:
[Figure: physical plan of the SQL]

Without skipping the rule:

The main part of the physical plan is:

(6) Project
Output [10]: [device_id#62, ads_id#63, from_json(StructField(user_device_adv_age_year,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293, from_json(StructField(ads_material_text_tag,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_material_text_tag AS ads_material_text_tag#294, from_json(StructField(ads_ad_pic_resolution,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_ad_pic_resolution AS ads_ad_pic_resolution#295, from_json(StructField(ctx_sound_patch_scene,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_sound_patch_scene AS ctx_sound_patch_scene#296, from_json(StructField(ctx_position,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_position AS ctx_position#297, from_json(StructField(album_category_id,StringType,true), album_personas#72, Some(Asia/Shanghai)).album_category_id AS album_category_id#298, from_json(StructField(album_nlp_labels_app,StringType,true), album_personas#72, Some(Asia/Shanghai)).album_nlp_labels_app AS album_nlp_labels_app#299]
Input [6]: [device_id#62, ads_id#63, device_personas#69, ads_personas#70, album_personas#72, ctx_personas#73]

The rule transforms the plan as follows (using two fields as an example):

=== Applying Rule org.apache.spark.sql.catalyst.optimizer.OptimizeJsonExprs ===

Before:
 InsertIntoHadoopFsRelationCommand oss://xima-bd-data3.cn-shanghai.oss-dls.aliyuncs.com/reslib/droplet/generate/data/ai-ad/102041271/1723411818435/xqldata/.staging_1691066243227, false, Parquet, Map(coalesceNum -> 500, path -> oss://xima-bd-data3.cn-shanghai.oss-dls.aliyuncs.com/reslib/droplet/generate/data/ai-ad/102041271/1723411818435/xqldata/.staging_1691066243227), Overwrite, [device_id, ads_id, user_device_adv_age_year, user_device_child_age, ads_material_text_tag, ads_ad_pic_resolution, ctx_sound_patch_scene, ctx_position, album_category_id, album_nlp_labels_app]
 +- Repartition 500, true
!   +- Project [device_id#62, ads_id#63, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293, from_json(StructField(ads_material_text_tag,StringType,true), StructField(ads_ad_pic_resolution,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_material_text_tag AS ads_material_text_tag#294, from_json(StructField(ads_material_text_tag,StringType,true), StructField(ads_ad_pic_resolution,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_ad_pic_resolution AS ads_ad_pic_resolution#295, from_json(StructField(ctx_sound_patch_scene,StringType,true), StructField(ctx_position,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_sound_patch_scene AS ctx_sound_patch_scene#296, from_json(StructField(ctx_sound_patch_scene,StringType,true), StructField(ctx_position,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_position AS ctx_position#297, from_json(StructField(album_category_id,StringType,true), StructField(album_nlp_labels_app,StringType,true), album_personas#72, Some(Asia/Shanghai)).album_category_id AS album_category_id#298, from_json(StructField(album_category_id,StringType,true), StructField(album_nlp_labels_app,StringType,true), album_personas#72, Some(Asia/Shanghai)).album_nlp_labels_app AS album_nlp_labels_app#299]
       +- Filter (if ((label_click#84 = 0)) (rand(7794855199306151884) >= 0.95) else true AND (NOT (isnull(device_personas#69) AND isnull(ads_personas#70)) OR NOT isnull(ctx_personas#73)))
          +- Filter ((((dt#82 >= 20230710) AND (dt#82 <= 20230712)) AND NOT coalesce(appshadow#76, ) IN (2,3)) AND ((NOT (position_name#75 = sound_agg) AND isnotnull(get_json_object(ads_personas#70, $.ads_first_trade))) AND NOT coalesce(get_json_object(ads_personas#70, $.ads_business_type), -11111) IN (1,2,3)))
             +- Relation[device_id#62,ads_id#63,response_id#64,track_id#65,album_id#66,imp_ts#67,click_ts#68,device_personas#69,ads_personas#70,track_personas#71,album_personas#72,ctx_personas#73,label_conv#74,position_name#75,appshadow#76,play_num#77,sub_num#78,leave_num#79,pay_num#80,live_num#81,dt#82,hour#83,label_click#84] parquet

After:
 InsertIntoHadoopFsRelationCommand oss://xima-bd-data3.cn-shanghai.oss-dls.aliyuncs.com/reslib/droplet/generate/data/ai-ad/102041271/1723411818435/xqldata/.staging_1691066243227, false, Parquet, Map(coalesceNum -> 500, path -> oss://xima-bd-data3.cn-shanghai.oss-dls.aliyuncs.com/reslib/droplet/generate/data/ai-ad/102041271/1723411818435/xqldata/.staging_1691066243227), Overwrite, [device_id, ads_id, user_device_adv_age_year, user_device_child_age, ads_material_text_tag, ads_ad_pic_resolution, ctx_sound_patch_scene, ctx_position, album_category_id, album_nlp_labels_app]
 +- Repartition 500, true
    +- Project [device_id#62, ads_id#63, from_json(StructField(user_device_adv_age_year,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293, from_json(StructField(ads_material_text_tag,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_material_text_tag AS ads_material_text_tag#294, from_json(StructField(ads_ad_pic_resolution,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_ad_pic_resolution AS ads_ad_pic_resolution#295, from_json(StructField(ctx_sound_patch_scene,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_sound_patch_scene AS ctx_sound_patch_scene#296, from_json(StructField(ctx_position,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_position AS ctx_position#297, from_json(StructField(album_category_id,StringType,true), album_personas#72, Some(Asia/Shanghai)).album_category_id AS album_category_id#298, from_json(StructField(album_nlp_labels_app,StringType,true), album_personas#72, Some(Asia/Shanghai)).album_nlp_labels_app AS album_nlp_labels_app#299]
       +- Filter (if ((label_click#84 = 0)) (rand(7794855199306151884) >= 0.95) else true AND (NOT (isnull(device_personas#69) AND isnull(ads_personas#70)) OR NOT isnull(ctx_personas#73)))
          +- Filter ((((dt#82 >= 20230710) AND (dt#82 <= 20230712)) AND NOT coalesce(appshadow#76, ) IN (2,3)) AND ((NOT (position_name#75 = sound_agg) AND isnotnull(get_json_object(ads_personas#70, $.ads_first_trade))) AND NOT coalesce(get_json_object(ads_personas#70, $.ads_business_type), -11111) IN (1,2,3)))
             +- Relation[device_id#62,ads_id#63,response_id#64,track_id#65,album_id#66,imp_ts#67,click_ts#68,device_personas#69,ads_personas#70,track_personas#71,album_personas#72,ctx_personas#73,label_conv#74,position_name#75,appshadow#76,play_num#77,sub_num#78,leave_num#79,pay_num#80,live_num#81,dt#82,hour#83,label_click#84] parquet

The key transformation is:

from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293
        ||
        \/
from_json(StructField(user_device_adv_age_year,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293

The schema in from_json was split from StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true) into two separate schemas:
StructField(user_device_adv_age_year,StringType,true)
StructField(user_device_child_age,StringType,true)

So why does this make the query slower? Because of the processing logic in JsonToStructs:

case class JsonToStructs(
    schema: DataType,
    options: Map[String, String],
    child: Expression,
    timeZoneId: Option[String] = None)
  extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes
    with NullIntolerant {
  ...
  @transient
  lazy val parser = {
    val parsedOptions = new JSONOptions(options, timeZoneId.get, nameOfCorruptRecord)
    val mode = parsedOptions.parseMode
    if (mode != PermissiveMode && mode != FailFastMode) {
      throw new IllegalArgumentException(s"from_json() doesn't support the ${mode.name} mode. " +
        s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}.")
    }
    val (parserSchema, actualSchema) = nullableSchema match {
      case s: StructType =>
        ExprUtils.verifyColumnNameOfCorruptRecord(s, parsedOptions.columnNameOfCorruptRecord)
        (s, StructType(s.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord)))
      case other =>
        (StructType(StructField("value", other) :: Nil), other)
    }

    val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = false)
    val createParser = CreateJacksonParser.utf8String _

    new FailureSafeParser[UTF8String](
      input => rawParser.parse(input, createParser, identity[UTF8String]),
      mode,
      parserSchema,
      parsedOptions.columnNameOfCorruptRecord)
  }
  ...
  override def nullSafeEval(json: Any): Any = {
    converter(parser.parse(json.asInstanceOf[UTF8String]))
  }

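The part to focus on is the parser variable. Because of the rule above, the two schemas end up in two separate JsonToStructs expressions, each with its own parser, and the child of each one is the regexp_replace expression, so the regular expression is evaluated twice per row.
Since the real query has more than ten such fields, the regular expression ends up being recomputed more than a hundred times (and regular-expression evaluation is relatively expensive).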
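The effect can be illustrated with a toy model that has nothing to do with Spark's real classes: a counter tracks how often the regex replacement runs for one row when every output column owns its own from_json (the pruned plan) versus when all columns read from one shared from_json result. The function names and the sample row are assumptions made for this sketch.

```python
import json
import re

PATTERN = re.compile(r'(?<=(\{|,))"device_')
regex_calls = 0  # how many times the toy regexp_replace has run


def regexp_replace(raw: str) -> str:
    global regex_calls
    regex_calls += 1
    return PATTERN.sub('"user_device_', raw)


def project_pruned(raw: str, fields: list) -> list:
    # One toy from_json per output column; each evaluates its own child.
    return [json.loads(regexp_replace(raw)).get(f) for f in fields]


def project_shared(raw: str, fields: list) -> list:
    # One toy from_json for the whole struct; columns read from its result.
    parsed = json.loads(regexp_replace(raw))
    return [parsed.get(f) for f in fields]


# Hypothetical row and field names.
row = '{"device_a":"1","device_b":"2","device_c":"3"}'
fields = ["user_device_a", "user_device_b", "user_device_c"]

regex_calls = 0
pruned = project_pruned(row, fields)
pruned_calls = regex_calls

regex_calls = 0
shared = project_shared(row, fields)
shared_calls = regex_calls

print(pruned_calls, shared_calls)
```

Both projections return the same values; only the number of regex evaluations per row differs, growing with the number of selected fields in the pruned case.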

With the rule skipped:

The main part of the physical plan is:

(6) Project
Output [10]: [device_id#62, ads_id#63, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293, from_json(StructField(ads_material_text_tag,StringType,true), StructField(ads_ad_pic_resolution,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_material_text_tag AS ads_material_text_tag#294, from_json(StructField(ads_material_text_tag,StringType,true), StructField(ads_ad_pic_resolution,StringType,true), ads_personas#70, Some(Asia/Shanghai)).ads_ad_pic_resolution AS ads_ad_pic_resolution#295, from_json(StructField(ctx_sound_patch_scene,StringType,true), StructField(ctx_position,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_sound_patch_scene AS ctx_sound_patch_scene#296, from_json(StructField(ctx_sound_patch_scene,StringType,true), StructField(ctx_position,StringType,true), ctx_personas#73, Some(Asia/Shanghai)).ctx_position AS ctx_position#297, from_json(StructField(album_category_id,StringType,true), StructField(album_nlp_labels_app,StringType,true), album_personas#72, Some(Asia/Shanghai)).album_category_id AS album_category_id#298, from_json(StructField(album_category_id,StringType,true), StructField(album_nlp_labels_app,StringType,true), album_personas#72, Some(Asia/Shanghai)).album_nlp_labels_app AS album_nlp_labels_app#299]
Input [6]: [device_id#62, ads_id#63, device_personas#69, ads_personas#70, album_personas#72, ctx_personas#73]

If the rule is skipped, it is never applied, so (again using two fields as the example) the schema of from_json does not change:

from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293

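In this physical plan the regexp_replace expression still appears several times. Will it therefore be called several times? No, it will not. Look at the physical operator ProjectExec: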

ProjectExec

protected override def doExecute(): RDD[InternalRow] = {
  child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
    val project = UnsafeProjection.create(projectList, child.output)
    project.initialize(index)
    iter.map(project)
  }
}

The call chain of this method is:

UnsafeProjection.create
  ||
  \/
InterpretedUnsafeProjection.createProjection/GenerateUnsafeProjection.generate
  ||
  \/
create
  ||
  \/
createCode(ctx, expressions, subexpressionEliminationEnabled)
  ||
  \/
ctx.generateExpressions(expressions, useSubexprElimination)
  ||
  \/
subexpressionElimination

subexpressionElimination extracts common subexpressions, which means that each common subexpression is evaluated only once afterwards.
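The idea can be sketched with a toy model in which expression trees are compared structurally and every distinct from_json tree is evaluated once per row. This is an assumption-laden simplification, not Spark's implementation: the classes RegexpReplace, FromJson and GetField are hypothetical, and the model treats the whole from_json tree as the unit that is shared, which is how the article describes the behaviour.

```python
from dataclasses import dataclass
from typing import Tuple


# Frozen dataclasses compare and hash by structure, like equal expression trees.
@dataclass(frozen=True)
class RegexpReplace:
    column: str


@dataclass(frozen=True)
class FromJson:
    schema: Tuple[str, ...]
    child: RegexpReplace


@dataclass(frozen=True)
class GetField:
    child: FromJson
    name: str


def count_from_json_evals(project_list) -> int:
    """Count distinct FromJson trees, i.e. evaluations per row in this toy model."""
    distinct = set()
    for expr in project_list:
        distinct.add(expr.child)  # structurally equal trees collapse to one entry
    return len(distinct)


child = RegexpReplace("device_personas")
full = ("user_device_adv_age_year", "user_device_child_age")

# Rule skipped: every column reads from the same two-field from_json.
not_pruned = [GetField(FromJson(full, child), name) for name in full]
# Rule applied: every column has its own one-field from_json.
pruned = [GetField(FromJson((name,), child), name) for name in full]

print(count_from_json_evals(not_pruned))
print(count_from_json_evals(pruned))
```

With the full schema the two columns share one tree, so there is a single evaluation. After pruning, the schemas differ, the trees are no longer equal, and each one is evaluated separately together with its regexp_replace child.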
In our case the corresponding expression is:

 Alias(GetStructField(attribute.get, i), f.name)()

where attribute.get is JsonToStructs(StructType(StructField(user_device_adv_age_year,StringType,true),StructField(user_device_child_age,StringType,true)), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai))

This lines up exactly with the plan shown in the Spark UI:

from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_adv_age_year AS user_device_adv_age_year#292, from_json(StructField(user_device_adv_age_year,StringType,true), StructField(user_device_child_age,StringType,true), regexp_replace(device_personas#69, (?<=(\{|,))"device_, "user_device_, 1), Some(Asia/Shanghai)).user_device_child_age AS user_device_child_age#293

(This text mainly comes from calling JsonToStructs.toString.)

Other notes

  • The toString method of Alias is:
s"$child AS $name#${exprId.id}$typeSuffix$delaySuffix"
  • The toString method of GetStructField is:
val fieldName = if (resolved) childSchema(ordinal).name else s"_$ordinal"
s"$child.${name.getOrElse(fieldName)}"
  • The UnresolvedStar class contains an explanation of SELECT record.* from (SELECT struct(a,b,c) as record …)

  • In the ResolveReferences rule, the method buildExpandedProjectList calls the expand method of UnresolvedStar.
    This is where the star is resolved into Alias(GetStructField(attribute.get, i), f.name)()

  • For the optimization rule itself, see "Optimize Json expression chain"
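To tie the notes above together, the following toy model mimics how expanding record.* into one Alias(GetStructField(...)) per field, combined with the two toString formats quoted above, yields the column text seen in the plan. The Python classes only borrow the names of the Catalyst classes, the type and delay suffixes are omitted, and the helper expand_star, the sample fields and the expression ids are hypothetical.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class JsonToStructs:
    field_names: Tuple[str, ...]
    child: str

    def __str__(self):
        schema = ", ".join(f"StructField({f},StringType,true)" for f in self.field_names)
        return f"from_json({schema}, {self.child})"


@dataclass(frozen=True)
class GetStructField:
    child: JsonToStructs
    ordinal: int

    def __str__(self):
        # Mirrors s"$child.$fieldName" from the note above.
        return f"{self.child}.{self.child.field_names[self.ordinal]}"


@dataclass(frozen=True)
class Alias:
    child: GetStructField
    name: str
    expr_id: int

    def __str__(self):
        # Mirrors s"$child AS $name#${exprId.id}" (suffixes left out).
        return f"{self.child} AS {self.name}#{self.expr_id}"


def expand_star(struct: JsonToStructs, first_id: int) -> list:
    """Expand record.* into one aliased field access per struct field."""
    return [Alias(GetStructField(struct, i), name, first_id + i)
            for i, name in enumerate(struct.field_names)]


record = JsonToStructs(("a", "b"), "json#1")
columns = expand_star(record, 10)
for col in columns:
    print(col)
```

Every expanded column repeats the full from_json text, which is why the plan string shows the same expression once per selected field even when it is evaluated only once.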
