当前位置: 首页 > article >正文

Iceberg写入过程

1.Iceberg结构基础

1.1.文件结构

  大框架上,Iceberg的文件组织形式与Hive类似,都是HDFS的目录,在warehouse下以/db/table的形式组建结构。
  不同的是,Iceberg是纯文件的,元数据也存储在HDFS上,并做到了文件级别的元数据组织。
  在/db/table的目录结构下,有两个目录:metadata和data,用于存储元数据和数据。
  data下存储数据,比较单一,只有一类文件,默认以Parquet形式存储。
  metadata下存储元数据,分三层:metadata file、manifest list、manifest file。元数据目前有两个版本:V1Metadata、V2Metadata,创建table时带参数设置:

CREATE TABLE tl(id BIGINT) WITH ('format-version'='2')

  也可以后续update

ALTER TABLE tl SET('format-version'='2')

  metadata下也对应有三类文件:metadata.json、m0.avro、snap-.avro。
  metadata.json就是元数据文件,snap-
.avro应该是manifest list,*m0.avro就是manifest file。整个查询关系是:metadata指定manifest list,manifest list指定manifest file,manifest file指定data file
  manifest list主要的目的是为了复用历史的manifest file,一个manifest file可以被多个manifest list索引,避免重复
  metadata file:元数据文件,从一个metadata file就可以构建出一个完整的Iceberg Table,包括表结构、分区字段、所有快照等等信息,每次对Iceberg Table做一次操作,都会产生一个新的metadata file文件。
  manifest list文件向上对应的是一个快照,向下对应的是若干个manifest file文件。
  manifest file文件是对一系列数据文件的索引,保存了每个数据文件的路径等信息,也就是在这里,Iceberg对文件的组织精确到了文件级别。
  snapshot:table提交后的一个快照,每一个snapshot由一个Manifest list文件组成,metadata.json 存储了历史所有snapshot信息,对应manifest list

1.1.1.注意点

  HiveCatalog和HadoopCatalog的元数据文件名有点不一样
  Hadoop是v4.metadata.json这种形式,改的只有前面的版本号
  Hive是00001-d97b1624-98e5-4974-9fd8-883d05069ae5.metadata.json,除里最前面的版本号,还有中间的UUID
  原因应该是出于commit元数据做一致性检查的目的

1.2.类结构

1.2.1.TableMetadata

  表的元数据规范类,Iceberg的功能就是以统一的格式组建数据,然后可以根据格式变成表,元数据是表的核心。

static TableMetadata newTableMetadata(Schema schema,PartitionSpec spec,SortOrder sortOrder,String location,Map<String, String> properties,int formatVersion) {

  Schema:Schema主要是对列信息的维护,包括了列名、字段类型等等
  PartitionSpec:分区信息,分区是由列转化而来的。这里注意隐藏分区,即不是用户手动指定的那种,而是根据列自行推导设置的,隐藏分区字段来源如下,目前好像只有Spark支持
在这里插入图片描述

  SortOrder:排序方式,定义表中数据和文件的排序方式
  location:对应数据文件的存储路径
  formatVersion:前面提到过的format-version设置的元数据版本
  最终的构造函数如下,相应的成员参数更多

TableMetadata(String metadataFileLocation,int formatVersion,String uuid,String location,long lastSequenceNumber,long lastUpdatedMillis,int lastColumnId,int currentSchemaId,List<Schema> schemas,int defaultSpecId,List<PartitionSpec> specs,int lastAssignedPartitionId,int defaultSortOrderId,List<SortOrder> sortOrders,Map<String, String> properties,long currentSnapshotId,List<Snapshot> snapshots,List<HistoryEntry> snapshotLog,List<MetadataLogEntry> previousFiles,Map<String, SnapshotRef> refs,List<StatisticsFile> statisticsFiles,List<MetadataUpdate> changes) {

1.2.2.Table

  Table包含了一些对元数据的操作,此外最重要的就是对Table的操作的构建,比如Scan等操作
在这里插入图片描述

1.2.3.TableOperations

  TableOperations是实际操作的类,主要就是与Catalog的交互操作。不同的Catalog有自己的实现,HiveCatalog提供的实现是HiveTableOperations
  核心的方法主要有
  current() 通过 Catalog 加载出当前表的 metadata 数据
  commit() 数据写入完成后提交当前表,也就是生成一个 snapshot
  io() 表示当前表的底层存储介质,比如 HDFS, AWS

1.2.4.Catalog

  Catalog 通常用来保存和查找表的元数据。Iceberg表的元数据主要存储在文件系统上,因此要存储的内容相比Hive要轻量很多。Iceberg的catalog主要有以下作用
  metadata文件地址
  表名的存储,可以通过表名获取到表的 metadata 文件地址
  当引擎层需要用到表的元数据时便会通过catalog进行加载,各个引擎都定义了自己的 catalog规范(接口) ,同时也将catalog进行了插件化,Iceberg为了和引擎层进行对接实现了引擎层定义的接口,如Flink Catalog/Spark Catalog

2.创建阶段——Flink为例

2.1.创建IcebergTableSink

  通过Flink SQL对Iceberg进行操作,整体走Flink的SQL解析流程,在流程中的translateToRel这一步,会获取TableSink,就需要实际调用到Iceberg的实现类了
  TableSink的创建基于工厂类DynamicTableSinkFactory,与Catalog一样,从类路径发现DynamicTableSinkFactory的子类,然后调用对应的create方法

final DynamicTableSinkFactory factory =preferredFactory != null? preferredFactory: discoverTableFactory(DynamicTableSinkFactory.class, context);return factory.createDynamicTableSink(context);

  Iceberg侧的实现类是FlinkDynamicTableFactory,其中创建了DynamicTableSink的Iceberg实现子类IcebergTableSink。接口中还有一个重要的内容就是创建TableLoader,TableLoader是后续加载Table的核心

TableLoader tableLoader;
if (catalog != null) {tableLoader = createTableLoader(catalog, objectPath);
} else {tableLoader =createTableLoader(catalogTable, writeProps, objectPath.getDatabaseName(), objectPath.getObjectName());
}return new IcebergTableSink(tableLoader, tableSchema, context.getConfiguration(), writeProps);

2.2.consumeDataStream

  Flink的流程继续走到translateToPlanInternal,这里会转换Transformation,在CommonExecSink的createSinkTransformation接口当中,有一步创建

SinkRuntimeProvider
final SinkRuntimeProvider runtimeProvider =tableSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(isBounded));

  这里的tableSink就是上面的IcebergTableSink,流程继续往后,就到了调用SinkRuntimeProvider的consumeDataStream,实现类就是IcebergTableSink里的内容,这里进行一系列FlinkSink的操作

return new DataStreamSinkProvider() {public DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {return FlinkSink.forRowData(dataStream).tableLoader(tableLoader).tableSchema(tableSchema).equalityFieldColumns(equalityColumns).setAll(writeProps).overwrite(overwrite).flinkConf(readableConfig).append();}
};

  这里最重要的就是FlinkSink的append方法,内部构建了Iceberg最终要的两个操作实现类,写数据文件和提交元数据

2.3.chainIcebergOperators

  append方法最终调用到的接口是chainIcebergOperators

2.3.1.加载Table

  第一步是加载Table,通过TableLoader加载

tableLoader.open();
try (TableLoader loader = tableLoader) {this.table = loader.loadTable();
} catch (IOException e) {

2.3.2.equalityFieldColumns

  主要对于UPSERT,需要有equalityFieldColumns做比较
来源是在构建FlinkSink时的设置,基于主键生成

List<String> equalityColumns =
tableSchema.getPrimaryKey().map(UniqueConstraint::getColumns).orElseGet(ImmutableList::of);

2.3.3.数据类型设置

  是在Flink上的转换,把Flink的schema转换为RowType根据设置,对输入流做重分配。
  分配模式有NONE、HASH、RANGE。对于NONE,如果存在主键,会基于主键做重分配;HASH和RANGE要求必须存在主键
  分配模式的配置优先级如下:

String modeName =confParser.stringConf().option(FlinkWriteOptions.DISTRIBUTION_MODE.key()).flinkConfig(FlinkWriteOptions.DISTRIBUTION_MODE).tableProperty(TableProperties.WRITE_DISTRIBUTION_MODE).defaultValue(TableProperties.WRITE_DISTRIBUTION_MODE_NONE).parse();

2.3.4.IcebergStreamWriter

  IcebergStreamWriter是写文件的实现类,通过transform,转换成Flink的Operator

IcebergStreamWriter<RowData> streamWriter =createStreamWriter(table, flinkWriteConf, flinkRowType, equalityFieldIds);int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
SingleOutputStreamOperator<WriteResult> writerStream =input.transform(operatorName(ICEBERG_STREAM_WRITER_NAME),TypeInformation.of(WriteResult.class),streamWriter).setParallelism(parallelism);

  这里的输出WriteResult就是搜集的本次写文件的文件更改列表,这个输出会作为下一个算子的输入,也就是commit元数据提交的输入

2.3.5.IcebergFilesCommitter

IcebergFilesCommitter是元数据提交的实现类,与IcebergStreamWriter一样,会转成Operator
IcebergFilesCommitter filesCommitter =new IcebergFilesCommitter(tableLoader,flinkWriteConf.overwriteMode(),snapshotProperties,flinkWriteConf.workerPoolSize());
SingleOutputStreamOperator<Void> committerStream =writerStream.transform(operatorName(ICEBERG_FILES_COMMITTER_NAME), Types.VOID, filesCommitter).setParallelism(1).setMaxParallelism(1);

2.3.6.DiscardingSink

  前两个都是操作,Flink的整体流程必须有一个Sink,所以这里在最后添加了一个DiscardingSink,这是不做任何操作的一个Sink

DataStreamSink<T> resultStream =committerStream.addSink(new DiscardingSink()).name(operatorName(String.format("IcebergSink %s", this.table.name()))).setParallelism(1);

3.执行阶段——Flink为例

  上一章通过Flink的SQL解析流程,完成了这个计划创建,之后就可以执行了
  前面提到创建过程核心是两个类IcebergStreamWriter和IcebergFilesCommitter,这代表了Iceberg写入的两阶段过程:先进行文件写入,此时是临时写入,用户不可见;然后进行元数据写入,写入完成用户可见

3.1.作业拆分和依赖

  使用Flink执行insert命令基于Iceberg写入数据,在Flink上,任务会被拆成两个子任务,分别对应数据文件写入和元数据提交,对应的类是IcebergStreamWriter和IcebergFilesCommitter
在这里插入图片描述

  元数据的写入依赖于Flink的checkpoint,由checkpoint触发写入,所以使用Iceberg写入数据默认开启了checkpoint功能
  默认是At Least Once,手动配置变为Exactly Once,性能差距很大
在这里插入图片描述

3.2.IcebergStreamWriter

  数据从Source流入,经过Iceberg,Iceberg会进行DataStream转换,算子替换为IcebergStreamWriter,输出为WriteResult

IcebergStreamWriter<RowData> streamWriter =createStreamWriter(table, flinkWriteConf, flinkRowType, equalityFieldIds);int parallelism = writeParallelism == null ? input.getParallelism() : writeParallelism;
SingleOutputStreamOperator<WriteResult> writerStream =input.transform(operatorName(ICEBERG_STREAM_WRITER_NAME),TypeInformation.of(WriteResult.class),streamWriter).setParallelism(parallelism);

  IcebergStreamWriter实现了Flink的operators,以Operator的方式运行

class IcebergStreamWriter<T> extends AbstractStreamOperator<WriteResult>implements OneInputStreamOperator<T, WriteResult>, BoundedOneInput {

3.2.1.open

  open是初始化,这里最重要的是创建一个TaskWriter,TaskWriter是后面执行数据处理使用的
TaskWriter的创建由工厂类TaskWriterFactory承接,工厂类在FlinkSink中设定,为RowDataTaskWriterFactory
  RowDataTaskWriterFactory的create方法负责创建,根据使用场景不同,分为四种不同类型的TaskWriter:UnpartitionedWriter、RowDataPartitionedFanoutWriter、UnpartitionedDeltaWriter、PartitionedDeltaWriter
  四个实现类的区分条件是:是否包含主键和是否支持分区。前两个不包含主键,只能支持Insert追加,后两个支持主键,可以进行INSERT和equality DELETE(应该就是说支持Upsert的意思)

3.2.2.processElement

  处理接口,接收上游数据并处理,直接调用TaskWriter的write方法
  以最简单的UnpartitionedWriter为例,最终调用BaseTaskWriter的write

public void write(T record) throws IOException {write(currentWriter, record);this.currentRows++;if (shouldRollToNewFile()) {closeCurrent();openCurrent();}
}

  最终的write会调用到FileAppender,是真正的底层写的类,会根据文件格式(Parquet、ORC、AVRO)创建对应的文件读写器,具体创建在工厂类FlinkAppenderFactory
  shouldRollToNewFile这边根据情况会回滚新文件

private boolean shouldRollToNewFile() {return currentRows % ROWS_DIVISOR == 0 && length(currentWriter) >= targetFileSize;
}

  DIVISOR是1000,targetFileSize的来源如下,相应的参数为target-file-size-bytes、write.target-file-size-bytes

public long targetDataFileSize() {return confParser.longConf().option(FlinkWriteOptions.TARGET_FILE_SIZE_BYTES.key()).flinkConfig(FlinkWriteOptions.TARGET_FILE_SIZE_BYTES).tableProperty(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES).defaultValue(TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT).parse();
}

  这里注意currentFile和currentWriter会在文件回滚以后重置,currentWriter就是封装了之前说的FileAppender,实际进行写操作的类;FileAppender根据规则产生文件名并封装

3.2.3.flush

  flush不是直接在流程里的接口,由prepareSnapshotPreBarrier和endInput触发调用,前者是checkpoint触发,后者是数据输入结束的标志(默认At least once结束触发一次)
  flush做的最重要的事就是产生WriteResult并转发下游,WriteResult包含的就是本批写入的文件信息

return WriteResult.builder().addDataFiles(completedDataFiles).addDeleteFiles(completedDeleteFiles).addReferencedDataFiles(referencedDataFiles).build();

  completedDataFiles和completedDeleteFiles是上一个接口中closeCurrent接口调用complete添加的,不同的文件实现调用会分别增加,实现类是RollingFileWriter和RollingEqDeleteWriter

3.3.IcebergFilesCommitter

  本质上也是Flink的Operator实现,与IcebergStreamWriter实现了相同的类

class IcebergFilesCommitter extends AbstractStreamOperator<Void>implements OneInputStreamOperator<WriteResult, Void>, BoundedOneInput {

3.3.1.initializeState

  需要做状态存储的算子需要实现的方法,也就是说这个里面是做state处理的。因为IcebergFilesCommitter涉及了Iceberg写入状态的完成,所以需要自行处理状态
  初始化主要就是获取一些状态信息和创建一些需要的对象(本质应该是做State的恢复的),最后有一步比较重要的操作,就是把未提交状态的commit提交完成

this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
NavigableMap<Long, byte[]> uncommittedDataFiles =Maps.newTreeMap(checkpointsState.get().iterator().next()).tailMap(maxCommittedCheckpointId, false);
if (!uncommittedDataFiles.isEmpty()) {// Committed all uncommitted data files from the old flink job to iceberg table.long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
}

3.3.2.snapshotState

  snapshotState是做状态更新的(也就是持久化State),不写也可以,框架会有默认处理,自定义实现是为了实现一些自己需要的处理细节

3.3.3.notifyCheckpointComplete

  State和checkpoint是两个东西,一个临时的状态存储,一个是对State的进一步持久化存储。notifyCheckpointComplete的作用是在checkpoint完成后进行一些自定义的操作
  此处是当完成的checkpoint比缓存的最后的checkpoint大时,进行commit处理

if (checkpointId > maxCommittedCheckpointId) {commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);this.maxCommittedCheckpointId = checkpointId;
}

  commitUpToCheckpoint里面是对ManifestFile文件的一些操作,最终的提交根据是否分区等的不同有不同的实现,最终接口是PendingUpdate.commit,实际的操作根据TableOperations不同有不同的实现,HDFS是HadoopTableOperations,Hive是HiveTableOperations,核心就是写元数据文件
  write.metadata.compression-codec可以设置元数据文件的压缩格式,默认none
  这里最终写的是json格式的最上层元数据,最后会以json格式写入

3.3.4.processElement

  这里是把上个算子(IcebergStreamWriter)传来的输出加入writeResultsOfCurrentCkpt,writeResultsOfCurrentCkpt会在snapshotState等接口当中处理
  snapshotState和endInput当中调用writeToManifest,处理了writeResultsOfCurrentCkpt,这里写的是那个后缀为avro的文件

WriteResult result = WriteResult.builder().addAll(writeResultsOfCurrentCkpt).build();
DeltaManifests deltaManifests =FlinkManifestUtil.writeCompletedFiles(result, () -> manifestOutputFileFactory.create(checkpointId), table.spec());return SimpleVersionedSerialization.writeVersionAndSerialize(DeltaManifestsSerializer.INSTANCE, deltaManifests);

3.3.5.endInput

  数据结束,一样需要进行最后一次的提交

3.3.6.open

  构建了一个线程池,写元数据文件的操作会委托给这里创建的线程池处理

final String operatorID = getRuntimeContext().getOperatorUniqueID();
this.workerPool =ThreadPools.newWorkerPool("iceberg-worker-pool-" + operatorID, workerPoolSize);

3.3.7.元数据提交的基本流程

  processElement接口处理流式数据,将元数据信息存入内存

public void processElement(StreamRecord<WriteResult> element) {this.writeResultsOfCurrentCkpt.add(element.getValue());
}

  snapshotState接口将上文内存中的元数据写出到Manifest,相当于两阶段提交的第一阶段

dataFilesPerCheckpoint.put(checkpointId, writeToManifest(checkpointId));

  notifyCheckpointComplete接口进行最终元数据的commit提交,最终在commitOperation接口当中

// custom snapshot metadata properties will be overridden if they conflict with internal ones
// used by the sink.
operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
operation.set(FLINK_JOB_ID, newFlinkJobId);long startNano = System.nanoTime();
operation.commit(); // abort is automatically called if this fails.

4.提交冲突

  Iceberg支持并发提交元数据,但是可能发生提交冲突的问题

4.1.HiveTableOperations

  与IcebergFilesCommitter的调用联系是:IcebergFilesCommitter -> TableOperations -> BaseMetastoreTableOperations -> HiveTableOperations.doCommit
  高并发基于Iceberg写数据时,会产生报错

org.apache.iceberg.exceptions.CommitFailedException: Base metadata location 'hdfs://nameservice/hive/icedb.db/icetable/metadata/00245-64ad6084-4e23-4b84-8260-8b95d1559a18.metadata.json' is not same as the current table metadata location 'hdfs://nameservice/hive/icedb.db/icetable/metadata/00246-fb4cec60-22b3-4325-845f-0b5c67b7b714.metadata.json' for icedb.icetable

  从IcebergFilesCommitter看,有一个分支是构建RowDelta,其有一个子类BaseRowDelta,继承自SnapshotProducer,看其commit接口
  追踪SnapshotProducer的base来源(即本次操作基于的元数据),base的获取在SnapshotProducer有current和refresh,最终都调用到HiveTableOperations的doRefresh

4.1.1.commit重试

  commit存在重试机制,在SnapshotProducer的commit接口中,入口处设置了重试参数,默认重试次数4

Tasks.foreach(ops).retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT)).exponentialBackoff(base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),2.0 /* exponential */).onlyRetryOn(CommitFailedException.class)

4.1.2.元数据地址获取

  使用HiveCatalog创建Iceberg表时,会在Hive的TBLS当中插入一张表
在这里插入图片描述

  同时TABLE_PARAMS表中会有表的相关信息
在这里插入图片描述

  再插入数据,表中的信息会更新,最关键的是元数据地址信息:previous_metadata_location和metadata_location
在这里插入图片描述

  获取当前元数据就是去上面的TABLE_PARAMS表获取metadata_location信息

table = metaClients.run(client -> client.getTable(catalogName, database, tableName));
metadataLocation = table.getParameters().get(METADATA_LOCATION_PROP);

  获取到的metadataLocation 会被设置为currentMetadata,也就是base

4.1.3.锁

  进行操作前会进行对表的进程级别锁定,首先是进行线程级别锁定

ReentrantLock tableLevelMutex = commitLockCache.get(fullName, t -> new ReentrantLock(true));
tableLevelMutex.lock();

  之后是进行进程级别的锁定

HiveLockHeartbeat hiveLockHeartbeat = null;
try {lockId = Optional.of(acquireLock());hiveLockHeartbeat =new HiveLockHeartbeat(metaClients, lockId.get(), lockHeartbeatIntervalTime);hiveLockHeartbeat.schedule(exitingScheduledExecutorService);

  进程级别的锁是基于Hive提供的表锁进行的,这里获取的是独占锁

final LockComponent lockComponent =new LockComponent(LockType.EXCLUSIVE, LockLevel.TABLE, database);
lockComponent.setTablename(tableName);
final LockRequest lockRequest =new LockRequest(Lists.newArrayList(lockComponent),System.getProperty("user.name"),InetAddress.getLocalHost().getHostName());
LockResponse lockResponse = metaClients.run(client -> client.lock(lockRequest));

  Hive的锁是根据状态判断的,所以这里会不断地判断并更新状态,直到获取锁的请求超时。后面的HiveLockHeartbeat里调用heartbeat接口向Hive发送心跳,表明锁的持有客户端还是存活状态。

4.1.4.提交操作

  首先就是确定新的元数据文件地址,就是版本号+1,后面产生随机数
  这一步是在锁前面进行的

String newMetadataLocation =base == null && metadata.metadataFileLocation() != null? metadata.metadataFileLocation(): writeNewMetadata(metadata, currentVersion() + 1);

  之后的操作都是在锁定以后操作的
调用Hms接口去加载表,获取metadata_location信息,与base的metadata_location信息比较。此时如果已经有其他进程进行过元数据更新操作,这里就会失败

String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP);
String baseMetadataLocation = base != null ? base.metadataFileLocation() : null;
if (!Objects.equals(baseMetadataLocation, metadataLocation)) {throw new CommitFailedException("Base metadata location '%s' is not same as the current table metadata location '%s' for %s.%s",baseMetadataLocation, metadataLocation, database, tableName);
}

  如果没有产生冲突,就会更新表的信息

Map<String, String> summary =Optional.ofNullable(metadata.currentSnapshot()).map(Snapshot::summary).orElseGet(ImmutableMap::of);
setHmsTableParameters(newMetadataLocation, tbl, metadata, removedProps, hiveEngineEnabled, summary);

  最后将更新过的表信息提交Hive,通过alterTable或createTable接口

4.2.HadoopTableOperations

  Hadoop的实现是会有一个version-hint.text文件,记录了当前的元数据版本号

Pair<Integer, TableMetadata> current = versionAndMetadata();

  之后会将元数据先写入一个临时文件

String codecName =metadata.property(TableProperties.METADATA_COMPRESSION, TableProperties.METADATA_COMPRESSION_DEFAULT);
TableMetadataParser.Codec codec = TableMetadataParser.Codec.fromName(codecName);
String fileExtension = TableMetadataParser.getFileExtension(codec);
Path tempMetadataFile = metadataPath(UUID.randomUUID().toString() + fileExtension);
TableMetadataParser.write(metadata, io().newOutputFile(tempMetadataFile.toString()));

  然后在将写完的文件重命名,在重命名时,如果重命名失败了,会删除之前的临时文件并抛出异常

int nextVersion = (current.first() != null ? current.first() : 0) + 1;
Path finalMetadataFile = metadataFilePath(nextVersion, codec);
FileSystem fs = getFileSystem(tempMetadataFile, conf);
// this rename operation is the atomic commit operation
renameToFinal(fs, tempMetadataFile, finalMetadataFile, nextVersion);

  任务本次commit时的元数据目录名在锁定时已经确定,如果多并发情况,锁完以后rename就会发生文件存在

try {lockManager.acquire(dst.toString(), src.toString());if (fs.exists(dst)) {throw new CommitFailedException("Version %d already exists: %s", nextVersion, dst);}

5.补充

5.1.PRIMARY Key

  当创建的表设置了PRIMARY Key的时候(目前以Flink样例,其他组件未测试)

CREATE TABLE upTable (id  INT UNIQUE COMMENT 'unique id',data STRING NOT NULL,PRIMARY KEY(id) NOT ENFORCED) with ('format-version'='2', 'write.upsert.enabled'='true');

  写入数据时会产生两个数据文件,一个数据文件存储全量的数据,一个数据文件专门存储PRIMARY Key,文件前缀完全相同,由最后五位区分
在这里插入图片描述
  当在一批中相同PRIMARY Key有多个数据写入时,还会产生第三个文件,记录每次的变更,内容是数据文件和文件内偏移量
在这里插入图片描述

http://www.lryc.cn/news/2420167.html

相关文章:

  • Java多线程(吐血超详细整理)
  • AOP与OOP有什么区别
  • Java发送Http请求(HttpClient)
  • sm2和sm4加密算法浅析
  • 【C语言】动态内存管理 - malloc等函数详解
  • Loki 学习总结(1)—— Loki 中小项目日志系统的不二之选
  • 事物(Transaction)
  • 机器学习——神经网络(BP)
  • 【Web前端】“CSS”选择器是什么?
  • ELF文件详解
  • 新手安装 Ubuntu 操作系统步骤教程
  • SOTA:目标识别、计算机视觉中常见的名词SOTA的意思
  • Unicode及UTF-8、UTF-16、UTF-32
  • x86_64和AMD64和ARM64?傻傻分不清楚?
  • 保姆式介绍DDR5(比喻方法讲解)
  • 什么是VOIP-网络电话名词详解
  • 最全.NET Core各个版本特性整理,面试可能会考
  • iso文件打开方法
  • JDK版本说明/下载安装/环境配置 全过程详解
  • linux中fd的几点理解——一切皆文件
  • 《漏洞研究》Apache Log4j2 远程代码执行漏洞_apache log4j2远程代码执行漏洞
  • beyond compare简易破解方法
  • 【编程实践】Google Guava 极简教程
  • 知识点 | 今天好好学习MPP和MapReduce分别是个嘛?!
  • 微信小程序框架weui的基础使用
  • 带你正确认识Unicode和UTF-8
  • EIP的理解
  • C语言MD5算法
  • 从零学MyCat(一)Mycat基本介绍及安装
  • [HPC入门] 高性能计算 (HPC) 是什么?哪些业务场景需要HPC?