当前位置: 首页 > news >正文

Flink SQL 从一个SOURCE 写入多个Sink端实例

一. 背景

FLINK 任务从一个数据源读取数据, 写入多个sink端.

二. 官方实例

写入多个Sink语句时,需要以BEGIN STATEMENT SET;开头,以END;结尾。

--源表
CREATE TEMPORARY TABLE datagen_source (name VARCHAR,score BIGINT
) WITH ('connector' = 'datagen'
);--结果表A
CREATE TEMPORARY TABLE blackhole_sinkA(name VARCHAR,score BIGINT
) WITH ('connector' = 'blackhole' 
);--结果表B
CREATE TEMPORARY TABLE blackhole_sinkB(name VARCHAR,score BIGINT
) WITH ('connector' = 'blackhole' 
);--DML
BEGIN STATEMENT SET;      --写入多个Sink时,必填。
INSERT INTO blackhole_sinkA SELECT UPPER(name), sum(score) FROM datagen_source GROUP BY UPPER(name);
INSERT INTO blackhole_sinkB SELECT LOWER(name), max(score) FROM datagen_source GROUP BY LOWER(name);
END;      --写入多个Sink时,必填。

三. 实操

3.1. 启动Standlone集群

进入到flink引擎包目录, 启动Standlone模式.

./bin/start-cluster.sh

3.2. 启动flink sql-client.

./bin/sql-client.sh embedded

3.3. 执行sql

Flink SQL> CREATE TEMPORARY TABLE datagen_source (
>   name VARCHAR,
>   score BIGINT
> ) WITH (
>   'connector' = 'datagen'
> );
[INFO] Execute statement succeed.Flink SQL> CREATE TEMPORARY TABLE blackhole_sinkA(
>   name VARCHAR,
>   score BIGINT
> ) WITH (
>   'connector' = 'blackhole'
> );
>
[INFO] Execute statement succeed.Flink SQL> CREATE TEMPORARY TABLE blackhole_sinkB(
>   name VARCHAR,
>   score BIGINT
> ) WITH (
>   'connector' = 'blackhole'
> );
>
[INFO] Execute statement succeed.Flink SQL> BEGIN STATEMENT SET;
[INFO] Begin a statement set.Flink SQL> INSERT INTO blackhole_sinkA
>   SELECT UPPER(name), sum(score)
>   FROM datagen_source
>   GROUP BY UPPER(name);
[INFO] Add SQL update statement to the statement set.Flink SQL> INSERT INTO blackhole_sinkB
>   SELECT LOWER(name), max(score)
>   FROM datagen_source
>   GROUP BY LOWER(name);
[INFO] Add SQL update statement to the statement set.Flink SQL> END;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 37a1390129c356374601a267cb8080b6

3.4. 查看flink ui

查看flink ui页面,验证结论.

http://master01:8081/#/job/37a1390129c356374601a267cb8080b6/overview

在这里插入图片描述

http://www.lryc.cn/news/506325.html

相关文章:

  • python飞机大战游戏.py
  • 【C++】14___String容器
  • 数据特性库 前言
  • jdk和cglib动态代理区别
  • 部署Mysql、镜像和容器、常见命令
  • 【数学】P2671 [NOIP2015 普及组] 求和
  • 【AI图像生成网站Golang】项目测试与优化
  • vue常用自定义指令
  • 以太网帧、IP数据报图解
  • 01.大模型起源与发展
  • leetcode刷题日记03——javascript
  • vue横向滚动日期选择器组件
  • 【大模型】大模型项目选择 RAGvs微调?
  • 2024年12月CCF-GESP编程能力等级认证Python编程一级真题解析
  • 【机器学习】元学习(Meta-learning)
  • 详解Redis的String类型及相关命令
  • android RadioButton + ViewPager+fragment
  • 给机器装上“脑子”—— 一文带你玩转机器学习
  • 论文笔记:是什么让多模态学习变得困难?
  • ChatGPT Search开放:实时多模态搜索新体验
  • Centos7.9 离线安装docker
  • C语言函数在调用过程中具体是怎么和栈互动的?
  • 【Java中常见的异常及其处理方式】
  • 如何更新项目中的 npm 或 Yarn 依赖包至最新版本
  • SpringBoot3整合FastJSON2如何配置configureMessageConverters
  • 《Vue3实战教程》2:Vue3快速上手
  • ubuntu 24.04.1安装FTP流程
  • 多功能护照阅读器港澳通行证阅读机RS232串口主动输出协议,支持和单片机/Linux对接使用
  • 5个用于构建Web应用程序的Go Web框架
  • Qt中的异步相关类