当前位置: 首页 > news >正文

Flink SQL

  1. 进入 JobManager 容器

    docker exec -it 21442d9ca797 /bin/bash
  2. 启动 Flink 的 SQL 客户端

    /opt/flink/bin/sql-client.sh embedded
  3. 尝试创建 Kafka 表

     

    在启动的 SQL 客户端中,尝试创建一个 Kafka 表,看看是否能够成功:

    CREATE TABLE test_kafka_table (message STRING
    ) WITH ('connector' = 'kafka','topic' = 'test_topic','properties.bootstrap.servers' = '110.40.130.231:9092','format' = 'json'
    );

    如果没有报错,说明 Kafka 连接器已成功加载。


以下是一个使用 Flink SQL 从 Kafka 读取数据、进行简单聚合计算、并将结果写入 MySQL 和 HDFS 的示例。这个示例假设你已经安装并配置好了 Flink、Kafka、MySQL 和 HDFS。

1. 从 Kafka 读取数据

首先,创建一个 Kafka 表来定义数据源。假设 Kafka 主题名为 user_behavior,包含用户行为数据,每条消息格式为 JSON,包含字段 user_id, item_id, category_id, behavior, ts (时间戳)。

CREATE TABLE user_behavior (user_id BIGINT,item_id BIGINT,category_id BIGINT,behavior STRING,ts TIMESTAMP(3),proctime AS PROCTIME(), -- 添加处理时间列WATERMARK FOR ts AS ts - INTERVAL '5' SECOND -- 设置水印,允许5秒延迟
) WITH ('connector' = 'kafka','topic' = 'user_behavior','properties.bootstrap.servers' = 'localhost:9092','format' = 'json','scan.startup.mode' = 'latest-offset'
);

2. 进行简单的聚合计算

接下来,对用户行为数据进行简单的聚合计算,例如按类别统计每分钟的行为次数。

CREATE VIEW behavior_count AS
SELECTcategory_id,TUMBLE_START(ts, INTERVAL '1' MINUTE) as window_start,COUNT(*) as behavior_count
FROM user_behavior
GROUP BY category_id, TUMBLE(ts, INTERVAL '1' MINUTE);

使用了 TUMBLE 函数来创建滚动窗口,按每分钟对数据进行分组,并计算每个类别的行为次数。

3. 将处理后的数据写入 MySQL

为了将上述聚合结果写入 MySQL,首先创建一个 MySQL 表。

CREATE TABLE behavior_summary (category_id BIGINT,window_start TIMESTAMP(3),behavior_count BIGINT,PRIMARY KEY (category_id, window_start) NOT ENFORCED
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://localhost:3306/mydatabase','table-name' = 'behavior_summary','username' = 'myuser','password' = 'mypassword'
);

然后,可以INSERT INTO 语句将数据插入到 MySQL 表中。

INSERT INTO behavior_summary
SELECT * FROM behavior_count;

4. 将处理后的数据写入 HDFS

如果想将数据写入 HDFS,先创建一个 HDFS 表。

CREATE TABLE behavior_summary_hdfs (category_id BIGINT,window_start TIMESTAMP(3),behavior_count BIGINT
) WITH ('connector' = 'filesystem','path' = 'hdfs://localhost:9000/user/flink/behavior_summary','format' = 'csv'
);

接着,使用 INSERT INTO 语句将数据写入 HDFS。

INSERT INTO behavior_summary_hdfs
SELECT * FROM behavior_count;

总结

以上步骤展示了如何使用 Flink SQL 从 Kafka 读取数据、进行聚合计算,并将结果分别写入 MySQL 和 HDFS。这是一个基本的流程,根据实际需求,可以调整表结构、连接器配置以及 SQL 查询以适应不同的应用场景。

http://www.lryc.cn/news/481046.html

相关文章:

  • 鸿蒙UI开发——实现环形文字
  • QT版发送邮件程序
  • JavaSE:初识Java(学习笔记)
  • ClickHouse创建分布式表
  • Flink转换算子
  • ThinkBook 14+ 2024 Ubuntu 触控板失效 驱动缺失问题解决
  • 【青牛科技】应用方案 | D75xx-150mA三端稳压器
  • WPF之iconfont(字体图标)使用
  • 08、Java学习-面向对象中级:
  • springboot集成onlyoffice(部署+开发)
  • LabVIEW编程基础教学(二)--数据类型
  • 「Mac畅玩鸿蒙与硬件29」UI互动应用篇6 - 多选问卷小应用
  • Flutter中文字体设置指南:打造个性化的应用体验
  • git下载慢下载不了?Git国内国外下载地址镜像,git安装视频教程
  • 安卓属性动画插值器(Interpolator)详解
  • OSPF总结
  • Spring Boot驱动的多维分类知识管理系统
  • CSS教程(七)- 背景
  • PNG图片批量压缩exe工具+功能纯净+不改变原始尺寸
  • 【双十一特惠】腾讯云省钱攻略:如何智取云计算资源
  • 爬虫学习8
  • 双指针算法的妙用:提高代码效率的秘密(2)
  • 笔记--(网络3)、交换机、VLAN
  • 昇思大模型平台打卡体验活动:基于MindSpore实现GPT1影评分类
  • 如何调整pdf的页面尺寸
  • IDA*算法 Power Calculus————poj 3134
  • 重磅!CoRL 2024顶刊会议 清华大学高阳研究组发布“基于大模型先验知识的强化学习”
  • 泷羽sec学习打卡-Windows基础命令
  • RTC精度及校准
  • jQuery案例