当前位置: 首页 > news >正文

Paimon的部分更新以及DeleteVector实现

背景

本文基于 Paimon 0.9
出于对与Paimon内部的DeleteVctor的实现以及部分更新的实现进行的源码阅读。
关于 DeleteVector的介绍可以看这里

说明

对于Paimon来说无论是Spark中使用还是Flink使用,后面的逻辑都是一样的,所以我们以Spark为例来说。所以我们会参考类 org.apache.paimon.spark.SparkSource,
对于Flink可以参考org.apache.paimon.flink.FlinkTableFactory
如没特别说明,这里都是以主键表来进行说明。

paimon的部分字段更新

这里主要的场景更多的是多流或者多批写同一个表字段的场景,且每个流或批只更新某几个字段(同样的主键),具体的配置或说明参考Partial Update
这里涉及到的方法为 SparkTableWrite.write,最终会到MergeTreeWriter.write:

 @Overridepublic void write(KeyValue kv) throws Exception {long sequenceNumber = newSequenceNumber();boolean success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());if (!success) {flushWriteBuffer(false, false);success = writeBuffer.put(sequenceNumber, kv.valueKind(), kv.key(), kv.value());if (!success) {throw new RuntimeException("Mem table is too small to hold a single element.");}}}
  • writeBuffer.put 主要是往buffer中写数据
    这里的writeBufferSortBufferWriteBuffer类实例。
    这里会 主键+sequenceNumber+valueKind + value 的形式写入数据
  • flushWriteBuffer 这里就会涉及到数据落盘以及部分更新的逻辑:
      writeBuffer.forEach(keyComparator,mergeFunction,changelogWriter == null ? null : changelogWriter::write,dataWriter::write);
    
    • mergeFunction 这里的函数就是会在MergeTreeWriter初始化,也就是会初始化为PartialUpdateMergeFunction
    • 对于forEach的实现会构建一个 MergeIterator,在这里面会调用 PartialUpdateMergeFunction.add方法
      这里就会涉及到部分更新的逻辑,主要就是:把按照 主键+sequenceNumber 排序好的数据传给PartialUpdateMergeFunction
      这样PartialUpdateMergeFunction只需要判断前后两个的数据的主键是否一致来进行更新。
      具体的更新逻辑见: Partial Update
      new MergeIterator(awConsumer, buffer.sortedIterator(), keyComparator, mergeFunction);
      
      这里的buffer.sortedIterator主要看SortBufferWriteBuffer构造方法(也就是为什么会按照主键+sequenceNumber排序):
       public SortBufferWriteBuffer(RowType keyType,RowType valueType,@Nullable FieldsComparator userDefinedSeqComparator,MemorySegmentPool memoryPool,boolean spillable,MemorySize maxDiskSize,int sortMaxFan,CompressOptions compression,IOManager ioManager) {...// key fieldsIntStream sortFields = IntStream.range(0, keyType.getFieldCount());// user define sequence fieldsif (userDefinedSeqComparator != null) {IntStream udsFields =IntStream.of(userDefinedSeqComparator.compareFields()).map(operand -> operand + keyType.getFieldCount() + 2);sortFields = IntStream.concat(sortFields, udsFields);}// sequence fieldsortFields = IntStream.concat(sortFields, IntStream.of(keyType.getFieldCount()));int[] sortFieldArray = sortFields.toArray();// row typeList<DataType> fieldTypes = new ArrayList<>(keyType.getFieldTypes());fieldTypes.add(new BigIntType(false));fieldTypes.add(new TinyIntType(false));fieldTypes.addAll(valueType.getFieldTypes());NormalizedKeyComputer normalizedKeyComputer =CodeGenUtils.newNormalizedKeyComputer(fieldTypes, sortFieldArray);RecordComparator keyComparator =CodeGenUtils.newRecordComparator(fieldTypes, sortFieldArray);...InternalRowSerializer serializer =InternalSerializers.create(KeyValue.schema(keyType, valueType));BinaryInMemorySortBuffer inMemorySortBuffer =BinaryInMemorySortBuffer.createBuffer(normalizedKeyComputer, serializer, keyComparator, memoryPool);
      
      其中IntStream.concat(sortFields, IntStream.of(keyType.getFieldCount())) 就会会把sequenceNumber这个字段带入到排序中去,
      也就是在buffer.sortedIterato方法中调用。
      如果有定义sequence.field,那这里面的字段也会参与排序,见:udsFields 字段

DeleteVector的实现

关于deleteVector的实现,可以参考Introduce deletion vectors for primary key table
大概的思想是: 基于Compaction + lookup的机制产生 DeleteVector:

  • 当一个记录不属于 level0层的话,就不会产生DelectVector
  • 当一个记录只属于需要进行compaction的level的话,就不会产生DeleteVector
  • 当一个记录只属于 level0层的话,就要去查询不包含 Compaction的层的文件数据,从而产生DeleteVector
    注意: deleteVector只支持主键表, 是属于bucket级别的,一个bucket一个DeleteVector。

DeleteVector的写

按照以上的说法,只有在Compaction的时候,才会产生DeleteVector,所以 我们直接到达 MergeTreeWriter.flushWriteBuffer,这里涉及到DeleteVector的数据流如下:

compactManager.triggerCompaction(forcedFullCompaction)||\/
submitCompaction||\/
MergeTreeCompactTask.doCompact||\/rewrite  ||\/
rewriteImpl ||\/
LookupMergeTreeCompactRewriter.rewrite ||\/
rewriteOrProduceChangelog||\/
createMergeWrapper||\/
iterator.next()||\/
RecordReaderIterator.next()||\/
advanceIfNeeded||\/
currentIterator.next() ||\/
SortMergeIterator.next()||\/
LookupChangelogMergeFunctionWrapper.add(winner)||\/
LookupChangelogMergeFunctionWrapper.getResult()
  • 这里MergeTreeCompactTask.doCompact写完之后,会有result.setDeletionFile(compactDfSupplier.get())
    compactDfSupplier 这里的源自submitCompaction方法中的compactDfSupplier构造:

     if (dvMaintainer != null) {compactDfSupplier =lazyGenDeletionFile? () -> CompactDeletionFile.lazyGeneration(dvMaintainer): () -> CompactDeletionFile.generateFiles(dvMaintainer);}
    

    而这里的deleteVector的产生来自LookupChangelogMergeFunctionWrapper.getResult(),见以下说明

  • 这里的LookupMergeTreeCompactRewriter.rewriteLookupMergeTreeCompactRewriter实例是在创建MergeTreeWriter

     CompactManager compactManager =createCompactManager(partition, bucket, compactStrategy, compactExecutor, levels, dvMaintainer)
    

    这里会调用createRewriter方法创建LookupMergeTreeCompactRewriter实例,
    其中会根据lookupStrategy来创建该实例:

     public LookupStrategy lookupStrategy() {return LookupStrategy.from(mergeEngine().equals(MergeEngine.FIRST_ROW),changelogProducer().equals(ChangelogProducer.LOOKUP),deletionVectorsEnabled(),options.get(FORCE_LOOKUP));
    
  • 这里 currentIterator.next() 是 通过调用currentIterator = SortMergeReaderWithLoserTree.readBatch获取的,而SortMergeReaderWithLoserTree 是通过readerForMergeTree方法获取的

  • 这里LookupChangelogMergeFunctionWrapper.getResult()才是重点

     @Overridepublic ChangelogResult getResult() {// 1. Compute the latest high level record and containLevel0 of candidatesLinkedList<KeyValue> candidates = mergeFunction.candidates();Iterator<KeyValue> descending = candidates.descendingIterator();KeyValue highLevel = null;boolean containLevel0 = false;while (descending.hasNext()) {KeyValue kv = descending.next();if (kv.level() > 0) {descending.remove();if (highLevel == null) {highLevel = kv;}} else {containLevel0 = true;}}// 2. Lookup if latest high level record is absentif (highLevel == null) {InternalRow lookupKey = candidates.get(0).key();T lookupResult = lookup.apply(lookupKey);if (lookupResult != null) {if (lookupStrategy.deletionVector) {PositionedKeyValue positionedKeyValue = (PositionedKeyValue) lookupResult;highLevel = positionedKeyValue.keyValue();deletionVectorsMaintainer.notifyNewDeletion(positionedKeyValue.fileName(), positionedKeyValue.rowPosition());} else {highLevel = (KeyValue) lookupResult;}}}// 3. Calculate resultKeyValue result = calculateResult(candidates, highLevel);// 4. Set changelog when there's level-0 recordsreusedResult.reset();if (containLevel0 && lookupStrategy.produceChangelog) {setChangelog(highLevel, result);}return reusedResult.setResult(result);} 
    
  • 这里主要说明 lookup.apply的方法,其中 lookup的 构造是在createLookupChangelogMergeFunctionWrapper构造中:

       @Overridepublic MergeFunctionWrapper<ChangelogResult> create(MergeFunctionFactory<KeyValue> mfFactory,int outputLevel,LookupLevels<T> lookupLevels,@Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {return new LookupChangelogMergeFunctionWrapper<>(mfFactory,key -> {try {return lookupLevels.lookup(key, outputLevel + 1);} catch (IOException e) {throw new UncheckedIOException(e);}},valueEqualiser,changelogRowDeduplicate,lookupStrategy,deletionVectorsMaintainer,userDefinedSeqComparator);}}
    

    这里的lookupLevels.lookup 会最终调用createLookupFile 方法构造LookupFile 实例,
    其中会调用 valueProcessor.persistToDisk(kv, batch.returnedPosition()方法,持久化 行号到对应的文件,
    这样就能获取到对应的行号。

  • 获取到对应的结果 lookupResult 后
    调用 deletionVectorsMaintainer.notifyNewDeletion(positionedKeyValue.fileName(), positionedKeyValue.rowPosition()方法去构造
    DeletionVector.
    上面提到的result.setDeletionFile(compactDfSupplier.get())会调用 CompactDeletionFile.generateFiles(dvMaintainer) 方法
    从而调用maintainer.writeDeletionVectorsIndex方法,从而写如到DeleteVector文件中。

DeleteVector的读

DeleteVector的读取主要在以下方法中构造:PrimaryKeyFileStoreTable.newRead:
最终会调用RawFileSplitRead.createReader从而调用 ApplyDeletionVectorReader(fileRecordReader, deletionVector)方法构造ApplyDeletionVectorReader实例:

 public RecordIterator<InternalRow> readBatch() throws IOException {RecordIterator<InternalRow> batch = reader.readBatch();if (batch == null) {return null;}checkArgument(batch instanceof FileRecordIterator,"There is a bug, RecordIterator in ApplyDeletionVectorReader must be RecordWithPositionIterator");return new ApplyDeletionFileRecordIterator((FileRecordIterator<InternalRow>) batch, deletionVector);}

该处的readBatch方法会构造一个ApplyDeletionFileRecordIterator迭代器,可见在next()方法会对每一个记录调用deletionVector.isDeleted是否删除的判断.

 @Overridepublic InternalRow next() throws IOException {while (true) {InternalRow next = iterator.next();if (next == null) {return null;}if (!deletionVector.isDeleted(returnedPosition())) {return next;}}}

FAQ

写入文件的时候,怎么记录行号和主键的关系?

这里不会写入的时候记录行号,会在调用createLookupFile 在构建 LookupFile这个文件的时候(初始化),从parquet文件读取过来的时候,就会获取行号。

http://www.lryc.cn/news/597395.html

相关文章:

  • 篇四 tcp,udp客户端服务器编程模型
  • MYSQL 笔记3
  • 实验室信息管理系统的设计与实现/实验室管理系统
  • lwIP学习记录5——裸机lwIP工程学习后的总结
  • 【bug】websocket协议不兼容导致的一个奇怪问题
  • Linux 723 磁盘配额 限制用户写入 quota;snap快照原理
  • Linux 环境下安装 MySQL 8.0.34 二进制 详细教程 附docker+k8s启动
  • VU2 学习笔记4 计算属性、监视属性
  • 北京互联网公司面试题精华解析
  • 计算机网络学习----Https协议
  • 直接偏好优化(DPO):原理、演进与大模型对齐新范式
  • python-82-基于ORM操作数据库(一)简单模型CRUD
  • UniappDay01
  • JavaWeb笔记12
  • MySQL深度理解-深入理解MySQL索引底层数据结构与算法
  • 容联云携手信通院,启动“智能体服务生态共创计划”
  • 华为云ELB(弹性负载均衡)持续报异常
  • 2025年Zigbee技术白皮书:全球物联网无线通信的关键创新
  • HF86611_VC1/HF86611Q_VC1:多通道USB HiFi音频解码器固件技术解析
  • 【自动化运维神器Ansible】深入解析Ansible Host-Pattern:精准控制目标主机的艺术
  • .Net core 部署到IIS出现500.19Internal Server Error 解决方法
  • Ubuntu系统下FFmpeg源码编译安装
  • 内网穿透技术深析:从原理到工具应用的全方位解读,无公网IP本地服务器外网访问实操
  • IGM弧焊机器人气体节约
  • 【数据结构】哈希——位图与布隆过滤器
  • 彩色转灰度的核心逻辑:三种经典方法及原理对比
  • zabbix监控MySQL数据库
  • 企业选择将服务器放在IDC机房托管的优势
  • React+Three.js实现3D场景压力/温度/密度分布可视化
  • Spring Boot与Python的联动:实战案例解析