当前位置: 首页 > news >正文

Flink SQL 常用作业sql

目录

  • flink sql常用配置
  • kafka source to mysql sink
  • 窗口函数 开窗
  • datagen 自动生成数据表
    • tumble 滚动窗口
    • hop 滑动窗口
    • cumulate 累积窗口
  • grouping sets 多维分析
  • over 函数
  • TopN

flink sql常用配置

设置输出结果格式
SET sql-client.execution.result-mode=tableau;

kafka source to mysql sink

kafka 
topic: bop_log_realtime
数据结构:
{"timestamp":"2023-10-31 14:26:02.528","serverip":"10.13.177.209","level":"INFO","servicename":"bop-fms-query-info","traceid":"","spanid":"","parent":"","message":"Resolving eureka endpoints via configuration"}mysql表:
库名:flink_test
CREATE TABLE `bop_log_realtime_warning` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`serverip` varchar(255) NOT NULL DEFAULT '',`timestamp` varchar(255) NOT NULL DEFAULT '',`level` varchar(255) NOT NULL DEFAULT '',`servicename` varchar(255) NOT NULL DEFAULT '',`traceid` varchar(255) NOT NULL DEFAULT '',`spanid` varchar(255) NOT NULL DEFAULT '',`parent` varchar(255) NOT NULL DEFAULT '',`message` varchar(255) NOT NULL DEFAULT '',`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;CREATE TABLE kafka_log_realtime_json (`serverip` STRING,`timestamp` STRING,`level` STRING,`servicename` STRING,`traceid` STRING,`spanid` STRING,`parent` STRING,`message` STRING
) WITH ('connector' = 'kafka','topic' = 'bop_log_realtime','properties.bootstrap.servers' = '10.2.25.221:9092,10.2.25.221:9093','properties.group.id' = 'testGroup2','format' = 'json','scan.startup.mode' = 'latest-offset'
);CREATE TABLE bop_log_realtime_warning (`serverip` STRING,`timestamp` STRING,`level` STRING,`servicename` STRING,`traceid` STRING,`spanid` STRING,`parent` STRING,`message` STRING
) WITH (
'connector' = 'jdbc'
,'url' = 'jdbc:mysql://m3309i.hebe.grid.xx.com.cn:3309/flink_test?zeroDateTimeBehavior=convertToNull&characterEncoding=utf-8&useSSL=false&autoReconnect=true&serverTimezone=Asia/Shanghai'
,'username' = 'super_mis'
,'password' = 'mis_password'
,'table-name' = 'bop_log_realtime_warning'
);insert into bop_log_realtime_warning 
SELECT`serverip` ,`timestamp` ,`level` ,`servicename` ,`traceid` ,`spanid` ,`parent` ,`message` FROM kafka_log_realtime_json;

窗口函数 开窗

datagen 自动生成数据表

CREATE TABLE ws (id INT,vc INT,pt AS PROCTIME(), --处理时间et AS cast(CURRENT_TIMESTAMP as timestamp(3)), --事件时间WATERMARK FOR et AS et - INTERVAL '5' SECOND --watermark
) WITH ('connector' = 'datagen','rows-per-second' = '10','fields.id.min' = '1','fields.id.max' = '3','fields.vc.min' = '1','fields.vc.max' = '100'
);CREATE TABLE sink (id INT,ts BIGINT,vc INT
) WITH ('connector' = 'print'
);

tumble 滚动窗口

滚动窗口 窗口大小5selectid,sum(vc) vcSum,window_start,window_endfrom table(TUMBLE(table ws, descriptor(et), INTERVAL '5' SECOND))group by id, window_start, window_end;

hop 滑动窗口

滑动窗口 滑动步长5秒 窗口大小10秒
注意:窗口大小=滑动步长的整数倍(底层会优化成多个小滚动窗口)
selectid,sum(vc) vcSum,window_start,window_endfrom table(hop(table ws, descriptor(et), INTERVAL '5' SECOND, INTERVAL '10' SECOND))group by id, window_start, window_end;

cumulate 累积窗口

注意:窗口大小=累积步长的整数倍
selectid,sum(vc) vcSum,window_start,window_endfrom table(CUMULATE(table ws, descriptor(et), INTERVAL '5' SECOND))group by id, window_start, window_end;

grouping sets 多维分析

selectid,sum(vc) vcSum,window_start,window_endfrom table(TUMBLE(table ws, descriptor(et), INTERVAL '5' SECOND))group by window_start, window_end,grouping sets ( (id) );

over 函数

TopN

http://www.lryc.cn/news/219723.html

相关文章:

  • nodejs国内镜像及切换版本工具nvm
  • 用Rust和Scraper库编写图像爬虫的建议
  • Java 语言环境搭建
  • 酷开科技 | 酷开系统里萌萌哒小维在等你!
  • Bash 4关联数组:错误“声明:-A:无效选项”
  • 干货|AI辅助完成论文的正确打开方式!
  • SpringBoot--Web开发篇:含enjoy模板引擎整合,SpringBoot整合springMVC;及上传文件至七牛云;restFul
  • 线上JAVA应用平稳运行一段时间后出现JVM崩溃问题 | 京东云技术团队
  • 进口跨境商城源码:高效、安全、可扩展的电商平台解决方案
  • GEE数据集——2019、2020、2021、2022和2023年全球固定宽带和移动(蜂窝)网络性能Shapefile 格式数据集
  • 什么是防火墙?详解三种常见的防火墙及各自的优缺点
  • 动态规划算法实现0-1背包问题Java语言实现
  • linux查看系统版本
  • pg14-sql基础(四)-多表联查
  • el-date-picker 日期时间选择器 限时时间范围 精确到时分秒
  • 轮廓线dp:GYM103446C
  • 羊驼免疫制备纳米抗体
  • 【AI好好玩02】利用Lama Cleaner本地实现AIGC试玩:擦除对象、替换对象、更换风格等等
  • SQL FULL OUTER JOIN 关键字(完整外部连接)||SQL自连接 Self JOIN
  • 专科医院污水处理设备构造解析及工艺流程
  • 【RabbitMQ】RabbitMQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制
  • 百万套行泊一体量产定点,中国市场「开启」智驾高低速集成
  • Gopro hero5运动相机格式化后恢复案例
  • Microsoft Dynamics 365 CE 扩展定制 - 6. 增强代码
  • 基于libopenh264 codec的svc分层流实现方案
  • 为机器学习算法准备数据(Machine Learning 研习之八)
  • 基于Python OpenCV的金铲铲自动进游戏、D牌...
  • c++中httplib使用
  • Vite 的基本原理,和 webpack 在开发阶段的比较
  • [开源]免费开源MES系统/可视化数字大屏/自动排班系统