当前位置: 首页 > news >正文

【Flink】关于jvm元空间溢出,mysql binlog冲突的问题解决

问题一:7张表是同一个mysql中的,我们进行增量同步时分别用不同的flink任务读取,造成mysql server-id冲突问题,如下:

在这里插入图片描述

Caused by: io.debezium.DebeziumException: A slave with the same server_uuid/server_id as this slave has connected to the master; the first event ‘’ at 4, the last event read from ‘/home/mysql/log/mysql/mysql-bin.003630’ at 62726118, the last byte read from ‘/home/mysql/log/mysql/mysql-bin.003630’ at 62726118. Error code: 1236; SQLSTATE: HY000.

问题分析:主要是多个任务都读取的同一个binlog造成serverid冲突;

网上有设置server-id的方法,这个是可以解决的,但是需要分方案来谈;
例如:我们是需要一个任务通用的(7张表重复提交7次,更改flink传参)
这种就不适合我们,因为每次的任务都是同一个jar,读取一段时间后还是会报错;

解决方案:其实我们可以把在同一个mysql库里面的表放到同一个source里面;

解决代码:我们是mysql同步到starrocks里面,使用的sr官方的sink function,不同sink的可以参考这个源代码;

官方是推荐采用StarRocksSink.sink(options)这种方式,我们查看源码,其实是采用的v2这一个fun,点进去发现,sr官方对数据进行了处理,只需要匹配对应的类型即可;
在这里插入图片描述
所以我们只需要对mysqlSource进行一个算子转换即可,关键代码如下:

MySqlSourceBuilder<String> builder = new MySqlSourceBuilder<>();MySqlSource<String> mySqlSource = builder.hostname(srcHost).port(3306).databaseList(srcDb)// 格式 db.table,db.table2.......tableList(srcTable).username(srcUsername).password(srcPassword).jdbcProperties(jbdcProperties).debeziumProperties(properties)// 这里反序列化我进行了StarRocks增删类型处理,具体可以看我这片文章https://blog.csdn.net/JGMa_TiMo/article/details/128327546.deserializer(jsonStringDebeziumDeserializationSchema).serverId("5400-6400").build();DataStreamSource<String> streamSource = env.fromSource(mySqlSource, WatermarkStrategy.forMonotonousTimestamps(), "[<< job: >>" + propKey + "]");
//                .setParallelism(parallelism);// todo 这里是关键地方,我们反序列化只能返回flink认可的类型,一般都是string,这里转换成上面sr可以处理的对象 StarRocksSinkRowDataWithMetaSingleOutputStreamOperator<StarRocksSinkRowDataWithMeta> streamOperator = streamSource.flatMap(new FlatMapFunction<String, StarRocksSinkRowDataWithMeta>() {@Overridepublic void flatMap(String value, Collector<StarRocksSinkRowDataWithMeta> collector) throws Exception {HashMap hashMap = JsonUtils.parseObject(value, HashMap.class);StarRocksSinkRowDataWithMeta sinkRowDataWithMeta = new StarRocksSinkRowDataWithMeta();sinkRowDataWithMeta.addDataRow(value);assert hashMap != null;sinkRowDataWithMeta.setTable(hashMap.get("__table").toString());sinkRowDataWithMeta.setDatabase(sinkDb);collector.collect(sinkRowDataWithMeta);}}).name("Data Filtering");StarRocksSinkOptions sinkOptions = StarRocksSinkOptions.builder().withProperty("jdbc-url", "jdbc:mysql://" + sinkHost + ":9030?characterEncoding=utf-8&useSSL=false&connectionTimeZone=Asia/Shanghai").withProperty("load-url", sinkHost + ":8030").withProperty("database-name", sinkDb).withProperty("username", sinkUsername).withProperty("password", sinkPassword)// 这里的设置会被多srcTable覆盖.withProperty("table-name", "").withProperty("sink.properties.format", "json").withProperty("sink.properties.strip_outer_array", "true").build();// 这里查看sr sink源码实际使用的是这个fun,我们不要使用SinkFunctionFactory生成:会泛型不支持streamOperator.addSink(new StarRocksDynamicSinkFunctionV2<>(sinkOptions)).name(">>>StarRocks " + propKey + " Sink<<<").uid(UUID.randomUUID().toString());
//        streamOperator.addSink(StarRocksSink.sink(sinkOptions))
//                .name(">>>StarRocks " + propKey + " Sink<<<").uid(UUID.randomUUID().toString());env.execute(propKey + "<< stream sync job >>" + srcHost + srcDb);

解读:原理就是我们把原来7张在一个数据库的表放到一个flink source中读取,在指定传输到那个starrocks表时,官方已经实现了代码支持,我们只需要增加一个flink算子转换成sink支持的对象即可,(关联一个source对应多个sink解决思路)

问题二:我们是采用datastream api开发的flink任务,在web-ui界面提交任务,造成taskManager的jvm Metaspace一直增长直到节点挂掉。

报错就不贴了,就是taskmanager会自动挂掉,查看tm的日志是oom异常:jvm metaspace溢出;

问题分析:我们采用web-ui多次提交flink任务时,flink是动态类加载,所以不会释放上一个jar的元空间,才会造成jvm垃圾不回收;

可以看官方的issues:
https://issues.apache.org/jira/browse/FLINK-11205
https://issues.apache.org/jira/browse/FLINK-16408

问题解决:把jar放到flink/lib目录下就可以了,这样flink会优先加载父加载器中的类;

这里需要注意和以前的jar会造成版本冲突,具体解决你可以根据报错信息慢慢调试,我这里贴一个我的环境信息:
当前环境中的lib包
在这里插入图片描述
我增加的lib包
在这里插入图片描述
下面是我的jar包依赖,打完包放到lib中即可

<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.15.3</flink.version><flink.connector.sr>1.2.5_flink-1.15</flink.connector.sr><flink.connector.mysql>2.3.0</flink.connector.mysql><scala.binary.version>2.12</scala.binary.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-base</artifactId><version>${flink.version}</version></dependency><!-- 基础包 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>com.starrocks</groupId><artifactId>flink-connector-starrocks</artifactId><version>${flink.connector.sr}</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>${flink.connector.mysql}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-test-utils</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-test-utils</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.29</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.26</version></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>2.0.4</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.3.4</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>connect-api</artifactId><version>2.7.1</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-shaded-hadoop-2</artifactId><version>2.8.3-10.0</version></dependency><dependency><groupId>commons-cli</groupId><artifactId>commons-cli</artifactId><version>1.5.0</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.15</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-beans</artifactId><version>5.3.26</version></dependency><dependency><groupId>org.yaml</groupId><artifactId>snakeyaml</artifactId><version>2.0</version></dependency></dependencies>

问题二:最后就是在web-ui提交任务,你也可以用命令行,这里我用的源码包,就是我开发的实际编写代码,就几十K

在这里插入图片描述
最后如果解决了你的问题,请点个赞吧在这里插入图片描述

http://www.lryc.cn/news/156422.html

相关文章:

  • C#常用多线程(线程同步,事件触发,信号量,互斥锁,共享内存,消息队列)
  • OpenWrt系统开发笔记
  • 实战 - Restful APi 格式规范
  • 《Linux从练气到飞升》No.21 Linux简单实现一个shell
  • 【iVX】iVX的低代码未来发展趋势:加速应用开发的创新之路
  • zookee 安装
  • OpenWrt编译自己的应用程序
  • MySQL 50 题。
  • 强化学习算法总结 (1)
  • Qt应用开发(基础篇)——向导对话框 QWizard
  • Python类的方法
  • 变电站自动化监控系统
  • MySql学习笔记11——DBA命令介绍
  • Webpack 复习小结
  • Laravel chunk和chunkById的坑
  • 从零开始学习 Java:简单易懂的入门指南之泛型及set集合(二十二)
  • JVM----GC(垃圾回收)详解
  • 数据库的三个范式
  • 谷歌浏览器打开白屏 后台还有还有很多google chrome进程在运行
  • Java EE 突击 15 - Spring Boot 统一功能处理
  • JasperReport定义变量后打印PDF变量为null以及整个pdf文件为空白
  • Python 及 Pycharm 的安装 2023.8
  • java中的线程中断
  • 【跟小嘉学 Rust 编程】二十三、Cargo 使用指南
  • R Removing package报错(as ‘lib’ is unspecified)
  • 金融信创,软件规划需关注自主安全及生态建设
  • 无重叠区间【贪心算法】
  • nlp系列(7)实体识别(Bert)pytorch
  • Uniapp学习之从零开始写一个简单的小程序demo(新建页面,通过导航切换页面,发送请求)
  • uniapp微信小程序隐私保护引导新规