当前位置: 首页 > news >正文

0基础学习PyFlink——流批模式在主键上的对比

假如我们将《0基础学习PyFlink——使用PyFlink的Sink将结果输出到外部系统》中的模式从批处理(batch)改成流处理(stream),则其在print连接器上产生的输出是不一样。

批处理

    env_settings = EnvironmentSettings \.new_instance() \.in_batch_mode() \.with_configuration(config) \.build()
# 批处理
+I[A, 3]
+I[B, 1]
+I[C, 2]
+I[D, 2]
+I[E, 1]

流处理

    env_settings = EnvironmentSettings \.new_instance() \.in_streaming_mode() \.with_configuration(config) \.build()
# 流处理
+I[A, 1]
+I[B, 1]
+I[C, 1]
+I[D, 1]
-U[A, 1]
+U[A, 2]
+I[E, 1]
-U[C, 1]
+U[C, 2]
-U[D, 1]
+U[D, 2]
-U[A, 2]
+U[A, 3]

我们看到批处理是一次性的达成了最终计算——只插入了5条数据,且每条数据都是最终结果。
而流处理则是进行了13次操作,其中插入操作5次,删除4次,更新4次。

只有插入操作

Mysql表无主键

CREATE TABLE WordsCountTable (word varchar(255) NOT NULL,count BIGINT
);

Sink表无主键

    my_sink_ddl = """CREATE TABLE WordsCountTableSink (`word` STRING,`count` BIGINT) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false','table-name' = 'WordsCountTable','driver'='com.mysql.jdbc.Driver','username'='admin','password'='pwd123');"""

Sink表有主键

    my_sink_ddl = """CREATE TABLE WordsCountTableSink (`word` STRING,`count` BIGINT,PRIMARY KEY (`word`) NOT ENFORCED) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://127.0.0.1:3306/words_count_db?useSSL=false','table-name' = 'WordsCountTable','driver'='com.mysql.jdbc.Driver','username'='admin','password'='pwd123');"""

则对于只有插入操作的Batch模式,不管Sink表有没有主键,每次程序执行时都会插入新数据。比如我们执行两次批处理模式代码,则可以看到5的2倍=10条数据。

select * from WordsCountTable;
+------+-------+
| word | count |
+------+-------+
| A    |     3 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
| A    |     3 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
+------+-------+
10 rows in set (0.00 sec)

这个很好理解。

Mysql表有主键

CREATE TABLE WordsCountTable (word varchar(255) NOT NULL,count BIGINT,PRIMARY KEY (word)
);

Sink表无主键

因为word成为主键,不可以重复。第一次执行插入操作时成功了

+------+-------+
| word | count |
+------+-------+
| A    |     3 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
+------+-------+
5 rows in set (0.00 sec)

但是第二次执行时,会因为主键冲突报错:

Caused by: java.sql.SQLIntegrityConstraintViolationException: Duplicate entry ‘E’ for key ‘WordsCountTable.PRIMARY’

Sink表有主键

因为Mysql和Sink表里主键一致,不管执行多少次程序,都不会产生多余的数据。

+------+-------+
| word | count |
+------+-------+
| A    |     3 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
+------+-------+
5 rows in set (0.00 sec)

有删除和更新操作

在流模式中我们看到,流处理处理有插入操作外,还有其他操作。我们再对比下它们的表现。

Sink表无主键

Mysql表无主键

Mysql有无主键

因为流模式删除和更新操作需要通过主键来寻找对象,所以会报如下错误

java.lang.IllegalStateException: please declare primary key for sink table when query contains update/delete record.

Sink表有主键

由于Sink表设置了主键,于是流模式产生的更新和删除操作可以通过其找到对应项,就不会报错。

Mysql表无主键

由于Mysql表没有主键,导致每次执行都会插入一批数据。比如下面是我们执行两次的结果

+------+-------+
| word | count |
+------+-------+
| E    |     1 |
| A    |     3 |
| D    |     2 |
| C    |     2 |
| B    |     1 |
| A    |     3 |
| D    |     2 |
| B    |     1 |
| C    |     2 |
| E    |     1 |
+------+-------+
10 rows in set (0.00 sec)

这从另外一个方面说明:**流模式产生的一系列操作,在Execute环节,最终会对这些操作进行合并,将合并的操作同步给外部系统。**比如之前的流操作实际产生了13个行为,而最终落到数据库里只有5条数据,且第二次操作也是插入了5条新的、最终的数据,这就说明中间的操作在同步给数据库之前已经做了合并处理。

Mysql表有主键

因为Mysql表有主键,Sink过来的操作执行的是“有则更新,无则写入”的模式。
比如我们第一次执行程序时,得到

+------+-------+
| word | count |
+------+-------+
| A    |     3 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
+------+-------+
5 rows in set (0.00 sec)

然后我们将数据源中的E改成了A,则这次将出现4个A,但是不会出现E。执行后的结果是

+------+-------+
| word | count |
+------+-------+
| A    |     4 |
| B    |     1 |
| C    |     2 |
| D    |     2 |
| E    |     1 |
+------+-------+
5 rows in set (0.00 sec)

这个实验就证明了,当Sink和Mysql表的主键一致时,执行的是insert on duplicate key update操作。

http://www.lryc.cn/news/206066.html

相关文章:

  • Java学习笔记(五)——数组、排序和查找
  • python输出与数据类型
  • React-Redux总结含购物车案例
  • 攻克组合优化问题!美国DARPA选中全栈量子经典计算公司Rigetti
  • Kafka - 深入了解Kafka基础架构:Kafka的基本概念
  • [Docker]二.Docker 镜像,仓库,容器介绍以及详解
  • 软考高级系统架构设计师系列之:案例分析典型试题一
  • 2023年5个美国代理IP推荐,最佳代理花落谁家?
  • github.com/holiman/uint256 源码阅读
  • 排序-表排序
  • 勒索病毒最新变种.locked1勒索病毒来袭,如何恢复受感染的数据?
  • 信号补零对信号频谱的影响
  • 【Gan教程 】 什么是变分自动编码器VAE?
  • T113-S3-buildroot文件系统tar解压缩gz文件
  • 软件测试面试题:压测时,QPS一直上不去,如何排查?
  • 探索JavaScript ES6+新特性
  • Elasticsearch常见错误
  • mysql源码编译安装
  • On Moving Object Segmentation from Monocular Video with Transformers 论文阅读
  • [AutoSar NVM] 存储架构
  • ES10 新特性
  • 宝塔安装脚本
  • gulp打包vue3+jsx+less插件
  • 华为ICT——第四章深度学习和积卷神经
  • MongoDB 学习笔记(基础)
  • 【TGRS 2023】RingMo: A Remote Sensing Foundation ModelWith Masked Image Modeling
  • 性能测试 —— 生成html测试报告、参数化、jvm监控
  • 堆(二叉树,带图详解)
  • vue3 code format bug
  • 7-3、S曲线生成器【51单片机控制步进电机-TB6600系列】