当前位置: 首页 > news >正文

Flink之KeyedState

前面的文章中介绍过Operator State,这里介绍一下Keyed State.
在使用Operator State时必须要实现CheckpointFunction接口,而Keyed State则不需要,在使用keyBy(...)分组分组后,调用的函数必须是实现RichFuntion接口的函数才可以使用Keyed State.同样使用Keyed State也必须开启Checkpoint.

  • 需求
    将接收到的Socket数据源中的字符串进行拼接
    在命令行开启socket命令:
    nc -lk 8888
    
  • 业务代码
    public class FlinkKeyedState {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度为1,便于观察env.setParallelism(1);// 开启Checkpoint, 8秒一个周期并开启一次性语义env.enableCheckpointing(8000, CheckpointingMode.EXACTLY_ONCE);// 指定checkpoint持久化路径env.getCheckpointConfig().setCheckpointStorage("file:///Users/xxx/data/testData/checkpoint");// 开启Task级别故障自动failover,通过fixedDelayRestart设置Task重启上限和重启间隔,这里设置的重启次数为2次,一旦Task重启次数超过这个次数,整个job也会停止env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(5)));// 获取Socket数据源DataStreamSource<String> socketSource = env.socketTextStream("localhost", 8888);// 将数据进行分组,将分组key给一个常量值SingleOutputStreamOperator<String> map = socketSource.keyBy(s -> "1")// 使用Keyed State的算子必须实现RichFunction接口,如RichMapFunction,ProcessFunction等.map(new RichMapFunction<String, String>() {ListState<String> listState;// open方法可以理解为和Operator State中的initializeState方法一样,需要在这个方法中构造和获取状态存储器@Overridepublic void open(Configuration parameters) throws Exception {// 获取上下文RuntimeContext ctx = getRuntimeContext();// 获取ListState,不同于Operator State的是在这里有更多的选择,如ListState,MapState等listState = ctx.getListState(new ListStateDescriptor<>("demo", String.class));}// 在map方法中正常编写业务逻辑@Overridepublic String map(String s) throws Exception {// 模拟Task失败if (s.equals("k") && RandomUtils.nextInt(0, 5) == 3) {throw new Exception("Task 异常");}// 将数据添加到状态存储器中listState.add(s);Iterable<String> strings = listState.get();StringBuilder builder = new StringBuilder();for (String string : strings) {builder.append(string);}return builder.toString();}});map.print();env.execute("Keyed State");}
    }
    
    API的使用大概就这些内容,不过在使用Keyed Sate时首先要对keyBy的特性有所了解,才能得到最终想要的结果数据,如使用keyBy时上下游之间的数据分发模式、所设置的默认并行度上下游算子的并行度是否一致等问题,这些都是需要注意的,然后根据实际业务需求开发对应的逻辑就可以了.
http://www.lryc.cn/news/235988.html

相关文章:

  • c语言:模拟实现qsort函数
  • 从0开始学习数据结构 C语言实现 1.前篇及二分查找算法
  • VSCode 使用CMakePreset找不到cl.exe编译器的问题
  • 【Linux系统化学习】进程的状态 | 僵尸进程 | 孤儿进程
  • 深信服AC流量管理技术
  • 二元关系及关系代数中的象集、除运算
  • [PHP]关联和操作MySQL数据库然后将数据库部署到ECS
  • 23.11.19日总结
  • 系列一、JVM概述
  • milvus数据管理-压缩数据
  • SpringBoot项目连接linux服务器数据库两种解决方法(linux直接开放端口访问本机通过SSH协议访问,以mysql为例)
  • 【Rust】快速教程——闭包与生命周期
  • redis高级案列case
  • Vue3+Vite实现工程化,attribute属性渲染v-bind指令
  • 下一代搜索引擎会什么?
  • WPF中如何在MVVM模式下关闭窗口
  • 【数据结构&C++】二叉平衡搜索树-AVL树(25)
  • Python算法——树的最大深度和最小深度
  • 46.全排列-py
  • 系列三、GC垃圾回收算法和垃圾收集器的关系?分别是什么请你谈谈
  • WPF中的虚拟化是什么
  • 免费稳定几乎无门槛,我的ChartGPT助手免费分享给你
  • 奇瑞金融:汽车金融行业架构设计
  • milvus数据库分区管理
  • pytorch.nn.Conv1d详解
  • 大数据HCIE成神之路之数学(2)——线性代数
  • 音视频学习(十八)——使用ffmepg实现视音频解码
  • nginx的GeoIP模块
  • mac控制台命令小技巧
  • Postman:API测试之Postman使用完全指南