当前位置: 首页 > news >正文

Flink SQL -- CheckPoint

1、开启CheckPoint

checkpoint可以定时将flink任务的状态持久化到hdfs中,任务执行失败重启可以保证中间结果不丢失

# 修改flink配置文件
vim flink-conf.yaml# checkppint 间隔时间
execution.checkpointing.interval: 1min
# 任务手动取消时保存checkpoint
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
# 同时允许1个checkpoint执行
execution.checkpointing.max-concurrent-checkpoints: 1
execution.checkpointing.min-pause: 0
# 数据处理的语义
execution.checkpointing.mode: EXACTLY_ONCE
# checkpoint超时时间
execution.checkpointing.timeout: 10min
execution.checkpointing.tolerable-failed-checkpoints: 0
execution.checkpointing.unaligned: false
# 状态后端(保存状态的位置,hashmap:内存)
state.backend: hashmap
# checkpoint路径
state.checkpoints.dir: hdfs://master:9000/flink/checkpoint
2、编写一个Flnik SQL 脚本:
vim word_count.sql
-- 实时从kafka中读取单词,统计单词的数量,将结果保存到mysql中-- 1、创建source表
CREATE TABLE words (word STRING
) WITH ('connector' = 'kafka','topic' = 'words', -- 数据的topic'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表'properties.group.id' = 'testGroup', -- 消费者组'scan.startup.mode' = 'earliest-offset', -- 读取数据的位置earliest-offset latest-offset'format' = 'csv' -- 读取数据的格式
);-- 2、创建sink表
CREATE TABLE word_count (word STRING,num BIGINT,PRIMARY KEY (word) NOT ENFORCED -- 主键
) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://master:3306/student','table-name' = 'word_count', -- 需要手动到mysql中创建表'username' ='root','password' ='123456'
);-- 3、编写sql处理数据将结果保存到sink表中
insert into word_count
select 
word,
count(1) as num
from
words
group by word;
3、使用sq-client.sh -f 启动脚本
sql-client.sh -f word_count.sql
 4、当任务失败的时候再重新启动任务:
-- 1、获取checkpoint的路径
/file/checkpoint/47ee348d8c9edadadfc770cf7de8e7ee/chk-23-- 2、再sql脚本中增加参数,增加到sql脚本的insert into 的前面
-- 指定任务会的checkpoint的地址
SET'execution.savepoint.path'='hdfs://master:9000/file/checkpoint/47ee348d8c9edadadfc770cf7de8e7ee/chk-23';-- 3、启动sql任务
sql-client.sh -f word_count.sql

http://www.lryc.cn/news/231652.html

相关文章:

  • Load-balanced-online-OJ-system 负载均衡的OJ系统项目
  • ES6 导入导出
  • 【Liunx】部署Ansible自动化运维工具
  • Python的基础语法
  • Skywalking流程分析_8(拦截器插件的加载)
  • 智能AI系统ChatGPT网站源码+支持OpenAI DALL-E3文生图+支持ai绘画(Midjourney)/支持GPT全模型+国内AI全模型
  • 腾讯云服务器可用区是什么意思?可用区选择方法
  • Jupyter运行显存爆炸,明明上一个单元格已经运行完毕为什么还是会炸?
  • 【ICE】webrtc lite 1:cmake构建
  • 国内最受欢迎电商API接口调用淘宝商品详情API接口数据
  • 第五篇 基于JSP 技术的网上购书系统——主页面和登录页面实现(网上商城、仿淘宝、当当、亚马逊)
  • 【 云原生 | K8S 】kubeadm 部署Kubernetes集群
  • 微信小程序rich-text 文本首行缩进和图片居中和富文本rich-text 解析多个空格不成功 nbsp
  • uniapp 设置重写uni-body-page样式,输入字母转大写,条形码扫描
  • 【uniapp/uview1.x】u-upload 在 v-for 中的使用时, before-upload 如何传参
  • 求组合数(笔记)
  • 《视觉SLAM十四讲》-- 后端 1(下)
  • io+day8
  • 【图像处理:OpenCV-Python基础操作】
  • Java 简单实现一个 TCP 回显服务器
  • 邦芒攻略:新手求职面试需要准备的材料
  • 在docker下安装suiteCRM
  • 【Python大数据笔记_day08_hive查询】
  • 魔众文库系统 v5.6.0 DWG文件格式支持,部分数据封面显示异常,定时调度清理临时文件
  • 2023 PostgreSQL 数据库生态大会:解读拓数派大数据计算系统及其云存储底座
  • Android10 手势导航
  • Pinia 插件 pinia-plugin-persist 添加 persist 属性时报错:没有与此调用匹配的重载
  • Django知识
  • vue2+antd——实现权限管理——js数据格式处理(回显+数据结构渲染)
  • ffmpeg 4.4 cenc-aes-ctr 加解密 MP4 工程性质分析