当前位置: 首页 > news >正文

实时数仓: Hudi 表管理、Flink 性能调优或治理工具脚本

1. Hudi 表管理

1.1 Hudi 表基础管理

创建 Hudi 表
在 HDFS 上创建一个 Hudi 表(以 Merge-on-Read 为例):

CREATE TABLE real_time_dw.dwd_order_fact (order_id STRING,user_id STRING,product_id STRING,amount DOUBLE,order_date STRING,update_time TIMESTAMP
)
PARTITIONED BY (order_date)
STORED AS PARQUET
TBLPROPERTIES ('type'='MERGE_ON_READ','primaryKey'='order_id','preCombineField'='update_time'
);
1.2 数据操作

插入/更新数据
利用 Hudi 写入工具(如 Spark)进行批量或实时插入更新:

from pyspark.sql import SparkSession
from datetime import datetimespark = SparkSession.builder \.appName("Hudi Example") \.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \.getOrCreate()# 加载数据
data = [{"order_id": "1", "user_id": "101", "product_id": "201", "amount": 99.99, "order_date": "2025-01-01", "update_time": datetime.now()},{"order_id": "2", "user_id": "102", "product_id": "202", "amount": 199.99, "order_date": "2025-01-01", "update_time": datetime.now()}
]
df = spark.createDataFrame(data)# 写入 Hudi
hudi_options = {"hoodie.table.name": "dwd_order_fact","hoodie.datasource.write.recordkey.field": "order_id","hoodie.datasource.write.precombine.field": "update_time","hoodie.datasource.write.partitionpath.field": "order_date","hoodie.datasource.write.operation": "upsert","hoodie.datasource.write.table.type": "MERGE_ON_READ","hoodie.datasource.hive.sync.enable": "true","hoodie.datasource.hive.database": "real_time_dw","hoodie.datasource.hive.table": "dwd_order_fact","hoodie.datasource.hive.partition_fields": "order_date"
}df.write.format("hudi").options(**hudi_options).mode("append").save("hdfs://path/to/hudi/dwd_order_fact")
1.3 Hudi 表维护

表清理

  • 配置清理策略,清理过期版本:
    hoodie.cleaner.commits.retained=10
    hoodie.cleaner.policy=KEEP_LATEST_COMMITS
    
    保留最近 10 个提交版本。

表压缩

  • 针对 MOR 表,定期运行 compaction 任务:
    spark-submit --class org.apache.hudi.utilities.HoodieCompactor \--master yarn \--table-path hdfs://path/to/hudi/dwd_order_fact \--table-name dwd_order_fact
    

元数据管理

  • 更新 Hive 元数据:
    MSCK REPAIR TABLE real_time_dw.dwd_order_fact;
    

2. Flink 性能调优

2.1 Checkpoint 性能优化

增量 Checkpoint
启用 RocksDB 增量检查点,减少状态存储大小:

env.getCheckpointConfig().enableIncrementalCheckpoints(true);

异步快照
减少 Checkpoint 对性能的影响:

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60000); // 60秒超时
env.getCheckpointConfig().setPreferCheckpointForRecovery(true); // 优先使用Checkpoint恢复
2.2 Watermark 优化

如果数据有延迟,可以允许一定的 out-of-order 数据处理:

WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)) // 最大延迟5秒.withTimestampAssigner((event, timestamp) -> event.getEventTime());
2.3 状态管理优化

状态后端选择

  • 优先选择 RocksDB 状态后端,支持更大的状态数据:
    env.setStateBackend(new RocksDBStateBackend("hdfs://path/to/checkpoints", true));
    

TTL(Time-to-Live)设置

  • 自动清理无用状态:
    stateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.hours(1)).setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite).build());
    
2.4 Task Slot 配置

根据并发优化 TaskManager:

  • 每个 TaskManager 提供更多 slots:
    taskmanager.numberOfTaskSlots: 4
    

3. 治理工具脚本

3.1 数据质量治理(Great Expectations)

脚本自动化
以下 Python 脚本可以实现自动化数据校验(如字段非空和值域校验):

from great_expectations.core.batch import BatchRequest
from great_expectations.data_context import DataContextcontext = DataContext()batch_request = BatchRequest(datasource_name="my_s3_datasource",data_connector_name="default_runtime_data_connector_name",data_asset_name="dwd_order_fact",runtime_parameters={"path": "s3://path/to/hudi/dwd_order_fact/"},batch_identifiers={"default_identifier_name": "2025-01-01"}
)validator = context.get_validator(batch_request=batch_request)# 非空校验
validator.expect_column_values_to_not_be_null("order_id")
# 值域校验
validator.expect_column_values_to_be_in_set("order_status", ["CREATED", "PAID", "SHIPPED", "CANCELLED"])
# 保存结果
validator.save_expectation_suite("order_fact_suite")context.run_validation_operator("action_list_operator",assets_to_validate=[validator]
)
3.2 数据权限管理(Apache Ranger)

策略 JSON 配置
以下为权限策略 JSON 文件的示例,适用于 Ranger API 批量添加策略:

{"policyName": "dwd_order_fact_policy","serviceType": "hive","resources": {"database": {"values": ["real_time_dw"],"isExcludes": false,"isRecursive": false},"table": {"values": ["dwd_order_fact"],"isExcludes": false,"isRecursive": false}},"policyItems": [{"accesses": [{"type": "select", "isAllowed": true}],"users": ["bi_user"],"groups": ["BI_Group"]},{"accesses": [{"type": "select", "isAllowed": true}, {"type": "insert", "isAllowed": true}],"users": ["etl_user"],"groups": ["ETL_Team"]}]
}

通过 Ranger REST API 部署该策略:

curl -u admin:admin -H "Content-Type: application/json" -X POST -d @policy.json http://<RANGER_HOST>:6080/service/public/v2/api/policy
3.3 数据血缘治理(Apache Atlas)

Flink 血缘注册脚本
通过 REST API 自动将 Flink 作业的输入输出血缘关系上传到 Atlas:

curl -X POST http://<ATLAS_HOST>:21000/api/atlas/v2/entity \
-H "Content-Type: application/json" \
-d '{"entity": {"typeName": "process","attributes": {"name": "flink_order_job","inputs": [{"typeName": "kafka_topic", "uniqueAttributes": {"qualifiedName": "order_topic"}}],"outputs": [{"typeName": "hdfs_path", "uniqueAttributes": {"qualifiedName": "hdfs://path/to/hudi/dwd_order_fact"}}]}}
}'

http://www.lryc.cn/news/515892.html

相关文章:

  • Kotlin 数据类与密封类
  • 大模型推理加速调研(框架、方法)
  • C语言进阶(3)--字符函数和字符串函数
  • 微服务拆分的艺术:构建高效、灵活的系统架构
  • 记录一次电脑被入侵用来挖矿的过程(Trojan、Miner、Hack、turminoob)
  • 计算机xinput1_4.dll丢失怎么修复?
  • 高等数学学习笔记 ☞ 连续函数的运算与性质
  • k8s基础(4)—Kubernetes-Service
  • CAN或者CANFD的Busoff的恢复时间会受到报文周期的影响么?
  • 【DevOps】Jenkins部署
  • 【MATLAB第112期】基于MATLAB的SHAP可解释神经网络回归模型(敏感性分析方法)
  • 【Shell编程 / 4】函数定义、脚本执行与输入输出操作
  • RK3588+麒麟国产系统+FPGA+AI在电力和轨道交通视觉与采集系统的应用
  • MySQL 01 02 章——数据库概述与MySQL安装篇
  • 运行framework7
  • 【Web】软件系统安全赛CachedVisitor——记一次二开工具的经历
  • 实现自定义集合类:深入理解C#中的IEnumerable<T>接口
  • Compression Techniques for LLMs
  • Nexus Message Transaction Services(MTS)
  • 2025年Stable Diffusion安装教程(超详细)
  • 力扣【SQL连续问题】
  • 深圳市-地铁线路和站点名称shp矢量数据(精品)2021年-2030最新arcmap含规划路线内容测评分析
  • 企业级网络运维管理系统深度解析与实践案例
  • 音视频入门基础:MPEG2-PS专题(5)——FFmpeg源码中,解析PS流中的PES流的实现
  • 【问题记录】npm create vue@latest报错
  • OpenGL材质系统和贴图纹理
  • Markdown中类图的用法
  • 钓鱼攻击(Phishing)详解和实现 (网络安全)
  • window11 wsl mysql8 错误分析:1698 - Access denied for user ‘root‘@‘kong.mshome.net‘
  • C++线程同步之条件变量