当前位置: 首页 > news >正文

FlinkCDC快速搭建实现数据监控

引入依赖

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.sand</groupId><artifactId>flinkcdc</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><name>Flink Quickstart Job</name><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.17.1</flink.version><!--        <flink.version>1.14.4</flink.version>--><target.java.version>1.8</target.java.version><scala.binary.version>2.12</scala.binary.version><maven.compiler.source>${target.java.version}</maven.compiler.source><maven.compiler.target>${target.java.version}</maven.compiler.target><log4j.version>2.17.1</log4j.version></properties><repositories><repository><id>apache.snapshots</id><name>Apache Development Snapshot Repository</name><url>https://repository.apache.org/content/repositories/snapshots/</url><releases><enabled>false</enabled></releases><snapshots><enabled>true</enabled></snapshots></repository></repositories><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><!--            <artifactId>flink-streaming-java_2.12</artifactId>--><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><!--        <dependency>--><!--            <groupId>org.apache.flink</groupId>--><!--            <artifactId>flink-scala_2.12</artifactId>--><!--            <version>${flink.version}</version>--><!--        </dependency>--><dependency><groupId>org.apache.flink</groupId><!--            <artifactId>flink-clients_2.12</artifactId>--><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.49</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-core</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_2.12</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch7 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7</artifactId><version>3.0.1-1.17</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. --><version>2.4.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.1-1.17</version><scope>provided</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.32</version></dependency><!-- 打印日志的jar包 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.30</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.16</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.17.2</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.30</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.7.10</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>${target.java.version}</source><target>${target.java.version}</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><createDependencyReducedPom>false</createDependencyReducedPom><artifactSet><excludes><exclude>org.apache.flink:flink-shaded-force-shading</exclude><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>org.apache.logging.log4j:*</exclude></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.sand.DataStreamJob</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><pluginManagement><plugins><!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. --><plugin><groupId>org.eclipse.m2e</groupId><artifactId>lifecycle-mapping</artifactId><version>1.0.0</version><configuration><lifecycleMappingMetadata><pluginExecutions><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><versionRange>[3.1.1,)</versionRange><goals><goal>shade</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution><pluginExecution><pluginExecutionFilter><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><versionRange>[3.1,)</versionRange><goals><goal>testCompile</goal><goal>compile</goal></goals></pluginExecutionFilter><action><ignore/></action></pluginExecution></pluginExecutions></lifecycleMappingMetadata></configuration></plugin></plugins></pluginManagement></build>
</project>

数据库配置类

package com.sand;import org.apache.commons.collections.CollectionUtils;import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;/*** @author zdd*/
public class CDCKit {public static void main(String[] args) {String tempDir = System.getProperty("java.io.tmpdir");System.out.println("tempDir = " + tempDir);}/*** 数据库*/private static final String database = "byyy_iowtb_wms_test";/*** 表名*/private static final List<String> tableList = Arrays.asList("inv_tt_stock_info","base_tm_sku","base_tm_third_sku_certificate","base_tm_sku_gsp");/*** ip*/private static final String hostname = "192.168.111.107";/*** 端口*/private static final int port = 3306;/*** 用户名*/private static final String username = "test_cdc";/*** 密码*/private static final String password = "Test_cdc@123";public static String getDatabase() {return database;}public static String getTableList() {if (CollectionUtils.isEmpty(tableList)) {return null;}//,分割StringJoiner stringJoiner = new StringJoiner(",");for (String tableName : tableList) {stringJoiner.add(getDatabase() + "." + tableName);}return stringJoiner.toString();}public static String getHostname() {return hostname;}public static int getPort() {return port;}public static String getUsername() {return username;}public static String getPassword() {return password;}}

监控类

package com.sand;import cn.hutool.core.io.FileUtil;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.io.File;
import java.util.Objects;
import java.util.Properties;public class DataStreamJob {public static void main(String[] args) throws Exception {//获取临时文件目录String tempDir = System.getProperty("java.io.tmpdir");String latestCheckpoint = getLatestCheckpoint();System.out.println("latestCheckpoint = " + latestCheckpoint);Configuration configuration = new Configuration();if(StringUtils.isNotBlank(latestCheckpoint)){configuration.setString("execution.savepoint.path", "file:///" + latestCheckpoint);}StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);env.setParallelism(1);//2.1 开启 Checkpoint,每隔 60 秒钟做一次 CKenv.enableCheckpointing(1000L * 60);//2.2 指定 CK 的一致性语义env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//2.3 设置任务关闭的时候保留最后一次 CK 数据env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//2.4 指定从 CK 自动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));//2.5 设置状态后端env.setStateBackend(new FsStateBackend("file:///" + tempDir + "ck"));// ck 设置env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);Properties properties = new Properties();properties.setProperty("snapshot.locking.mode", "none");properties.setProperty("decimal.handling.mode", "string");MySqlSource<String> sourceFunction = MySqlSource.<String>builder().hostname(CDCKit.getHostname()).port(CDCKit.getPort()).databaseList(CDCKit.getDatabase()).tableList(CDCKit.getTableList()).username(CDCKit.getUsername()).password(CDCKit.getPassword()).scanNewlyAddedTableEnabled(true).deserializer(new JsonDebeziumDeserializationSchema()).startupOptions(StartupOptions.initial()).debeziumProperties(properties).build();//4.使用 CDC Source 从 MySQL 读取数据env.fromSource(sourceFunction, WatermarkStrategy.noWatermarks(), "mysql-source").addSink(new MysqlSink());//5.打印数据
//        mysqlStream.print();//6.执行任务env.execute();}private static String getLatestCheckpoint() {File ckDir = new File(System.getProperty("java.io.tmpdir") + "ck");File[] files = ckDir.listFiles();if (files == null) {return null;}String path = null;long lastModified = 0;for (File file : files) {//获取文件夹下-chk-开头文件夹-最新的文件夹if (file.isDirectory()) {File[] files1 = file.listFiles();if (files1 == null) {continue;}for (File file1 : files1) {if (!file1.isDirectory() || !file1.getName().startsWith("chk-")) {continue;}if (file1.lastModified() > lastModified) {lastModified = file1.lastModified();path = file1.getAbsolutePath();}}}}//删除其余目录if (StringUtils.isEmpty(path)) {return null;}String tempPath = path.substring(0, path.lastIndexOf("\\"));for (File file : files) {if (file.isDirectory() && !Objects.equals(file.getAbsolutePath(), tempPath)) {FileUtil.del(file);}}return path;}
}

数据处理类

package com.sand;/*** @author zdd*/
public class MysqlSink implements org.apache.flink.streaming.api.functions.sink.SinkFunction<String> {@Overridepublic void invoke(String value, org.apache.flink.streaming.api.functions.sink.SinkFunction.Context context) throws Exception {System.out.println("value = " + value);}
}
http://www.lryc.cn/news/313502.html

相关文章:

  • 应急布控球远程视频监控方案:视频监控平台EasyCVR+4G/5G应急布控球
  • 3.6 C语言和汇编语言混合编程 “每日读书”
  • 利用“定时执行专家”循环执行BAT、VBS、Python脚本——含参数指定功能
  • 【算法集训】基础算法:模拟
  • 基于SSM的房客源信息管理系统设计与实现
  • 常见数据类型
  • 基于vue的联通积分商城数据可视化APP设计与实现
  • 2024年flink面试真题(一)
  • Java面试挂在线程创建后续,不要再被八股文误导了!创建线程的方式只有1种
  • JavaEE面试题
  • 探索macOS上的最佳MySQL客户端工具
  • [Android] MediaPlayer SDK API glance
  • 原始手写helloworld并打jar包允许
  • maven 的安装与配置(Command ‘mvn‘ not found)修改配置文件后新终端依旧无法识别到 mvn 命令
  • Pycharm无法粘贴外部文本问题
  • 学习Java的第四天
  • 【Javaweb】【瑞吉外卖】登录功能plus--拦截器filterinterceptors实现
  • 关于 Runes 协议及「公开铭刻」发行机制的拓展讨论
  • chkdsk修复会造成文件丢失吗?chkdsk数据丢失还能恢复吗
  • Hypermesh2019快捷键大全
  • CSS布局——Flexbox基础使用
  • Kubernetes(K8s):容器化应用的航空母舰
  • Java配置49-nginx 反向代理 sftp 服务器
  • Qt添加VTK并绘制图形
  • VsCode搭建Spring Boot项目环境
  • (黑马出品_05)SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式
  • window mysql 安装出现的问题
  • 【3GPP】【核心网】【5G】5G核心网协议解析(二)(超详细)
  • 物联网带来的六大运营挑战
  • 【ETCD】简介安装常用操作---图文并茂详细讲解