
Apache Hudi First Look (Part 2) (Integration with Spark)

Background

Currently, Hudi's integration with Spark is still based on Spark DataSource V1, which you can confirm by looking at Hudi's DefaultSource implementation:

class DefaultSource extends RelationProvider
  with SchemaRelationProvider
  with CreatableRelationProvider
  with DataSourceRegister
  with StreamSinkProvider
  with StreamSourceProvider
  with SparkAdapterSupport
  with Serializable {
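Because DefaultSource also mixes in DataSourceRegister, Spark resolves the short format name "hudi" to this class through the standard DataSource V1 lookup. As a quick sanity check you can call Spark's internal lookup helper directly; this is only a sketch against an internal API (DataSource.lookupDataSource), whose exact location and signature may vary between Spark versions:

    import org.apache.spark.sql.execution.datasources.DataSource

    // Resolve the "hudi" format name the same way DataFrameReader/Writer does;
    // with hudi-spark on the classpath this should print org.apache.hudi.DefaultSource.
    val providerClass = DataSource.lookupDataSource("hudi", spark.sessionState.conf)
    println(providerClass.getName)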

Casual Notes

Let's start with how Hudi writes data (after all, there is nothing to read without a write first). The corresponding flow is:

createRelation
   ||
   \/
HoodieSparkSqlWriter.write
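This path is entered whenever a DataFrame is written through the Hudi data source. Below is a minimal sketch of a bulk_insert write that would go through createRelation and then HoodieSparkSqlWriter.write; the record key, precombine, and partition columns are illustrative values, not taken from this article:

    import org.apache.spark.sql.SaveMode

    // Illustrative write; "df" is any DataFrame with id/ts/dt columns.
    df.write.format("hudi")
      .option("hoodie.table.name", "hudi_result_mor")
      .option("hoodie.datasource.write.recordkey.field", "id")
      .option("hoodie.datasource.write.precombine.field", "ts")
      .option("hoodie.datasource.write.partitionpath.field", "dt")
      .option("hoodie.datasource.write.operation", "bulk_insert")
      .mode(SaveMode.Append)
      .save("/tmp/hudi_result_mor")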

The specific code

Continuing from the code in the previous post, Apache Hudi First Look (Integration with Spark):

    handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tblName, operation, fs)

    val partitionColumns = SparkKeyGenUtils.getPartitionColumns(keyGenerator, toProperties(parameters))
    val tableMetaClient = if (tableExists) {
      HoodieTableMetaClient.builder
        .setConf(sparkContext.hadoopConfiguration)
        .setBasePath(path)
        .build()
    } else {
      ...
    }
    val commitActionType = CommitUtils.getCommitActionType(operation, tableConfig.getTableType)

    if (hoodieConfig.getBoolean(ENABLE_ROW_WRITER) &&
        operation == WriteOperationType.BULK_INSERT) {
      val (success, commitTime: common.util.Option[String]) = bulkInsertAsRow(sqlContext, parameters, df, tblName,
        basePath, path, instantTime, partitionColumns, tableConfig.isTablePartitioned)
      return (success, commitTime, common.util.Option.empty(), common.util.Option.empty(), hoodieWriteClient.orNull, tableConfig)
    }
  • handleSaveModes validates the Spark SaveMode together with Hudi's hoodie.datasource.write.operation configuration.
    For example, if the table names compared with the current spark.sessionState.conf.resolver (the hoodie.table.name configured in the source options and the hoodie.table.name obtained from tableConfig) do not match, an error is thrown

  • partitionColumns extracts the partition fields, usually in the "field1,field2" format

  • val tableMetaClient =
    constructs the tableMetaClient: if the table already exists, the existing one is reused;
    if not, a new one is created, which mainly means creating the base directory and initializing the corresponding layout:

    • create the .hoodie directory
    • create the .hoodie/.schema directory
    • create the .hoodie/archived directory
    • create the .hoodie/.temp directory
    • create the .hoodie/.aux directory
    • create the .hoodie/.aux/.bootstrap directory
    • create the .hoodie/.aux/.bootstrap/.partitions directory
    • create the .hoodie/.aux/.bootstrap/.fileids directory
    • create the .hoodie/hoodie.properties file
      and write the table properties into hoodie.properties,
      which finally yields a directory structure like:
        hudi_result_mor/.hoodie/.aux
        hudi_result_mor/.hoodie/.aux/.bootstrap/.partitions
        hudi_result_mor/.hoodie/.aux/.bootstrap/.fileids
        hudi_result_mor/.hoodie/.schema
        hudi_result_mor/.hoodie/.temp
        hudi_result_mor/.hoodie/archived
        hudi_result_mor/.hoodie/hoodie.properties
        hudi_result_mor/.hoodie/metadata
      
  • val commitActionType = CommitUtils.getCommitActionType
    determines the commit type: commit for a COW table and deltacommit for a MOR table, which shows up in the suffix of the timeline files (see the sketch after this list)

  • bulkInsertAsRow
    If "hoodie.datasource.write.row.writer.enable" (default true) is enabled and "hoodie.datasource.write.operation" is bulk_insert, data is written in Spark's native Row format; otherwise additional conversion steps are involved
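As a rough illustration of the commit/deltacommit decision above, the following sketch mirrors the idea behind CommitUtils.getCommitActionType for the write path (the helper name is hypothetical, and the real implementation also takes the operation type into account):

    import org.apache.hudi.common.model.HoodieTableType

    // Hypothetical helper: COW tables commit with "commit",
    // MOR tables with "deltacommit" (visible in the timeline file suffix).
    def commitActionTypeFor(tableType: HoodieTableType): String = tableType match {
      case HoodieTableType.COPY_ON_WRITE => "commit"
      case HoodieTableType.MERGE_ON_READ => "deltacommit"
    }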

Analysis of bulkInsertAsRow

Since bulkInsertAsRow is the core of the write path, let's go through it step by step:

    val sparkContext = sqlContext.sparkContext
    val populateMetaFields = java.lang.Boolean.parseBoolean(parameters.getOrElse(
      HoodieTableConfig.POPULATE_META_FIELDS.key(),
      String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())
    ))
    val dropPartitionColumns = parameters.get(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.key())
      .map(_.toBoolean)
      .getOrElse(DataSourceWriteOptions.DROP_PARTITION_COLUMNS.defaultValue())
    // register classes & schemas
    val (structName, nameSpace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tblName)
    sparkContext.getConf.registerKryoClasses(
      Array(classOf[org.apache.avro.generic.GenericData],
        classOf[org.apache.avro.Schema]))
    var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace)
    if (dropPartitionColumns) {
      schema = generateSchemaWithoutPartitionColumns(partitionColumns, schema)
    }
    validateSchemaForHoodieIsDeleted(schema)
    sparkContext.getConf.registerAvroSchemas(schema)
    log.info(s"Registered avro schema : ${schema.toString(true)}")
    if (parameters(INSERT_DROP_DUPS.key).toBoolean) {
      throw new HoodieException("Dropping duplicates with bulk_insert in row writer path is not supported yet")
    }
  • populateMetaFields: if true, Hudi's metadata fields (such as _hoodie_commit_time) are added to every record; this is used later when choosing bulkInsertPartitionerRows. The default is true
  • dropPartitionColumns: whether to drop the partition fields; the default is false, i.e. the partition fields are kept
  • sparkContext.getConf.registerKryoClasses registers GenericData and Schema for Kryo serialization
  • var schema = AvroConversionUtils.convertStructTypeToAvroSchema converts the Spark SQL schema into an Avro schema
  • sparkContext.getConf.registerAvroSchemas registers the Avro schema for serialization (see the sketch after this list)
  • "hoodie.datasource.write.insert.drop.duplicates" is not allowed to be true in this path
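To make the schema handling above more concrete, here is a small standalone sketch (the DataFrame, record name, and namespace are made up for illustration) of converting a Spark SQL schema to an Avro schema and registering it, just as the snippet above does:

    import org.apache.avro.Schema
    import org.apache.hudi.AvroConversionUtils
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("schema-demo").getOrCreate()
    // Illustrative DataFrame; in the real code path this is the DataFrame being written.
    val df = spark.range(10).selectExpr("id", "cast(id as string) as name", "current_timestamp() as ts")

    // Convert the Spark StructType to an Avro schema, as bulkInsertAsRow does.
    val avroSchema: Schema =
      AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, "hudi_record", "hoodie.hudi_record")

    // Register the Avro schema so generic records serialize efficiently with Kryo.
    spark.sparkContext.getConf.registerAvroSchemas(avroSchema)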
    val params: mutable.Map[String, String] = collection.mutable.Map(parameters.toSeq: _*)
    params(HoodieWriteConfig.AVRO_SCHEMA_STRING.key) = schema.toString
    val writeConfig = DataSourceUtils.createHoodieConfig(schema.toString, path, tblName, mapAsJavaMap(params))
    val bulkInsertPartitionerRows: BulkInsertPartitioner[Dataset[Row]] = if (populateMetaFields) {
      val userDefinedBulkInsertPartitionerOpt = DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows(writeConfig)
      if (userDefinedBulkInsertPartitionerOpt.isPresent) {
        userDefinedBulkInsertPartitionerOpt.get
      } else {
        BulkInsertInternalPartitionerWithRowsFactory.get(writeConfig.getBulkInsertSortMode, isTablePartitioned)
      }
    } else {
      // Sort modes are not yet supported when meta fields are disabled
      new NonSortPartitionerWithRows()
    }
    val arePartitionRecordsSorted = bulkInsertPartitionerRows.arePartitionRecordsSorted()
    params(HoodieInternalConfig.BULKINSERT_ARE_PARTITIONER_RECORDS_SORTED) = arePartitionRecordsSorted.toString
    val isGlobalIndex = if (populateMetaFields) {
      SparkHoodieIndexFactory.isGlobalIndex(writeConfig)
    } else {
      false
    }
  • "hoodie.avro.schema" is set to the Avro schema built above
  • val writeConfig = DataSourceUtils.createHoodieConfig
    creates the Hudi write config, which covers, among others:
    • "hoodie.datasource.compaction.async.enable": whether compaction runs asynchronously, default true
    • if compaction is not asynchronous and the table is a MOR table, compaction runs synchronously
    • "hoodie.datasource.write.insert.drop.duplicates": if true (default false), records are deduplicated on insert
    • "hoodie.datasource.write.payload.class" is set, default "OverwriteWithLatestAvroPayload"
    • "hoodie.datasource.write.precombine.field" is set, default "ts"; the payload uses this field to compare records
    • the final build() step also sets "hoodie.index.type", which is "SIMPLE" for the Spark engine
  • bulkInsertPartitionerRows defaults to NonSortPartitionerWithRows, i.e. rows are passed through unchanged; a user-defined partitioner can be plugged in instead (see the sketch after this list)
  • "hoodie.bulkinsert.are.partitioner.records.sorted" is set accordingly, default false
  • val isGlobalIndex is derived from the index type; since the default index is "SIMPLE", it is false
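For reference, a user-defined partitioner is what DataSourceUtils.createUserDefinedBulkInsertPartitionerWithRows looks up above. A minimal sketch of such a row-based partitioner follows; the class name and sort column are assumptions for illustration, not code from the Hudi codebase:

    import org.apache.hudi.table.BulkInsertPartitioner
    import org.apache.spark.sql.{Dataset, Row}

    // Hypothetical partitioner: sort rows by partition path so each Spark
    // partition writes records in sorted order.
    class SortByPartitionPathPartitioner extends BulkInsertPartitioner[Dataset[Row]] {

      override def repartitionRecords(rows: Dataset[Row], outputSparkPartitions: Int): Dataset[Row] =
        rows.sort("_hoodie_partition_path").coalesce(outputSparkPartitions)

      // Tells Hudi that records within a Spark partition are sorted.
      override def arePartitionRecordsSorted(): Boolean = true
    }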
    val hoodieDF = HoodieDatasetBulkInsertHelper.prepareForBulkInsert(df, writeConfig, bulkInsertPartitionerRows, dropPartitionColumns)
    if (HoodieSparkUtils.isSpark2) {
      hoodieDF.write.format("org.apache.hudi.internal")
        .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
        .options(params)
        .mode(SaveMode.Append)
        .save()
    } else if (HoodieSparkUtils.isSpark3) {
      hoodieDF.write.format("org.apache.hudi.spark3.internal")
        .option(DataSourceInternalWriterHelper.INSTANT_TIME_OPT_KEY, instantTime)
        .option(HoodieInternalConfig.BULKINSERT_INPUT_DATA_SCHEMA_DDL.key, hoodieDF.schema.toDDL)
        .options(params)
        .mode(SaveMode.Append)
        .save()
    } else {
      throw new HoodieException("Bulk insert using row writer is not supported with current Spark version."
        + " To use row writer please switch to spark 2 or spark 3")
    }
    val syncHiveSuccess = metaSync(sqlContext.sparkSession, writeConfig, basePath, df.schema)
    (syncHiveSuccess, common.util.Option.ofNullable(instantTime))
  }
  • HoodieDatasetBulkInsertHelper.prepareForBulkInsert does the preparation work before the data is written

    • if "hoodie.populate.meta.fields" is true, the metadata fields are added:
      _hoodie_commit_time, _hoodie_commit_seqno, _hoodie_record_key, _hoodie_partition_path, _hoodie_file_name (see the sketch after this list)
    • "hoodie.combine.before.insert": whether to deduplicate the data (by the precombine key) before writing to storage, default false
      • by default only the metadata fields are added
      • if set to true, an extra shuffle is introduced to perform the deduplication
      • if "hoodie.datasource.write.drop.partition.columns" is true (default false), the partition fields are dropped
  • since we are on Spark 3, the hoodieDF.write.format("org.apache.hudi.spark3.internal") branch is taken;
    this will be analyzed in a later post
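A quick way to see the metadata fields that prepareForBulkInsert adds is to read the written table back and inspect the _hoodie_* columns. A minimal sketch, reusing the example base path from the write sketch earlier:

    // Read the table back through the Hudi data source and inspect the meta columns.
    val hudiDF = spark.read.format("hudi").load("/tmp/hudi_result_mor")

    hudiDF.select(
      "_hoodie_commit_time",
      "_hoodie_commit_seqno",
      "_hoodie_record_key",
      "_hoodie_partition_path",
      "_hoodie_file_name"
    ).show(5, truncate = false)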
