- 背景
flink1.15 hadoop3.0 - pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.iceberg</groupId><artifactId>flink-iceberg</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.15.3</flink.version><java.version>1.8</java.version><scala.binary.version>2.12</scala.binary.version><slf4j.version>1.7.30</slf4j.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-files</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version><scope>compile</scope></dependency><dependency><groupId>org.apache.iceberg</groupId><artifactId>iceberg-flink-runtime-1.15</artifactId><version>1.3.0</version></dependency><dependency><groupId>org.apache.iceberg</groupId><artifactId>iceberg-core</artifactId><version>1.3.0</version></dependency></dependencies><build><plugins><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.3.0</version><configuration><archive><manifest><mainClass>com.iceberg.flink.UnionDelData</mainClass></manifest></archive><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins></build>
</project>
- 资源配置文件
hadoop三个常用配置文件core-site.xml hdfs-site.xml yarn-site.xml 放到资源目录下 - java代码
package com.iceberg.flink;import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.actions.Actions;
import org.apache.iceberg.hadoop.HadoopCatalog;import java.io.File;
import java.net.MalformedURLException;public class UnionDelData {public static void main(String[] args) throws MalformedURLException { String tableNames = args[1];long targetsSize = parseSizeToBytes(args[2]);int parallelism = Integer.parseInt(args[3]);long retainTime = parseTimeToMillis(args[4]);int retainLastNum = Integer.parseInt(args[5]);Configuration conf = new Configuration();conf.addResource(new File("/home/hadoop/hadoopconf/core-site.xml").toURI().toURL());conf.addResource(new File("/home/hadoop/hadoopconf/hdfs-site.xml").toURI().toURL());conf.addResource(new File("/home/hadoop/hadoopconf/yarn-site.xml").toURI().toURL());HadoopCatalog hadoopCatalog = new HadoopCatalog(conf, "/user/hadoop/path/");for (String tableName : tableNames.split(",")) {Table table = hadoopCatalog.loadTable(TableIdentifier.of("prod", tableName));UnionDataFile(table,parallelism,targetsSize);deleteSnap(table,retainTime,retainLastNum);}}public static void UnionDataFile(Table table,int parallelism,long targetsSize) {Actions.forTable(table).rewriteDataFiles().maxParallelism(parallelism).caseSensitive(false).targetSizeInBytes(targetsSize).execute();}public static void deleteSnap(Table table,long retainTime,int retainLastNum){Snapshot snapshot = table.currentSnapshot();long oldSnapshot = snapshot.timestampMillis() - retainTime;if (snapshot != null) { table.expireSnapshots().expireOlderThan(oldSnapshot).cleanExpiredFiles(true).retainLast(retainLastNum).commit();}}public static long parseSizeToBytes(String sizeWithUnit) {long size = Long.parseLong(sizeWithUnit.substring(0, sizeWithUnit.length() - 1));char unit = sizeWithUnit.charAt(sizeWithUnit.length() - 1); switch (unit) {case 'B':return size;case 'K':case 'k': return size * 1024;case 'M':case 'm': return size * 1024 * 1024;case 'G':case 'g': return size * 1024 * 1024 * 1024;default:throw new IllegalArgumentException("Invalid size unit: " + unit);}}public static long parseTimeToMillis(String timeWithUnit) {long time = Long.parseLong(timeWithUnit.substring(0, timeWithUnit.length() - 1));char unit = timeWithUnit.charAt(timeWithUnit.length() - 1);switch (unit) {case 's':case 'S':return time * 1000;case 'm':case 'M':return time * 60 * 1000;case 'h':case 'H':return time * 60 * 60 * 1000;case 'd':case 'D':return time * 24 * 60 * 60 * 1000;default:throw new IllegalArgumentException("Invalid time unit: " + unit);}}
}