当前位置: 首页 > news >正文

iceberg系列之 hadoop catalog 小文件合并实战

  1. 背景
    flink1.15 hadoop3.0
  2. pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.iceberg</groupId><artifactId>flink-iceberg</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.15.3</flink.version><java.version>1.8</java.version><scala.binary.version>2.12</scala.binary.version><slf4j.version>1.7.30</slf4j.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><!--idea运行时也有webui--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version><scope>compile</scope></dependency><dependency><groupId>org.apache.iceberg</groupId><artifactId>iceberg-flink-runtime-1.15</artifactId><version>1.3.0</version></dependency><dependency><groupId>org.apache.iceberg</groupId><artifactId>iceberg-core</artifactId><version>1.3.0</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.3.0</version><configuration><archive><manifest><!-- 指定主类 --><mainClass>com.iceberg.flink.UnionDelData</mainClass></manifest></archive><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
</project>
  1. 资源配置文件
    hadoop三个常用配置文件core-site.xml hdfs-site.xml yarn-site.xml 放到资源目录下
  2. java代码
package com.iceberg.flink;import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.actions.Actions;
import org.apache.iceberg.hadoop.HadoopCatalog;import java.io.File;
import java.net.MalformedURLException;public class UnionDelData {public static void main(String[] args) throws MalformedURLException {      String tableNames = args[1];long targetsSize = parseSizeToBytes(args[2]);int parallelism = Integer.parseInt(args[3]);long retainTime = parseTimeToMillis(args[4]);int retainLastNum = Integer.parseInt(args[5]);Configuration conf = new Configuration();conf.addResource(new File("/home/hadoop/hadoopconf/core-site.xml").toURI().toURL());conf.addResource(new File("/home/hadoop/hadoopconf/hdfs-site.xml").toURI().toURL());conf.addResource(new File("/home/hadoop/hadoopconf/yarn-site.xml").toURI().toURL());HadoopCatalog hadoopCatalog = new HadoopCatalog(conf, "/user/hadoop/path/");for (String tableName : tableNames.split(",")) {Table table = hadoopCatalog.loadTable(TableIdentifier.of("prod", tableName));UnionDataFile(table,parallelism,targetsSize);deleteSnap(table,retainTime,retainLastNum);}}public static void UnionDataFile(Table table,int parallelism,long targetsSize) {Actions.forTable(table).rewriteDataFiles().maxParallelism(parallelism).caseSensitive(false).targetSizeInBytes(targetsSize).execute();}public static void deleteSnap(Table table,long retainTime,int retainLastNum){Snapshot snapshot = table.currentSnapshot();long oldSnapshot = snapshot.timestampMillis() - retainTime;if (snapshot != null) {            table.expireSnapshots().expireOlderThan(oldSnapshot).cleanExpiredFiles(true).retainLast(retainLastNum).commit();}}public static long parseSizeToBytes(String sizeWithUnit) {long size = Long.parseLong(sizeWithUnit.substring(0, sizeWithUnit.length() - 1));char unit = sizeWithUnit.charAt(sizeWithUnit.length() - 1); switch (unit) {case 'B':return size;case 'K':case 'k': return size * 1024;case 'M':case 'm': return size * 1024 * 1024;case 'G':case 'g': return size * 1024 * 1024 * 1024;default:throw new IllegalArgumentException("Invalid size unit: " + unit);}}public static long parseTimeToMillis(String timeWithUnit) {long time = Long.parseLong(timeWithUnit.substring(0, timeWithUnit.length() - 1));char unit = timeWithUnit.charAt(timeWithUnit.length() - 1);switch (unit) {case 's':case 'S':return time * 1000;case 'm':case 'M':return time * 60 * 1000;case 'h':case 'H':return time * 60 * 60 * 1000;case 'd':case 'D':return time * 24 * 60 * 60 * 1000;default:throw new IllegalArgumentException("Invalid time unit: " + unit);}}
}
http://www.lryc.cn/news/128285.html

相关文章:

  • 神经网络基础-神经网络补充概念-25-深层神经网络
  • MySQL— 基础语法大全及操作演示!!!(上)
  • [golang gin框架] 46.Gin商城项目-微服务实战之后台Rbac客户端调用微服务权限验证以及Rbac微服务数据库抽离
  • 域名和ip的关系
  • excel日期函数篇1
  • Leetcode151 翻转字符串中的单词
  • PHP FTP的相关函数及简单使用示例
  • 高光谱 | 矿物识别和分类标签数据制作、农作物病虫害数据分类、土壤有机质含量回归与制图、木材含水量评估和制图
  • 【数据结构】二叉树篇| 纲领思路01+刷题
  • 系统架构设计师---计算机基础知识之数据库系统结构与规范化
  • PyCharm连接Docker中的容器(ubuntu)
  • 安防视频汇聚平台EasyCVR视频监控综合管理平台H.265转码功能更新,新增分辨率配置的具体步骤
  • 全平台数据(数据库)管理工具 DataCap 管理 Rainbond 上的所有数据库
  • “深入探究JVM内部机制:从字节码到实际执行“
  • C++写文件,直接写入结构体
  • 【Spring专题】Spring之Bean的生命周期源码解析——阶段二(二)(IOC之属性填充/依赖注入)
  • 线程|线程的使用、四种实现方式
  • Facebook 应用未启用:这款应用目前无法使用,应用开发者已得知这个问题。
  • (十八)大数据实战——Hive的metastore元数据服务安装
  • ubuntu 22.04 LTS 在 llvm release/17.x 分支上编译 cookbook llvm example Chapter 02
  • 【仿写tomcat】三、通过socket读取http请求信息
  • Hive的窗口函数与行列转换函数及JSON解析函数
  • CSS中的z-index属性有什么作用?如何控制元素在层叠上下文中的显示顺序?
  • c语言——字符转ASCLL码
  • ardupilot开发 --- 安装与调参篇
  • BC108 矩阵交换
  • 如何发现系统改进点,优化点,提高点,新系统 边界感不要太强
  • 5G无人露天矿山解决方案
  • Datawhale Django入门组队学习Task01
  • 【第二阶段】kotlin的函数类型作为返回类型