当前位置: 首页 > news >正文

Flink的算子列表状态的使用

背景

算子的列表状态是平时比较常见的一种状态,本文通过官方的例子来看一下怎么使用算子列表状态

算子列表状态

算子列表状态支持应用的并行度扩缩容,如下所示:
在这里插入图片描述
使用方法参见官方示例,我加了几个注解:

public class BufferingSinkimplements SinkFunction<Tuple2<String, Integer>>,CheckpointedFunction {//要实现CheckpointedFunction接口private final int threshold;//算子操作状态对象--算子级别的private transient ListState<Tuple2<String, Integer>> checkpointedState;//本地变量,保存这个算子任务的本地变量--任务级别的 private List<Tuple2<String, Integer>> bufferedElements;public BufferingSink(int threshold) {this.threshold = threshold;this.bufferedElements = new ArrayList<>();}//invoke方法中一般都是操作本地变量bufferedElements,不会直接操作算子列表状态@Overridepublic void invoke(Tuple2<String, Integer> value, Context contex) throws Exception {bufferedElements.add(value);if (bufferedElements.size() >= threshold) {for (Tuple2<String, Integer> element: bufferedElements) {// send it to the sink}bufferedElements.clear();}}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {checkpointedState.clear();for (Tuple2<String, Integer> element : bufferedElements) {// 把本地变量的值设置到算子列表状态中,算子列表状态会自动会被持久化checkpointedState.add(element);}}@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {ListStateDescriptor<Tuple2<String, Integer>> descriptor =new ListStateDescriptor<>("buffered-elements",TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));// 定义算子列表状态checkpointedState = context.getOperatorStateStore().getListState(descriptor);if (context.isRestored()) {// 算子列表状态的值设置到本地变量中for (Tuple2<String, Integer> element : checkpointedState.get()) {bufferedElements.add(element);}}}
}
http://www.lryc.cn/news/196887.html

相关文章:

  • 使用 Github Actions 工作流自动部署 Github Pages
  • Xposed hook 抖音账户信息
  • 回顾 | E³CI效能认知与改进论坛,助力企业研发效能度量和提升
  • 科技的成就(五十二)
  • 【23种设计模式】装饰器模式
  • 解决IDEA中SpringBoot项目创建多个子模块时配置文件小绿叶图标异常问题
  • 【马蹄集】—— 概率论专题
  • Spring 6整合单元测试JUnit4和JUnit5
  • 【好书推荐】深入理解现代JavaScript
  • 高效协同: 打造分布式系统的三种模式
  • 机器学习-无监督学习之聚类
  • 智能垃圾桶丨悦享便捷生活
  • 【数据结构】线性表(一)线性表的定义及其基本操作(顺序表插入、删除、查找、修改)
  • MyBatis的自定义插件
  • 生物制剂\化工\化妆品等质检损耗、制造误差处理作业流程图(ODOO15/16)
  • vbv介绍
  • Linux CentOS 8(网卡的配置与管理)
  • python -m pip install 和 pip install 的区别解析
  • 深度解读js中数组的findIndex方法
  • ICML2021 | RSD: 一种基于几何距离的可迁移回归表征学习方法
  • 中国人民大学与加拿大女王大学金融硕士:在该奋斗的岁月里,对得起每一寸光阴
  • Python基础教程:装饰器的详细教程
  • Apache poi xwpf word转PDF中文显示问题解决
  • Gartner发布2024年十大战略技术趋势
  • 在UniApp中使用uni.makePhoneCall方法调起电话拨打功能
  • 苹果手机怎么刷机?掌握好这个方法!
  • 最新ai创作系统CHATGPT系统源码+支持GPT4.0+支持ai绘画(Midjourney)
  • 代码随想录算法训练营Day56|动态规划14
  • VsCode通过Git History插件查看某个页面的版本修改记录
  • 事件循环(渡一)