当前位置: 首页 > news >正文

Flink SQL 基础操作

Flink SQL是建立在Apache Flink之上的SQL处理引擎,它允许用户以SQL的方式处理流数据和批数据。以下是一些Flink SQL的基础操作:

一、环境准备

1.启动flink集群

./start-cluster.sh
  1. 启动sql-client
./sql-client.sh

二、数据源定义

  1. 创建表(Source):
  • 使用CREATE TABLE语句定义输入数据源,包括其schema、存储格式(如CSV、JSON等)以及连接器的配置(如Kafka、FileSystem等)。
  • 示例:
CREATE TABLE students (  id STRING,  name STRING,  age INT,  sex STRING,  clazz STRING  
) WITH (  'connector' = 'kafka',  'topic' = 'students',  'properties.bootstrap.servers' = 'localhost:9092',  'format' = 'csv'  
);

三、数据处理

  1. 编写SQL查询:
  • 使用标准的SQL语句对数据进行查询、过滤、聚合等操作。
  • 示例:
SELECT id, name, age  
FROM students  
WHERE age > 18;

四、数据输出

  1. 创建表(Sink):
  • 使用CREATE TABLE语句定义输出数据源,用于将处理后的数据写入外部系统,如Kafka、数据库等。
  • 示例:
CREATE TABLE results (  id STRING,  name STRING,  age INT  
) WITH (  'connector' = 'kafka',  'topic' = 'results',  'properties.bootstrap.servers' = 'localhost:9092',  'format' = 'csv'  
);
  1. 插入数据:
  • 使用INSERT INTO语句将查询结果写入Sink表。
  • 示例:
INSERT INTO results  
SELECT id, name, age  
FROM students  
WHERE age > 18;

五、执行与监控

  1. 执行SQL语句:
  • 在Flink SQL客户端或程序中执行SQL语句。
  • 可以通过Flink的Dashboard或其他监控工具来查看作业的执行状态和性能指标。
  1. 结果展示:
  • Flink SQL客户端支持多种结果显示模式,如表格模式、变更日志模式和Tableau模式,可以根据需要设置。

六、其他操作

  1. 动态表:
  • Flink SQL中的表是动态表,支持对流数据的实时查询和处理。
  1. Join操作:
  • Flink SQL支持多种Join方式,包括Regular Joins、Interval Joins、Temporal Joins和Lookup Joins,用于处理表之间的关联查询。
  1. 窗口函数:
  • Flink SQL支持窗口函数,用于对时间序列数据进行分组和聚合操作。

注意事项

  • 在进行Flink SQL操作时,需要确保已经正确配置了Flink环境,并且已经添加了必要的依赖库。
  • Flink SQL的语法和功能可能会随着Flink版本的更新而发生变化,因此建议查阅最新的官方文档以获取准确的信息。

样例操作

1、 从csv中读取数据

CREATE TABLE well_casting_alarm (_id VARCHAR,comCode VARCHAR,wellCode VARCHAR,uuid VARCHAR,type INT,alarmType INT,alarmGrade INT,zp INT,startAlarmTime TIME,startAlarmValue DECIMAL,threshold INT,warnStatus INT,isDeal INT,createTime TIME,_class VARCHAR
) WITH ( 'connector' = 'filesystem','path' = '/wfg/data/sjzz.wellCastingAlarm0606.csv','format' = 'csv'
);

2、查看所有表

Flink SQL> show tables;
+----------------------+
|           table name |
+----------------------+
| employee_information |
|   well_casting_alarm |
+----------------------+
2 rows in set

3、删除表

DROP TABLE well_casting_alarm;

4、查询数据

select *from well_casting_alarm limit 1;

5、删除一条数据

DELETE FROM well_casting_alarm where '_id'='_id';
http://www.lryc.cn/news/419351.html

相关文章:

  • 海思AE模块Lines_per_500ms参数的意义
  • 【代码随想录】区间和——前缀和方法
  • Bug 解决 | 前端项目无法正确安装依赖?
  • 【mysql 第四篇章】bin log 的作用是啥呢?
  • Linux 操作系统:基于环形队列的生产者消费者模型
  • python求解二次方程
  • Spring框架面试总结
  • java之网络编程篇
  • stm32f103c8t6与TB6612FNG解耦测试
  • 2253336 - 资源库 - OAC0 中的脱机状态
  • uni-app总结
  • 【JavaEE初阶】线程安全的集合类
  • 关于Vue项目npm快捷键,点击run启动报错,及npm i也报错的解决办法
  • React中,className属性自定义组件不生效的问题
  • Ubuntu22.04搭建fabric开发环境、开发环境下运行链码
  • [BSidesCF 2019]Kookie1
  • LCM红外小目标检测
  • 振德医疗选择泛微千里聆RPA,助力电商、人事业务流程自动化
  • VBA高级应用30例应用3在Excel中的ListObject对象:创建表
  • IP 地址在 SQL 注入攻击中的作用及防范策略
  • Unity VR黑屏
  • Vue.js 中使用 Watcher 的强大场景和案例
  • 《实现 DevOps 平台(2) · GitLab CI/CD 交互》
  • 【机器学习sklearn实战】岭回归、Lasso回归和弹性网络
  • Python 爬虫项目实战六:抓取猫眼电影排行榜的数据
  • YOLO系列:从yolov1至yolov8的进阶之路 持续更新中
  • 欧拉系统离线安装界面ukui
  • Milvus向量数据库的简介以及用途
  • 恒创科技:IPv4 和 IPv6 之间的主要区别
  • TinyWebserver的复现与改进(1):服务器环境的搭建与测试