当前位置: 首页 > news >正文

Flink java 工具类

flink 环境构建工具类

public class ExecutionEnvUtil {/*** 从配置文件中读取配置(生效优先级:配置文件<命令行参数<系统参数)** @param args* @return org.apache.flink.api.java.utils.ParameterTool* @date 2023/8/4 - 10:05 AM*/public static ParameterTool createParameterTool(final String[] args) throws Exception {return ParameterTool.fromPropertiesFile(ExecutionEnvUtil.class.getResourceAsStream(BaseConstants.PROPERTIES_FILE_NAME)).mergeWith(ParameterTool.fromArgs(args)).mergeWith(ParameterTool.fromSystemProperties());}/*** flink 环境配置** @param parameterTool* @return org.apache.flink.streaming.api.environment.StreamExecutionEnvironment* @date 2023/8/4 - 11:10 AM*/public static StreamExecutionEnvironment prepare(ParameterTool parameterTool) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(parameterTool.getInt(PropertiesConstants.STREAM_PARALLELISM, 12));env.getConfig();env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, Time.seconds(60)));if (parameterTool.getBoolean(PropertiesConstants.STREAM_CHECKPOINT_ENABLE, true)) {CheckPointUtil.setCheckpointConfig(env,parameterTool);// 取消作业时保留外部化 Checkpoint 数据env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);}env.getConfig().setGlobalJobParameters(parameterTool);env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);return env;}
}

checkpoint 工具类

public class CheckPointUtil {private static final String CHECKPOINT_MEMORY = "memory";private static final String CHECKPOINT_FS = "fs";private static final String CHECKPOINT_ROCKETSDB = "rocksdb";/*** 默认的checkpoint 存储地址*/private static final String CHECKPOINT_DEFAULT = "default";/*** 设置flink check point** @param env* @param parameterTool* @return org.apache.flink.streaming.api.environment.StreamExecutionEnvironment* @date 2023/8/4 - 10:49 AM*/public static StreamExecutionEnvironment setCheckpointConfig(StreamExecutionEnvironment env, ParameterTool parameterTool) throws Exception{// 根据类型,设置合适的状态后端String stateBackendType = parameterTool.get(PropertiesConstants.STREAM_CHECKPOINT_TYPE, CHECKPOINT_DEFAULT);if (CHECKPOINT_MEMORY.equalsIgnoreCase(stateBackendType)) {//1、state 存放在内存中,默认是 5MStateBackend stateBackend = new MemoryStateBackend(5 * 1024 * 1024 * 100);env.setStateBackend(stateBackend);}else if (CHECKPOINT_FS.equalsIgnoreCase(stateBackendType)) {StateBackend stateBackend = new FsStateBackend(new URI(parameterTool.get(PropertiesConstants.STREAM_CHECKPOINT_DIR)), 0, true);env.setStateBackend(stateBackend);}else if (CHECKPOINT_ROCKETSDB.equalsIgnoreCase(stateBackendType)) {RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(parameterTool.get(PropertiesConstants.STREAM_CHECKPOINT_DIR), true);env.setStateBackend(rocksDBStateBackend);}//设置 checkpoint 周期时间env.enableCheckpointing(parameterTool.getLong(PropertiesConstants.STREAM_CHECKPOINT_INTERVAL, 60000));//高级设置(这些配置也建议写成配置文件中去读取,优先环境变量)// 设置 exactly-once 模式env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// 设置 checkpoint 最小间隔 500 msenv.getCheckpointConfig().setMinPauseBetweenCheckpoints(2*60000);// 设置 checkpoint 必须在n分钟内完成,否则会被丢弃env.getCheckpointConfig().setCheckpointTimeout(15*60000);// 设置 checkpoint 失败时,任务不会 fail,可容忍3次连续失败env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);// 设置 checkpoint 的并发度为 1env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);return 
env;}
}

构建 kafka source、sink

/*** 构建 source kafka** @param parameterTool* @return org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer<java.lang.String>* @date 2023/8/4 - 2:41 PM*/private static FlinkKafkaConsumer<String> buildSourceKafka(ParameterTool parameterTool){Properties props = KafkaConfigUtil.buildSourceKafkaProps(parameterTool);// 正则表达式消费FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(Pattern.compile(parameterTool.get(PropertiesConstants.KAFKA_SOURCE_TOPIC)),new SimpleStringSchema(),props);kafkaConsumer.setCommitOffsetsOnCheckpoints(true);// 从最开始的位置开始消费if(parameterTool.getBoolean(PropertiesConstants.KAFKA_START_FROM_FIRST, false)){kafkaConsumer.setStartFromEarliest();}else{kafkaConsumer.setStartFromGroupOffsets();}return kafkaConsumer;}/*** 构建 sink kafka** @param parameterTool* @return org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer<com.alibaba.fastjson.JSONObject>* @date 2023/8/16 - 11:38 AM*/private static FlinkKafkaProducer<JSONObject> buildSinkKafka(ParameterTool parameterTool){Properties props = KafkaConfigUtil.buildSinkKafkaProps(parameterTool);return new FlinkKafkaProducer<>(parameterTool.get(PropertiesConstants.KAFKA_SINK_DEFAULT_TOPIC), (KafkaSerializationSchema<JSONObject>) (element, timestamp) ->new ProducerRecord<>(element.getString(BaseConstants.PARAM_LOG_TYPE), element.toJSONString().getBytes()),props, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);}

kafka 工具类

public class KafkaConfigUtil {/*** 设置 kafka 配置** @param parameterTool* @return java.util.Properties* @date 2023/8/4 - 2:39 PM*/public static Properties buildSourceKafkaProps(ParameterTool parameterTool) {Properties props = parameterTool.getProperties();props.put("bootstrap.servers", parameterTool.get(PropertiesConstants.KAFKA_BROKERS, DEFAULT_KAFKA_BROKERS));props.put("group.id", parameterTool.get(PropertiesConstants.KAFKA_GROUP_ID, DEFAULT_KAFKA_GROUP_ID));props.put("flink.partition-discovery.interval-millis", "10000");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "latest");props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");//0817 - 消费kafka数据超时时间和尝试次数props.put("request.timeout.ms", "30000");props.put("retries", 5);return props;}/*** 构建 sink kafka 配置** @param parameterTool* @return java.util.Properties* @date 2023/8/14 - 5:54 PM*/public static Properties buildSinkKafkaProps(ParameterTool parameterTool) {Properties props = parameterTool.getProperties();props.put("bootstrap.servers", parameterTool.get(PropertiesConstants.KAFKA_SINK_BROKERS));props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "latest");props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "PLAIN");props.setProperty(ProducerConfig.RETRIES_CONFIG, "5");props.put(ProducerConfig.ACKS_CONFIG, "1");props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 
"300");return props;}}

jdbc 工具类

public class JdbcDatasourceUtils {public static volatile Map<String, HikariDataSource> DATASOURCES = new ConcurrentHashMap<>();/*** 获取hikari数据库链接池** @param jdbcUrl* @param dsUname* @param dsPwd* @param dsDriver* @return com.zaxxer.hikari.HikariDataSource* @date 2023/8/9 - 2:23 PM*/public static HikariDataSource getHikariDataSource(String jdbcUrl, String dsUname, String dsPwd, String dsDriver) {String md5Key = Md5Util.encrypt(jdbcUrl + " " + dsUname + " " + dsPwd + " " + dsDriver);if (!DATASOURCES.containsKey(md5Key)) {synchronized (JdbcDatasourceUtils.class) {if (!DATASOURCES.containsKey(md5Key)) {DATASOURCES.put(md5Key, createHikariDataSource(jdbcUrl, dsUname, dsPwd, dsDriver));}}}return DATASOURCES.get(md5Key);}/*** 构建hikari数据库链接池** @param jdbcUrl* @param dsUname* @param dsPwd* @param dsDriver* @return com.zaxxer.hikari.HikariDataSource* @date 2023/8/9 - 2:14 PM*/private static HikariDataSource createHikariDataSource(String jdbcUrl, String dsUname, String dsPwd, String dsDriver) {HikariConfig config = new HikariConfig();config.setJdbcUrl(jdbcUrl);config.setUsername(dsUname);config.setPassword(dsPwd);config.setDriverClassName(dsDriver);// 从池返回的连接的默认自动提交,默认值:trueconfig.setAutoCommit(true);//只读config.setReadOnly(true);// 连接超时时间:毫秒,默认值30秒config.setConnectionTimeout(10000);// 最大连接数config.setMaximumPoolSize(32);// 最小空闲连接config.setMinimumIdle(16);// 空闲连接超时时间config.setIdleTimeout(600000);// 连接最大存活时间config.setMaxLifetime(540000);// 连接测试查询config.setConnectionTestQuery("SELECT 1");return new HikariDataSource(config);}/*** 按列加载数据** @param dataSource* @param sql* @return java.util.List<java.util.Map<java.lang.String,java.lang.Object>>* @date 2023/8/15 - 6:03 PM*/public static List<Map<String, Object>> loadDatas(HikariDataSource dataSource, String sql) {return loadSql(dataSource, sql, resultSet -> {List<Map<String, Object>> datas = new ArrayList<>();try {if (null == resultSet){return datas;}ResultSetMetaData metaData = resultSet.getMetaData();//组装返回值Map<String, Object> 
entry;while (resultSet.next()) {entry = new LinkedHashMap<>();// getColumnLabel 取重命名,getColumnName 原始字段名for (int i = 1; i <= metaData.getColumnCount(); i++) {entry.put(metaData.getColumnLabel(i), resultSet.getObject(i));}datas.add(entry);}} catch (Exception e) {e.printStackTrace();}return datas;});}/*** 加载数据遍历放入set集合** @param dataSource* @param sql* @param function* @return java.util.Set<R>* @date 2023/8/15 - 6:03 PM*/public static <R> Set<R> loadSetDatas(HikariDataSource dataSource, String sql, Function<Object, R> function) {return loadSql(dataSource, sql, resultSet -> {Set<R> datas = new LinkedHashSet<>();try {if (null == resultSet){return datas;}ResultSetMetaData metaData = resultSet.getMetaData();while (resultSet.next()) {for (int i = 1; i <= metaData.getColumnCount(); i++) {datas.add(function.apply(resultSet.getObject(i)));}}} catch (Exception e) {e.printStackTrace();}return datas;});}/*** 执行查询sql** @param dataSource* @param sql* @param function* @return R* @date 2023/8/15 - 6:03 PM*/private static <R> R loadSql(HikariDataSource dataSource, String sql, Function<ResultSet, R> function) {Connection connection = null;PreparedStatement preparedStatement = null;ResultSet resultSet = null;try {connection = dataSource.getConnection();preparedStatement = connection.prepareStatement(sql);resultSet = preparedStatement.executeQuery();return function.apply(resultSet);} catch (Exception e){e.printStackTrace();} finally {if (connection != null){try {connection.close();} catch (SQLException e) {e.printStackTrace();}}if (preparedStatement != null){try {preparedStatement.close();} catch (SQLException e) {e.printStackTrace();}}if (resultSet != null){try {resultSet.close();} catch (SQLException e) {e.printStackTrace();}}}return function.apply(null);}
}

http://www.lryc.cn/news/138740.html

相关文章:

  • 2023年你需要知道的最佳预算Wi-Fi路由器清单
  • Go语言基础之流程控制
  • Git 安装、配置并把项目托管到码云 Gitee
  • C++信息学奥赛1147:最高分数的学生姓名
  • STM32使用PID调速
  • 【UE5:CesiumForUnreal】——3DTiles数据属性查询和单体高亮
  • 无涯教程-PHP - 返回类型声明
  • DOS常见命令
  • Qt应用开发(拓展篇)——示波器/图表 QCustomPlot
  • 【精度丢失】后端接口返回的Long类型参数,不同浏览器解析出的结果不一样
  • 2023年国赛 高教社杯数学建模思路 - 案例:感知机原理剖析及实现
  • java-红黑树
  • vue2 vue中的常用指令
  • AI驱动下的智能制造:工业自动化的新纪元
  • docker 命令
  • 2023年高教社杯数学建模思路 - 复盘:光照强度计算的优化模型
  • 生成式人工智能的潜在有害影响与未来之路(二)
  • 如何自己实现一个丝滑的流程图绘制工具(三)自定义挂载vue组件
  • UNIAPP调用API接口
  • 理解 Delphi 的类(五) - 认识类的继承
  • mybatis概述及搭建
  • DNDC模型---土壤碳储量、温室气体排放、农田减排、土地变化、气候变化中的应用
  • Android studio 2022.3.1 鼠标移动时不显示快速文档
  • 五度易链最新“产业大数据服务解决方案”亮相,打造数据引擎,构建智慧产业!
  • 简述hive环境搭建
  • 小米AI音箱联网升级折腾记录(解决配网失败+升级失败等问题)
  • tensorRT安装
  • 电脑重装+提升网速
  • Modelica由入门到精通—为什么要学习Modelica语言
  • opencv 进阶20-随机森林示例