当前位置: 首页 > news >正文

FlinkCDC-Hudi数据实时入湖原理篇

1.Hudi应用场景

面对海量数据开发场景,一种支持存储多种原始数据格式、多种计算引擎、高效的元数据统一管理的存储方式能极大的提高开发效率。所以在选择技术选型的时候,这种存储方式有以下几个特点:

  • 存储原始数据,这些原始数据来源非常丰富(结构化,非结构化);

  • 支持多种计算模型;

  • 完善的数据管理能力,要能做到多种数据源接入,实现不同数据之间的连接;

  • 灵活的底层存储,一般用 hdfs 这种廉价的分布式文件系统。

本文会向大家介绍Hudi是如何具备上面集中优势的。但是Hadoop的技术栈那么复杂、而且Hudi也是近几年刚兴起的技术,为什么还要推荐大家使用Hudi?在这里总结了一下几点:

  1. Hudi对数据的读取有独特的优点,它能够帮助合并DFS上的最小文件,解决了HDFS和云存储上的小文件问题,能够显著提高查询性能。

  2. Hudi提供了删除存储在数据湖中数据的能力,可以通过Merge on Read的方式来处理辅助键随机删除所导致的写放大(只要 Partition 内有消息变更都需要覆盖重写)。

  3. Hudi使用细粒度的文件/记录级别索引来支持Update/Delete记录,同时还提供写操作的事务保证。查询会处理最后一个提交的快照。

  4. Hudi对获取数据变更提供了很好的支持:可以从给定的时间点获取给定表中updated/inserted/deleted的所有记录的增量流。

总的来说,它是一种针对分析型业务的、扫描优化的数据存储抽象,它能够使DFS数据集在分钟级的时延内支持变更,也支持下游系统对这个数据集的增量处理。

2.数据入湖桥梁-FlinkCDC

目前 数据库的数据导入数据湖可以通过 CDC connector 一次性将全量和增量数据导入到 Hudi 格式中;也可以通过消费 Kafka 上的 CDC changelog,通过 Flink 的 CDC format 将数据导入到 Hudi 格式。

CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC 。

又细分为基于直连查询的 CDC和基于Binlog的 CDC。

对比点

基于直连查询的CDC

基于Binlog的 CDC

是否可以捕获所有数据变换

延迟性能

高延迟

低延迟

执行模式

Batch 批处理

Streaming流处理

对数据库的压力

压力较大

压力较小

开源产品

kafka JDBC source

Canal

以下是之前的mysql binlog日志处理流程,例如canal监听binlog把日志写入到kafka中。 Flink实时消费Kakfa的数据实现mysql数据的同步,整体上可以分为以下几个阶段。

  • 1.mysql开启binlog

  • 2.canal同步binlog数据写入到kafka

  • 3.flink读取kakfa中的binlog数据进行相关的业务处理。

整体的处理链路较长,需要用到的组件也比较多。 Apache Flink CDC可以直接从数据库获取到binlog供下游进行业务计算分析。

图片

也就是说数据不再通过canal与kafka进行同步,而flink直接进行处理mysql的数据。节省了canal与kafka的过程。

3.hudi结构介绍

hudi将一个表映射为如下文件结构:

图片

Hudi存储分为两个部分:元数据区、数据区。

3.1 元数据

hoodie目录对应着表的元数据信息,包括表的版本管理(Timeline)、归档目录(存放过时的instant也就是版本),一个instant记录了一次提交(commit)的行为、时间戳和状态,hudi以时间轴的形式维护了在数据集上执行的所有操作的元数据;

由于它维护着一条所有操作的不同 Instant组成的 Timeline(时间轴),通过时间轴,用户可以轻易的进行增量查询或基于某个历史时间点的查询。

Timeline格式:

图片

一个Instant的组成包括:

state

状态:目前包括REQUESTED(已调度但未初始化)、INFLIGHT(当前正在执行)、COMPLETED(操作执行完成),状态会转变,如当提交完成时会从 inflight状态转变为 completed状态。

action操作

对数据集执行的操作类型,如 commit、 deltacommit等:

提交(commit):一次提交表示将一批记录原子写入数据集中的过程。

增量提交(delta_commit) :增量提交是指将一批记录原子写入到MOR表中,其中数据都将只写入到日志中。

清理(clean):清理数据集中不再被查询中使用的文件的较旧版本。

压缩(compaction):将MOR表中多个log文件进行合并,用以减小数据存储,本质是将行式文件转化为列式文件的动作。

timestamp:开始 一个Instant发生的时间戳,Hudi会保证单调递增。

3.2 数据区

  • 数据文件/基础文件:Hudi将数据以列存格式(Parquet)存放,称为数据文件/基础文件。

  • 增量日志文件: 在 MOR 表格式中,更新被写入到增量日志文件中,该文件以 avro 格式存储。 这些增量日志文件始终与基本文件相关联。假设有一个名为 data_file_1 的数据文件,对 data_file_1 中记录的任何更新都将写入到新的增量日志文件。在服务读取查询时,Hudi 将实时合并基础文件及其相应的增量日志文件中的记录。

  • 文件组(FileGroup):通常根据存储的数据量,可能会有很多数据文件。 每个数据文件及其对应的增量日志文件形成一个文件组。 在 COW表中,只有基本文件。

  • 文件版本:比如COW表每当数据文件发生更新时,将创建数据文件的较新版本,其中包含来自较旧数据文件和较新传入记录的合并记录。

  • 文件切片(FileSlice):对于每个文件组,可能有不同的文件版本。 因此文件切片由特定版本的数据文件及其增量日志文件组成。 对于 COW表,最新的文件切片是指所有文件组的最新数据/基础文件。 对于 MOR表,最新文件切片是指所有文件组的最新数据/基础文件及其关联的增量日志文件。

4.Flink Hudi的批流一体

4.1 hudi表介绍

hudi支持两种表类型:Copy On Write(COW) & Merge On Read(MOR)。

COW表:在数据写入的时候,通过复制旧文件数据并且与新写入的数据进行合并,对 Hudi 的每一个新批次写入都将创建相应数据文件的新版本。

图片

data_file1 和 data_file2 都将创建更新的版本,data file 1 V2 是数据文件 data file 1 V1 的内容与数据文件data file 1 中传入批次匹配记录的记录合并。 由于在写入期间进行合并,COW 会产生一些写入延迟。 但是COW 的优势在于它的简单性。

MOR表:对于具有要更新记录的现有数据文件,Hudi 创建增量日志文件记录更新数据。此在写入期间不会合并或创建较新的数据文件版本;在进行数据读取的时候,将本批次读取到的数据进行Merge。Hudi 使用压缩机制来将数据文件和日志文件合并在一起并创建更新版本的数据文件。

图片

COW表和MOR表优势对比:

COW适用于读多写少的场景,MOR适用于写多读少的场景。

对比点

COW

MOR

说明

更新代价

COW为每批次写入都会创建更新的数据文件,所以cow的I/O成本高,而MOR更新增量日志文件,其I/O成本低。

读取延迟

一般

COW在写入就进行了合并,与Cow相比的话,MOR延迟较高。

写放大问题

假设有一个大小为100MB的数据文件,并且每次更新10%的记录进行4批次写入,4次写入之后,Hudi将会存储5个大小为100Mb的COW数据文件,MOR在4次写入后,将有1*100MB的文件和4个增量日志文件(10MB)的大小约140MB;

4.2 hudi表写入原理介绍

分为三个模块:数据写入、数据压缩与数据清理。

图片

4.2.1 数据写入

(1)基础数据封装:将数据流中flink的RowData封装成Hoodie实体;

(2)BucketAssigner:桶分配器,主要是给数据分配写入的文件地址:

(3)Hoodie Stream Writer: 数据写入,将数据缓存起来,在超过设置的最大flushSize时进行刷新到文件中;

(4)Oprator Coordinator:主要与Hoodie Stream Writer进行交互,提交instant到timeLine上,并生成下一个instant的时间。

4.2.2 数据压缩

压缩( compaction)用于在 MergeOnRead存储类型时将基于行的log日志文件转化为parquet列式数据文件,用于加快记录的查找。

compaction首先会遍历各分区下最新的parquet数据文件和其对应的log日志文件进行合并,并生成新的FileSlice,在TimeLine 上提交新的Instant:

图片

4.3 hudi表读取介绍

Hudi支持如下三种查询类型:

快照读(Snapshot Queries)

  1. MOR表查询:在MOR模式下,Hudi在写入时将数据写入到可变的数据文件中,这些文件称为日志文件。当文件大小达到一定阈值时,Hudi会将这些日志文件归档到一个不可变的数据文件中。这些不可变的数据文件称为快照文件。因此,在MOR模式下,Hudi的查询快照实际上是查询这些快照文件。由于MOR表的数据文件是可变的,因此如果一个数据文件中的数据被更新,那么这个更新不会影响已经归档为快照文件的数据文件。因此,在查询MOR表的快照时,Hudi需要同时查询所有的数据文件和快照文件,以确保查询结果的正确性。

  2. COW表查询:在COW模式下,Hudi在写入时将数据写入到不可变的数据文件中,这些文件称为快照文件。当有更新发生时,Hudi会将更新写入一个新的数据文件中,并将这个新的数据文件作为新的快照文件。因此,在COW模式下,Hudi的查询快照实际上是查询这些快照文件。由于COW表的数据文件是不可变的,因此如果一个数据文件中的数据被更新,那么这个更新会生成一个新的数据文件,而不是更新原始的数据文件。因此,在查询COW表的快照时,Hudi只需要查询最新的快照文件即可,不需要查询旧的数据文件。这种方式可以提高查询性能。

增量读(Incremental Queries)

  1. 对于MOR表,增量查询可以直接在Hudi数据集中运行。这种查询类型可以在Hudi数据集中基于增量数据执行查询。MOR表中,每个数据文件都包含了最近一次写操作之后的所有更改。这意味着,如果在两个查询之间执行了一些写操作,则下一个查询将只考虑这些更改,并自动过滤掉之前的数据。

  2. 对于COW表,增量查询需要从历史数据中进行计算。在这种情况下,Apache Hudi需要将之前的数据文件加载到内存中,并计算增量数据。

优化读(Read Optimized Queries)

  1. 对于MOR表,可以使用时间戳或者Hudi记录中的默认时间戳进行Range查询,以查询特定时间范围内的数据。Apache Hudi会自动选择包含所需时间范围的文件版本,并返回该时间范围内的数据。

  2. 对于COW表,同样可以使用时间戳或默认时间戳进行Range查询。但由于COW表在每次写操作中都会创建一个全新的文件版本,因此Apache Hudi需要加载所有历史数据,并计算出特定时间范围内的数据。在这种情况下,COW表的查询时间可能会比MOR表更长。

图片

5.hudi vs Iceberg 数据更新能力

5.1 Iceberg 数据更新

Iceberg 的官方定位是「面向海量数据分析场景的高效存储格式」。所以它没有像 Hudi 一样模拟业务数据库的设计模式(主键+索引)来实现数据更新,而是设计了更强大的文件组织形式来实现数据的 update 操作,详见下图:

图片

s0,s1代表的是当前操作的一个快照,每次commit都会生成一个快照Snapshot,每个Snapshot快照对应一个manifest list元数据文件组,每个manifest list中包含多个Manifest元数据文件,maifest中记录了当前操作生成数据所对应的文件地址,也就是data file地址。

Data files(数据文件)

数据文件是Apache Iceberg表真实存储数据的文件,一般是在表的数据存储目录的data目录下,如果我们的文件格式选择的是parquet,那么文件是以“.parquet”结尾,Iceberg每次更新会产生多个数据文件。

Snapshot(表快照)

快照代表一张表在某个时刻的状态,每个快照里面会列出表在某个时刻的所有Data files 列表。Data files存储在不同的Manifest files里面,Manifest files存储在一个Manifest list文件里面,而一个Manifest list文件代表一个快照。

Manifest file(清单文件)

Manifest file是一个元数据文件,它列出组成快照(Snapshot)的数据文件(Data files)的列表信息。每行都是每个数据文件的详细描述,包括数据文件的状态、文件路径、分区信息、列级别的统计信息(比如每列的最大最小值、空值数等)、文件的大小以及文件里面数据行数等信息。

Manifest list(清单列表)

Manifest list也是一个元数据文件,它列出构建表快照(Snapshot)的清单。这个元数据文件中存储的是Manifest file列表,每个Manifest file占据一行。每行中存储了Manifest file的路径、其存储的数据文件(Data files)的分区范围,增加了几个数文件、删除了几个数据文件等信息,这些信息可以用来在查询时提供过滤,加快速度。

Iceberg 实现 update 的大致逻辑是:先将要删除的数据写入 Delete File;然后将「Data File」 JOIN 「Delete File」进行数据比对,实现数据更新。

5.2 hudi 数据更新

图片

如图所示,filink cdc写入Hudi,Upsert执行核心操作如下:

  1. 开始提交:判断上次任务是否失败,如果失败会触发回滚操作。然后会根据当前时间生成一个事务开始的请求标识元数据。

  2. 构造HoodieRecord Rdd对象:Hudi会根据元数据信息构造HoodieRecord Rdd对象,方便后续数据去重和数据合并。

  3. 数据去重:一批增量数据中可能会有重复的数据,Hudi会根据主键对数据进行去重,避免重复数据写入Hudi表。

  4. 数据fileId位置信息获取:在修改记录中可以根据索引获取当前记录所属文件的fileld,因数据合并时Update操作需要知道向哪个fileid文件写入新的快照文件。

  5. 数据合并:在COW表模式中会重写索引命中的fileId快照文件;在MOR表模式中根据fileId追加到分区中的log文件。

  6. 完成提交:在元数据中生成xxxx.commit文件,只有生成commit元数据文件,查询引擎才能根据元数据查询到刚刚Upsert后的数据。

  7. 数据清理:用于删除旧的文件片,以及限制表空间的增长,清理操作在每次写操作之后自动被执行,同时利用缓存在TimeLine Server上的TimeLine Metadata来防止扫描整个表。

  8. Compaction压缩:主要是MOR模式中才会用到,会将MOR模式中的xxx.log数据合并到xxx.parquet快照文件中去。

5.3 对比总结

Hudi 凭借文件组+索引+主键的设计模式,能够有效减少数据文件的冗余更新,提高数据更新效率。而Iceberg 通过文件组织设计也能达到数据更新效果,但是每一次的 commit 都会产生新的文件,如果写入/更新频繁,小文件问题会比较严重。

http://www.lryc.cn/news/571186.html

相关文章:

  • JVM监控的挑战:Applications Manager如何提供帮助
  • Spring Boot集成Kafka全攻略:从基础配置到高级实践
  • 多模态大语言模型演进:从视觉理解到具身智能的技术突破
  • Linux运维新人自用笔记(部署 ​​LAMP:Linux + Apache + MySQL + PHP、部署discuz论坛)
  • 5.安装IK分词器
  • ELK在Java的使用
  • Selenium(选择元素,浏览器/元素操作,等待,页面交互)
  • Windows Python 环境管理终极对比:极简方案 VS 传统方案(仅需 2 个软件实现全流程自动化)
  • Selenium(多窗口,frame,验证码,截图,PO模式)
  • rockx读取单张图片并检测图片内人脸的矩形
  • vite的常用配置
  • 「动态规划::数位DP」统计数字递推 / LeetCode 3352|1012(C++)
  • 线程池(Thread Pool)详解
  • 基于Cesium移动的天空云
  • 【Docker基础】Docker核心概念:命名空间(Namespace)之IPC详解
  • 根据Python模块的完整路径import动态导入
  • 05_MinIO+Java SpringBoot 实现透传代理下载
  • 如何确定驱动480x320分辨率的显示屏所需的MCU主频
  • 为何前馈3DGS的边界总是“一碰就碎”?PM-Loss用“3D几何先验”来解
  • Mac 安装JD-GUI
  • 低轨导航 | 低轨卫星导航PNT模型,原理,公式,matlab代码
  • 软件工程:流程图如何画?
  • Python 爬虫入门 Day 5 - 使用 XPath 进行网页解析(lxml + XPath)
  • springboot使用kafka
  • Jmeter的三种参数化方式详解
  • web前端开发核心基础:Html结构分析,head,body,不同标签的作用
  • Java内存模型与线程
  • Anaconda 使用
  • 力扣经典算法篇-17-反转字符串中的单词(逆序遍历,数组分割,正则表达式)
  • 4_STM32F103ZET6芯片系统架构和寄存器