当前位置: 首页 > news >正文

Flink之JDBC Sink

这里介绍一下Flink Sink中jdbc sink的使用方法,以mysql为例,这里代码分为两种,事务和非事务

  • 非事务代码
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;import java.sql.PreparedStatement;
import java.sql.SQLException;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/2* @Description: 测试**/
public class FlinkJdbcSink {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 这里使用的是自定义数据源CustomizeBean(name,age,gender,hobbit),为了方便测试,换成任何数据源都可,只要和最后的要写入的表结构匹配即可DataStreamSource<CustomizeBean> customizeSource = env.addSource(new CustomizeSource());// 构建jdbc sinkSinkFunction<CustomizeBean> jdbcSink = JdbcSink.sink("insert into t_user(`name`, `age`, `gender`, `hobbit`) values(?, ?, ?, ?)", // 数据插入sql语句new JdbcStatementBuilder<CustomizeBean>() {@Overridepublic void accept(PreparedStatement pStmt, CustomizeBean customizeBean) throws SQLException {pStmt.setString(1, customizeBean.getName());pStmt.setInt(2, customizeBean.getAge());pStmt.setString(3, customizeBean.getGender());pStmt.setString(4, customizeBean.getHobbit());}}, // 字段映射配置,这部分就和常规的java api差不多了JdbcExecutionOptions.builder().withBatchSize(10) // 批次大小,条数.withBatchIntervalMs(5000) // 批次最大等待时间.withMaxRetries(1) // 重复次数.build(), // 写入参数配置new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName("com.mysql.jdbc.Driver").withUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false").withUsername("root").withPassword("password").build() // jdbc信息配置);// 添加jdbc sinkcustomizeSource.addSink(jdbcSink);env.execute();}
}
  • 事务代码
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.function.SerializableSupplier;import javax.sql.XADataSource;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/2* @Description: 测试**/
public class FlinkJdbcSink {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 这里使用的是自定义数据源CustomizeBean(name,age,gender,hobbit),为了方便测试,换成任何数据源都可,只要和最后的要写入的表结构匹配即可DataStreamSource<CustomizeBean> customizeSource = env.addSource(new CustomizeSource());// 每20秒作为checkpoint的一个周期env.enableCheckpointing(20000);// 两次checkpoint间隔最少是10秒env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10000);// 程序取消或者停止时不删除checkpointenv.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// checkpoint必须在60秒结束,否则将丢弃env.getCheckpointConfig().setCheckpointTimeout(60000);// 同一时间只能有一个checkpointenv.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 设置EXACTLY_ONCE语义,默认就是这个env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);// checkpoint存储位置env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");// 构建ExactlyOne sink,要注意使用exactlyOnceSink需要开启checkpointSinkFunction<CustomizeBean> exactlyOneJdbcSink = JdbcSink.exactlyOnceSink("insert into t_user(`name`, `age`, `gender`, `hobbit`) values(?, ?, ?, ?)", // 数据插入sql语句(JdbcStatementBuilder<CustomizeBean>) (pStmt, customizeBean) -> {pStmt.setString(1, customizeBean.getName());pStmt.setInt(2, customizeBean.getAge());pStmt.setString(3, customizeBean.getGender());pStmt.setString(4, customizeBean.getHobbit());}, // 字段映射配置,这部分就和常规的java api差不多了JdbcExecutionOptions.builder().withMaxRetries(0) // 设置重复次数.withBatchSize(25) // 设置批次大小,数据条数.withBatchIntervalMs(1000) // 批次最大等待时间.build(),JdbcExactlyOnceOptions.builder()// 这里使用的mysql,所以要将这个参数设置为true,因为mysql不支持一个连接上开启多个事务,oracle是支持的.withTransactionPerConnection(true).build(),(SerializableSupplier<XADataSource>) () -> {// XADataSource 就是JDBC连接,不同的是它是支持分布式事务的连接MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();mysqlXADataSource.setUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false"); // 设置urlmysqlXADataSource.setUser("root"); // 设置用户mysqlXADataSource.setPassword("password"); // 设置密码return mysqlXADataSource;});// 添加jdbc sinkcustomizeSource.addSink(exactlyOneJdbcSink);env.execute();}
}
  • pom依赖
        <!-- 在原有的依赖中加入下面两个内容 --><!-- JDBC connector --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>${flink.version}</version></dependency><!-- mysql驱动 --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version></dependency>
  • 结果
    在这里插入图片描述
    jdbc sink的具体使用方式大概就这些内容,还是比较简单的,具体应用还要结合实际业务场景.
http://www.lryc.cn/news/111067.html

相关文章:

  • lifecycleScope Unresolved reference
  • P5960 【模板】差分约束算法
  • VSCode---通过ctrl+鼠标滚动改变字体大小
  • 视频监控汇聚平台EasyCVR视频分享页面WebRTC流地址播放不了是什么原因?
  • Libevent开源库的介绍与应用
  • 【LNMP】LNMP
  • uniapp自定义头部导航栏
  • Django实现音乐网站 ⑹
  • dubbo-helloworld示例
  • 电脑ADB连接手机的方式通过网络无法adb连接手机的问题(已解决)
  • 79 | Python数据分析篇 —— Pandas中groupby聚合操作和透视表基础
  • iOS 搭建组件化私有库
  • 迅为全国产龙芯3A5000电脑运行统信UOS、银河麒麟、loongnix系统
  • 枫叶时代:打造中国特色的传统文化IP
  • 一条sql语句在mysql中如何执行(查询+更新)
  • 漫画 | TCP/IP之大明邮差
  • Zookeeper和Nacos的区别
  • O3DE的Pass
  • 如何建立含有逻辑删除字段的唯一索引
  • C语言基础知识点一
  • Python 潮流周刊#14:Lpython 高性能编译器、Python 与 JavaScript 实现互通
  • JVM深入 —— JVM的体系架构
  • dialog => :before-close的属性应用
  • <van-empty description=““ /> 滚动条bug
  • 使用swiper实现图片轮播功能
  • Qt应用开发(基础篇)——时间类 QDateTime、QDate、QTime
  • Modbus TCP转Profinet网关modbus tcp转以太网
  • 笔记 | P4387 【深基15.习9】验证栈序列 题解
  • PyTorch中nn-XXX与F-XXX的区别
  • zookeeper集群和kafka的相关概念就部署