当前位置: 首页 > news >正文

2.3 如何使用FlinkSQL读取写入到JDBC(MySQL)

1、JDBC SQL 连接器

FlinkSQL允许使用 JDBC连接器,向任意类型的关系型数据库读取或者写入数据

添加Maven依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>3.1.0-1.17</version>
</dependency>

注意:如果使用 sql-client客户端,需保证 flink-1.17.1/lib 目录下 存在相应的jar包

 相关jar可以通过官网下载:JDBC SQL 连接器 


2、读取 MySQL

FlinkSQL读取MySQL表时,为批式处理,在流式计算任务中,通常被做维表来使用

-- 在FlinkSQL中创建 MySQL Source 表
drop table mysql_source_table;
CREATE TABLE mysql_source_table (`id` INT,`title` STRING,`author` STRING,`price` DOUBLE,`qty` INT
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://worker01/flink','driver' = 'com.mysql.jdbc.Driver',  -- 【可选】不设置时,将自动从url中推导'username' = 'xxxx','password' = 'xxxx','table-name' = 'books'
);-- 批式 sql,查看 JDBC 表中的数据
select * from mysql_source_table;

运行结果:


3、写入MySQL

3.1 何时批量写入MySQL呢?

FlinkSQL往MySQL写入数据时,默认会在客户端缓存数据,当触发设置的阈值后,才会向服务端发送数据

开启checkpoint :

# TODO 开启checkpoint,当checkpoint后,会触发jdbc的flush操作
set execution.checkpointing.interval=300sec;

设置 flush 前缓存记录的最大值 、flush 间隔时间:

-- TODO 创建sink mysql table
drop table mysql_sink_table;
CREATE TABLE mysql_sink_table (`id` INT,`title` STRING,`author` STRING,`price` DOUBLE,`qty` INT
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8','username' = 'xxxx','password' = 'xxxx','table-name' = 'books','sink.buffer-flush.max-rows' = '100', -- flush 前缓存记录的最大值,默认值为100,设置为0时,表示不缓存数据(来一条写入一条)'sink.buffer-flush.interval' = '50s' -- flush 间隔时间,超过该时间后异步线程将 flush 数据。默认为1s
);

使用说明:

FLinkSQL写入MySQL时,常通过 sink.buffer-flush.max-rows、sink.buffer-flush.interval 来控制写入数据的延迟程度

        当 对写入实时性要求较高时,可以将 sink.buffer-flush.max-rows = 0 ,表示到来一条数据后立即写入MySQL,但带来的后果是 长时间占有mysql连接

        当 数据量大且对实时要求不高时,可根据业务需求调大配置,可使实时行和性能最优


3.2 sink mysql table 中主键的作用

在FLinkSQL中创建sink mysql table时,如果表中定义了主键,则连接器将以 upsert 模式工作

否则连接器将以 append 模式工作

         upsert 模式:Flink 将根据主键判断插入新行或者更新已存在的行

                               使用这种模式时,确保MySQL中的底表定义主键和添加唯一性约束

       append 模式:对MySQL库中底表做insert操作

 upsert 模式:

-- TODO 创建MySQL 表
CREATE TABLE `books` (`id` int(11) NOT NULL,`title` varchar(99) DEFAULT NULL,`author` varchar(99) DEFAULT NULL,`price` double DEFAULT NULL,`qty` int(11) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;-- TODO 创建FLinkSQL表(sink mysql table)
drop table mysql_sink_table;
CREATE TABLE mysql_sink_table (`id` INT,`title` STRING,`author` STRING,`price` DOUBLE,`qty` INT,PRIMARY KEY (id) NOT ENFORCED -- 指定主键字段
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8','username' = 'root','password' = 'xxxx','table-name' = 'books','sink.buffer-flush.max-rows' = '0' -- 实时写入
);-- TODO 往 mysql中写入数据(相同key的数据写入后,会做upsert操作)
insert into mysql_sink_table
SELECT * FROM (VALUES(5,'A Dream in Red Mansions','y', 3.0,1)
, (6,'Journey to the West','y', 3.0,1)
, (7,'Water Margin','y', 3.0,1)
) AS books (id, title,author,price,qty);

append 模式:

-- TODO 创建FLinkSQL表(sink mysql table)
drop table mysql_sink_table;
CREATE TABLE mysql_sink_table (`id` INT,`title` STRING,`author` STRING,`price` DOUBLE,`qty` INT
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8','username' = 'root','password' = 'xxx','table-name' = 'books','sink.buffer-flush.max-rows' = '0' -- 实时写入
);-- TODO 往 mysql中写入数据(相同key的数据写入后,会做操作)
insert into mysql_sink_table
SELECT * FROM (VALUES(5,'A Dream in Red Mansions','y', 3.0,1)
, (6,'Journey to the West','y', 3.0,1)
, (7,'Water Margin','y', 3.0,1)
) AS books (id, title,author,price,qty);

注意:使用 append模式时,如果MySQL底表中存在主键或唯一性约束时,INSERT 插入可能会失败

insert into 失败:

http://www.lryc.cn/news/197442.html

相关文章:

  • Flink日志收集到数据库/kafka
  • Go项目踩坑:go get下载超时,goFrame框架下的go项目里将vue项目的dist同步打包发布,go项目打包并压缩
  • DataCon【签到题】挖矿流量检测
  • Vivado详细使用教程 | LED闪烁示例
  • 一些经典的神经网络(第17天)
  • Hadoop-HA-Hive-on-Spark 4台虚拟机安装配置文件
  • Hutool工具类参考文章
  • 【 Python ModuleNotFoundError: No module named ‘xxx‘可能的解决方案大全】
  • eclipse 配置selenium环境
  • 数据挖掘(6)聚类分析
  • 在启智平台上安装anconda
  • 棒球省队建设实施办法·棒球1号位
  • 架构案例2017(五十二)
  • 给四个点坐标计算两条直线的交点
  • 从入门到进阶 之 ElasticSearch SpringData 继承篇
  • 中文编程开发语言工具编程案例:计时计费管理系统软件连接灯控器编程案例
  • YOLOv7改进:动态蛇形卷积(Dynamic Snake Convolution),增强细微特征对小目标友好,实现涨点 | ICCV2023
  • 从文心大模型4.0与FuncGPT:用AI为开发者打开新视界
  • Nginx集群负载均衡配置完整流程
  • 如何生成SSH服务器的ed25519公钥SHA256指纹
  • 设计模式:抽象工厂模式(C#、JAVA、JavaScript、C++、Python、Go、PHP)
  • ocpp-远程启动(RemoteStartTransaction)、远程停止(RemoteStopTransaction)
  • 【网络安全】安全的系统配置
  • conda使用一般步骤
  • 如何做好需求收集?方法和步骤
  • SpringBoo整合WebSocket实战演练——Java入职十三天
  • 众佰诚:抖音小店的体验分什么时候更新
  • 详解cv2.addWeighted函数【使用 OpenCV 添加(混合)两个图像-Python版本】
  • 单链表经典OJ题:反转链表
  • 软考高级信息系统项目管理师系列论文六:论信息系统项目的人力资源管理