当前位置: 首页 > news >正文

【实战-08】 flink自定义Map中的变量的行为

场景

自定义Map或者别的算子的时候,有时候需要定义一些类变量,在flink内部高并发的情况下需要正确理解这些变量的行为

代码

package com.pg.function;import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import java.util.ArrayList;public class FlinkFunction {//对于自定义函数中的变量,只有内置的状态是完全按照flink内置的 keyBy行为来的//如果是自定义的缓存比如ArrayList 则可能不会按照预期的行为public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStream<String> dataStream = env.fromElements( "b","b","b","c","c","c","d","d","d");dataStream.keyBy(x->{return x;}).map(new MyMap()).print();env.execute();}}class MyMap extends RichMapFunction<String, String> {public ArrayList<String> list= new ArrayList<>();
//     public ValueState<Integer> counter;//存储数据条数
//     public ValueState<String> element;//存储临时数据
//     @Override
//     public void open(Configuration parameters) throws Exception {
//         counter = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("counter", Types.INT));
//         element = getRuntimeContext().getState(new ValueStateDescriptor<>("element", Types.STRING));
//     }@Overridepublic String map(String s) throws Exception {list.add(s);if(list.size()==2){String re = list.toString();list.clear();return re;}else {return "null";}
//        if (counter.value() == null) {
//            counter.update(1);//遇见第一条数据的时候,计数器为1
//        } else {
//            counter.update(counter.value() + 1);
//        }
//        if (element.value() == null) {
//            element.update(s);//element只存储上一次到来的数据
//        }else {
//            element.update(element.value()+s);
//        }
//        if (counter.value() == 2) {
//            String re = element.value();
//            //发出结果之后清楚状态
//            counter.clear();
//            element.clear();
//            return re;
//        }else {
//            return "null";
//        }}
}

分析

keyBy之后,理论上相同key的会在map中用同样的处理逻辑,我们的预期行为是输出:bb,cc,dd
但是用ArrayList实现的逻辑最终输出却是:bb,bc,cc,dd
用ValueState的输出是:bb,cc,dd
这说明了,keBy后的逻辑,ArrayList不会按照预期的行为执行。这是因为在flink中,当多个并发的时候,多个key如果落入同一个线程
则当前线程的valueState是和某一个key绑定的,符合flink预期行为,但是ArrayList以及其它你定义的变量则不做保证, 它是线程级别的局部变量, 这点要注意。

http://www.lryc.cn/news/306842.html

相关文章:

  • Docker Volume
  • 开源计算机视觉库OpenCV常用的API介绍
  • pytorch -- torch.nn下的常用损失函数
  • daydayEXP: 支持自定义Poc文件的图形化漏洞利用工具
  • 无法访问云服务器上部署的Docker容器(二)
  • 在Pycharm中运行Django项目如何指定运行的端口
  • Android将 ViewBinding封装到BaseActivity基类中(Java版)
  • JSP实现数据传递与保存(一)
  • 【论文笔记之 YIN】YIN, a fundamental frequency estimator for speech and music
  • 水印相机小程序源码
  • NXP实战笔记(八):S32K3xx基于RTD-SDK在S32DS上配置LCU实现ABZ解码
  • 【深度好文】simhash文本去重流程
  • 主流的开发语言和开发环境介绍
  • List去重有几种方式
  • 使用C#+NPOI进行Excel处理,实现多个Excel文件的求和统计
  • 华清远见嵌入式学习——驱动开发——day9
  • formality:set_constant应用
  • sqllabs的order by注入
  • 《The Art of InnoDB》第二部分|第4章:深入结构-磁盘结构-redo log
  • 大模型安全相关论文
  • 回归预测 | Matlab实现PSO-BiLSTM-Attention粒子群算法优化双向长短期记忆神经网络融合注意力机制多变量回归预测
  • [算法沉淀记录] 排序算法 —— 堆排序
  • C++ //练习 9.33 在本节最后一个例子中,如果不将insert的结果赋予begin,将会发生什么?编写程序,去掉此赋值语句,验证你的答案。
  • [corCTF 2022] CoRJail: From Null Byte Overflow To Docker Escape
  • thinkphp6定时任务
  • 支持国密ssl的curl编译和测试验证(上)
  • 包装类详解
  • vue3与vue2的区别
  • SSL OV证书和DV、EV证书的区别
  • 一款.NET下 WPF UI框架介绍