Flink Java utility classes
Flink environment setup utility class
public class ExecutionEnvUtil {

    /**
     * Read configuration (precedence, lowest to highest: properties file < command-line arguments < system properties)
     *
     * @param args command-line arguments
     * @return org.apache.flink.api.java.utils.ParameterTool
     * @date 2023/8/4 - 10:05 AM
     */
    public static ParameterTool createParameterTool(final String[] args) throws Exception {
        return ParameterTool.fromPropertiesFile(ExecutionEnvUtil.class.getResourceAsStream(BaseConstants.PROPERTIES_FILE_NAME))
                .mergeWith(ParameterTool.fromArgs(args))
                .mergeWith(ParameterTool.fromSystemProperties());
    }

    /**
     * Configure the Flink execution environment
     *
     * @param parameterTool merged job configuration
     * @return org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
     * @date 2023/8/4 - 11:10 AM
     */
    public static StreamExecutionEnvironment prepare(ParameterTool parameterTool) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parameterTool.getInt(PropertiesConstants.STREAM_PARALLELISM, 12));
        // Restart strategy: up to 4 attempts, 60 s delay between attempts
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, Time.seconds(60)));
        if (parameterTool.getBoolean(PropertiesConstants.STREAM_CHECKPOINT_ENABLE, true)) {
            CheckPointUtil.setCheckpointConfig(env, parameterTool);
            // Retain externalized checkpoint data when the job is cancelled
            env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        }
        env.getConfig().setGlobalJobParameters(parameterTool);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        return env;
    }
}
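A minimal job entry point wiring these two helpers together could look like the sketch below; DemoJob and its trivial pipeline are placeholders for illustration, not part of the utility classes.

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DemoJob {
    public static void main(String[] args) throws Exception {
        // Merge properties file, command-line arguments and system properties
        ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
        // Environment with parallelism, restart strategy and checkpointing already applied
        StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);

        // Placeholder pipeline; replace with real sources and sinks
        env.fromElements("a", "b", "c")
                .map(String::toUpperCase)
                .print();

        env.execute("demo-job");
    }
}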
Checkpoint utility class
public class CheckPointUtil {

    private static final String CHECKPOINT_MEMORY = "memory";
    private static final String CHECKPOINT_FS = "fs";
    private static final String CHECKPOINT_ROCKETSDB = "rocksdb";
    /**
     * Default checkpoint storage type
     */
    private static final String CHECKPOINT_DEFAULT = "default";

    /**
     * Configure Flink checkpointing
     *
     * @param env           stream execution environment
     * @param parameterTool merged job configuration
     * @return org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
     * @date 2023/8/4 - 10:49 AM
     */
    public static StreamExecutionEnvironment setCheckpointConfig(StreamExecutionEnvironment env, ParameterTool parameterTool) throws Exception {
        // Pick a state backend according to the configured type
        String stateBackendType = parameterTool.get(PropertiesConstants.STREAM_CHECKPOINT_TYPE, CHECKPOINT_DEFAULT);
        if (CHECKPOINT_MEMORY.equalsIgnoreCase(stateBackendType)) {
            // 1. State is kept in JVM memory; the backend's default max state size is 5 MB, raised here to 500 MB
            StateBackend stateBackend = new MemoryStateBackend(5 * 1024 * 1024 * 100);
            env.setStateBackend(stateBackend);
        } else if (CHECKPOINT_FS.equalsIgnoreCase(stateBackendType)) {
            // 2. State is checkpointed to a file system (e.g. HDFS)
            StateBackend stateBackend = new FsStateBackend(new URI(parameterTool.get(PropertiesConstants.STREAM_CHECKPOINT_DIR)), 0, true);
            env.setStateBackend(stateBackend);
        } else if (CHECKPOINT_ROCKETSDB.equalsIgnoreCase(stateBackendType)) {
            // 3. State is kept in RocksDB with incremental checkpoints enabled
            RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(parameterTool.get(PropertiesConstants.STREAM_CHECKPOINT_DIR), true);
            env.setStateBackend(rocksDBStateBackend);
        }
        // Checkpoint interval (default 60 s)
        env.enableCheckpointing(parameterTool.getLong(PropertiesConstants.STREAM_CHECKPOINT_INTERVAL, 60000));
        // Advanced settings (these are also better read from the configuration file or environment)
        // Exactly-once mode
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Minimum pause between checkpoints: 2 minutes
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60000);
        // A checkpoint must complete within 15 minutes, otherwise it is discarded
        env.getCheckpointConfig().setCheckpointTimeout(15 * 60000);
        // Tolerate up to 3 consecutive checkpoint failures without failing the job
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
        // Only one checkpoint may be in flight at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        return env;
    }
}
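As the comment on the advanced settings suggests, the hard-coded values can also be pulled from the ParameterTool. A sketch of that, placed inside setCheckpointConfig; the key names below are illustrative only and are not existing PropertiesConstants entries.

// Sketch only: "stream.checkpoint.min-pause", "stream.checkpoint.timeout" and
// "stream.checkpoint.tolerable-failures" are illustrative keys, not project constants.
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(
        parameterTool.getLong("stream.checkpoint.min-pause", 2 * 60000L));
env.getCheckpointConfig().setCheckpointTimeout(
        parameterTool.getLong("stream.checkpoint.timeout", 15 * 60000L));
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(
        parameterTool.getInt("stream.checkpoint.tolerable-failures", 3));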
Building the Kafka source and sink
/**
 * Build the Kafka source
 *
 * @param parameterTool merged job configuration
 * @return org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer<java.lang.String>
 * @date 2023/8/4 - 2:41 PM
 */
private static FlinkKafkaConsumer<String> buildSourceKafka(ParameterTool parameterTool) {
    Properties props = KafkaConfigUtil.buildSourceKafkaProps(parameterTool);
    // Subscribe to topics matching a regular expression
    FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
            Pattern.compile(parameterTool.get(PropertiesConstants.KAFKA_SOURCE_TOPIC)),
            new SimpleStringSchema(),
            props);
    kafkaConsumer.setCommitOffsetsOnCheckpoints(true);
    // Optionally start consuming from the earliest offsets
    if (parameterTool.getBoolean(PropertiesConstants.KAFKA_START_FROM_FIRST, false)) {
        kafkaConsumer.setStartFromEarliest();
    } else {
        kafkaConsumer.setStartFromGroupOffsets();
    }
    return kafkaConsumer;
}

/**
 * Build the Kafka sink
 *
 * @param parameterTool merged job configuration
 * @return org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer<com.alibaba.fastjson.JSONObject>
 * @date 2023/8/16 - 11:38 AM
 */
private static FlinkKafkaProducer<JSONObject> buildSinkKafka(ParameterTool parameterTool) {
    Properties props = KafkaConfigUtil.buildSinkKafkaProps(parameterTool);
    // Route each record to the topic named by its log-type field
    return new FlinkKafkaProducer<>(
            parameterTool.get(PropertiesConstants.KAFKA_SINK_DEFAULT_TOPIC),
            (KafkaSerializationSchema<JSONObject>) (element, timestamp) ->
                    new ProducerRecord<>(element.getString(BaseConstants.PARAM_LOG_TYPE), element.toJSONString().getBytes()),
            props,
            FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
}
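Both builders are private, so they are meant to be called from the job class that declares them. A hedged sketch of how they might be wired into the pipeline; the fastjson parsing step and the variable names are illustrative, and the fragment assumes the usual Flink and fastjson imports are present.

// Inside the job class that declares buildSourceKafka / buildSinkKafka (sketch only):
DataStream<String> rawStream = env.addSource(buildSourceKafka(parameterTool));
rawStream
        .map((MapFunction<String, JSONObject>) JSON::parseObject) // parse each record as a JSON object
        .addSink(buildSinkKafka(parameterTool));                  // topic chosen per record by its log-type field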
Kafka utility class
public class KafkaConfigUtil {

    /**
     * Build the Kafka consumer (source) configuration
     *
     * @param parameterTool merged job configuration
     * @return java.util.Properties
     * @date 2023/8/4 - 2:39 PM
     */
    public static Properties buildSourceKafkaProps(ParameterTool parameterTool) {
        Properties props = parameterTool.getProperties();
        props.put("bootstrap.servers", parameterTool.get(PropertiesConstants.KAFKA_BROKERS, DEFAULT_KAFKA_BROKERS));
        props.put("group.id", parameterTool.get(PropertiesConstants.KAFKA_GROUP_ID, DEFAULT_KAFKA_GROUP_ID));
        // Discover new partitions (and topics matching the subscription pattern) every 10 s
        props.put("flink.partition-discovery.interval-millis", "10000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");
        // SASL/PLAIN authentication
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        // 0817 - request timeout and retry count when consuming from Kafka
        props.put("request.timeout.ms", "30000");
        props.put("retries", 5);
        return props;
    }

    /**
     * Build the Kafka producer (sink) configuration
     *
     * @param parameterTool merged job configuration
     * @return java.util.Properties
     * @date 2023/8/14 - 5:54 PM
     */
    public static Properties buildSinkKafkaProps(ParameterTool parameterTool) {
        Properties props = parameterTool.getProperties();
        props.put("bootstrap.servers", parameterTool.get(PropertiesConstants.KAFKA_SINK_BROKERS));
        // SASL/PLAIN authentication
        props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.setProperty(ProducerConfig.RETRIES_CONFIG, "5");
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "300");
        return props;
    }
}
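The SASL username and password are hard-coded in both methods. A small sketch of pulling them from the ParameterTool instead; the keys "kafka.sasl.username" and "kafka.sasl.password" are illustrative and are not defined in PropertiesConstants.

// Sketch only: illustrative keys, not existing project constants.
String saslJaasConfig = String.format(
        "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
        parameterTool.get("kafka.sasl.username"),
        parameterTool.get("kafka.sasl.password"));
props.put("sasl.jaas.config", saslJaasConfig);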
JDBC utility class
public class JdbcDatasourceUtils {

    public static volatile Map<String, HikariDataSource> DATASOURCES = new ConcurrentHashMap<>();

    /**
     * Get a HikariCP data source, cached by connection parameters
     *
     * @param jdbcUrl  JDBC URL
     * @param dsUname  user name
     * @param dsPwd    password
     * @param dsDriver JDBC driver class name
     * @return com.zaxxer.hikari.HikariDataSource
     * @date 2023/8/9 - 2:23 PM
     */
    public static HikariDataSource getHikariDataSource(String jdbcUrl, String dsUname, String dsPwd, String dsDriver) {
        String md5Key = Md5Util.encrypt(jdbcUrl + " " + dsUname + " " + dsPwd + " " + dsDriver);
        // Double-checked locking: create each data source only once
        if (!DATASOURCES.containsKey(md5Key)) {
            synchronized (JdbcDatasourceUtils.class) {
                if (!DATASOURCES.containsKey(md5Key)) {
                    DATASOURCES.put(md5Key, createHikariDataSource(jdbcUrl, dsUname, dsPwd, dsDriver));
                }
            }
        }
        return DATASOURCES.get(md5Key);
    }

    /**
     * Build a HikariCP data source
     *
     * @param jdbcUrl  JDBC URL
     * @param dsUname  user name
     * @param dsPwd    password
     * @param dsDriver JDBC driver class name
     * @return com.zaxxer.hikari.HikariDataSource
     * @date 2023/8/9 - 2:14 PM
     */
    private static HikariDataSource createHikariDataSource(String jdbcUrl, String dsUname, String dsPwd, String dsDriver) {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl(jdbcUrl);
        config.setUsername(dsUname);
        config.setPassword(dsPwd);
        config.setDriverClassName(dsDriver);
        // Default auto-commit for connections returned from the pool (HikariCP default: true)
        config.setAutoCommit(true);
        // Read-only connections
        config.setReadOnly(true);
        // Connection timeout in milliseconds (HikariCP default: 30 s)
        config.setConnectionTimeout(10000);
        // Maximum pool size
        config.setMaximumPoolSize(32);
        // Minimum number of idle connections
        config.setMinimumIdle(16);
        // Idle connection timeout
        config.setIdleTimeout(600000);
        // Maximum connection lifetime
        config.setMaxLifetime(540000);
        // Connection test query
        config.setConnectionTestQuery("SELECT 1");
        return new HikariDataSource(config);
    }

    /**
     * Load rows as maps keyed by column label
     *
     * @param dataSource HikariCP data source
     * @param sql        query SQL
     * @return java.util.List<java.util.Map<java.lang.String, java.lang.Object>>
     * @date 2023/8/15 - 6:03 PM
     */
    public static List<Map<String, Object>> loadDatas(HikariDataSource dataSource, String sql) {
        return loadSql(dataSource, sql, resultSet -> {
            List<Map<String, Object>> datas = new ArrayList<>();
            try {
                if (null == resultSet) {
                    return datas;
                }
                ResultSetMetaData metaData = resultSet.getMetaData();
                // Assemble one map per row
                Map<String, Object> entry;
                while (resultSet.next()) {
                    entry = new LinkedHashMap<>();
                    // getColumnLabel returns the alias; getColumnName returns the original column name
                    for (int i = 1; i <= metaData.getColumnCount(); i++) {
                        entry.put(metaData.getColumnLabel(i), resultSet.getObject(i));
                    }
                    datas.add(entry);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return datas;
        });
    }

    /**
     * Load data and collect every column value into a Set
     *
     * @param dataSource HikariCP data source
     * @param sql        query SQL
     * @param function   converts each column value to the element type
     * @return java.util.Set<R>
     * @date 2023/8/15 - 6:03 PM
     */
    public static <R> Set<R> loadSetDatas(HikariDataSource dataSource, String sql, Function<Object, R> function) {
        return loadSql(dataSource, sql, resultSet -> {
            Set<R> datas = new LinkedHashSet<>();
            try {
                if (null == resultSet) {
                    return datas;
                }
                ResultSetMetaData metaData = resultSet.getMetaData();
                while (resultSet.next()) {
                    for (int i = 1; i <= metaData.getColumnCount(); i++) {
                        datas.add(function.apply(resultSet.getObject(i)));
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            return datas;
        });
    }

    /**
     * Execute a query and apply the given function to its ResultSet
     *
     * @param dataSource HikariCP data source
     * @param sql        query SQL
     * @param function   maps the ResultSet (or null on failure) to the result
     * @return R
     * @date 2023/8/15 - 6:03 PM
     */
    private static <R> R loadSql(HikariDataSource dataSource, String sql, Function<ResultSet, R> function) {
        Connection connection = null;
        PreparedStatement preparedStatement = null;
        ResultSet resultSet = null;
        try {
            connection = dataSource.getConnection();
            preparedStatement = connection.prepareStatement(sql);
            resultSet = preparedStatement.executeQuery();
            return function.apply(resultSet);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Close resources in reverse order of acquisition
            if (resultSet != null) {
                try {
                    resultSet.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (preparedStatement != null) {
                try {
                    preparedStatement.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
        // Only reached on failure; the callback must handle a null ResultSet
        return function.apply(null);
    }
}
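An illustrative usage of the pool and the two loaders; the JDBC URL, credentials, driver class and SQL below are placeholders, not values from the project.

// All connection details and SQL below are placeholders.
HikariDataSource ds = JdbcDatasourceUtils.getHikariDataSource(
        "jdbc:mysql://localhost:3306/demo", "user", "secret", "com.mysql.cj.jdbc.Driver");

// Each row becomes a LinkedHashMap of column label -> value
List<Map<String, Object>> rows = JdbcDatasourceUtils.loadDatas(ds, "SELECT id, name FROM t_demo");

// Every selected column value is converted and collected into a Set
Set<String> names = JdbcDatasourceUtils.loadSetDatas(ds, "SELECT name FROM t_demo", String::valueOf);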