当前位置: 首页 > news >正文

flink的KeyedBroadcastProcessFunction测试

背景

我们经常需要对KeyedBroadcastProcessFunction函数进行单元测试,以确保上线之前这个函数的功能是正常的,包括里面的广播状态和键值分区状态

测试KeyedBroadcastProcessFunction类

    @Testpublic void testHarnessForKeyedBroadcastProcessFunction() throws Exception {KeyedBroadcastProcessFunction<String, String, String, String> function = new MyKeyedBroadcastProcessFunction();// 键值分区状态final ValueStateDescriptor<String> valueStateDescriptor =new ValueStateDescriptor<>("item", BasicTypeInfo.STRING_TYPE_INFO);// 广播状态final MapStateDescriptor<String, String> ruleStateDescriptor = new MapStateDescriptor<>("RulesBroadcastState",BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);KeyedBroadcastOperatorTestHarness<String, String, String, String> harness =ProcessFunctionTestHarnesses.forKeyedBroadcastProcessFunction(function, x -> x,TypeInformation.of(String.class), ruleStateDescriptor);harness.processBroadcastElement("0", 1);harness.processBroadcastElement("000", 2);harness.processElement("1", 10);// 判断键值分区状态(注意这里最好就只是某个key下面,也就是分组key直接设置为x->"固定常数值"即可)ValueState<String> valueState = function.getRuntimeContext().getState(valueStateDescriptor);Assert.assertEquals(valueState.value(), "1");// 判断广播状态BroadcastState<String, String> broadcastState = harness.getBroadcastState(ruleStateDescriptor);Assert.assertTrue(broadcastState.contains("0"));Assert.assertTrue(broadcastState.contains("000"));// 判断输出的列表Assert.assertEquals(harness.extractOutputValues(), Arrays.asList("0", "000", "1"));}

关键代码:
1.获取键值分区状态

ValueState<String> valueState = function.getRuntimeContext().getState(valueStateDescriptor);

2.获取广播状态:

BroadcastState<String, String> broadcastState = harness.getBroadcastState(ruleStateDescriptor);

3.工具类

public class ProcessFunctionTestHarnesses {public ProcessFunctionTestHarnesses() {}public static <IN, OUT> OneInputStreamOperatorTestHarness<IN, OUT> forProcessFunction(ProcessFunction<IN, OUT> function) throws Exception {OneInputStreamOperatorTestHarness<IN, OUT> testHarness = new OneInputStreamOperatorTestHarness(new ProcessOperator((ProcessFunction)Preconditions.checkNotNull(function)), 1, 1, 0);testHarness.setup();testHarness.open();return testHarness;}public static <K, IN, OUT> KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> forKeyedProcessFunction(KeyedProcessFunction<K, IN, OUT> function, KeySelector<IN, K> keySelector, TypeInformation<K> keyType) throws Exception {KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> testHarness = new KeyedOneInputStreamOperatorTestHarness(new KeyedProcessOperator((KeyedProcessFunction)Preconditions.checkNotNull(function)), keySelector, keyType, 1, 1, 0);testHarness.open();return testHarness;}public static <IN1, IN2, OUT> TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> forCoProcessFunction(CoProcessFunction<IN1, IN2, OUT> function) throws Exception {TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> testHarness = new TwoInputStreamOperatorTestHarness(new CoProcessOperator((CoProcessFunction)Preconditions.checkNotNull(function)), 1, 1, 0);testHarness.open();return testHarness;}public static <K, IN1, IN2, OUT> KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> forKeyedCoProcessFunction(KeyedCoProcessFunction<K, IN1, IN2, OUT> function, KeySelector<IN1, K> keySelector1, KeySelector<IN2, K> keySelector2, TypeInformation<K> keyType) throws Exception {KeyedTwoInputStreamOperatorTestHarness<K, IN1, IN2, OUT> testHarness = new KeyedTwoInputStreamOperatorTestHarness(new KeyedCoProcessOperator((KeyedCoProcessFunction)Preconditions.checkNotNull(function)), keySelector1, keySelector2, keyType, 1, 1, 0);testHarness.open();return testHarness;}public static <IN1, IN2, OUT> BroadcastOperatorTestHarness<IN1, IN2, OUT> forBroadcastProcessFunction(BroadcastProcessFunction<IN1, IN2, OUT> function, MapStateDescriptor<?, ?>... descriptors) throws Exception {BroadcastOperatorTestHarness<IN1, IN2, OUT> testHarness = new BroadcastOperatorTestHarness(new CoBroadcastWithNonKeyedOperator((BroadcastProcessFunction)Preconditions.checkNotNull(function), Arrays.asList(descriptors)), 1, 1, 0);testHarness.open();return testHarness;}public static <K, IN1, IN2, OUT> KeyedBroadcastOperatorTestHarness<K, IN1, IN2, OUT> forKeyedBroadcastProcessFunction(KeyedBroadcastProcessFunction<K, IN1, IN2, OUT> function, KeySelector<IN1, K> keySelector, TypeInformation<K> keyType, MapStateDescriptor<?, ?>... descriptors) throws Exception {KeyedBroadcastOperatorTestHarness<K, IN1, IN2, OUT> testHarness = new KeyedBroadcastOperatorTestHarness(new CoBroadcastWithKeyedOperator((KeyedBroadcastProcessFunction)Preconditions.checkNotNull(function), Arrays.asList(descriptors)), keySelector, keyType, 1, 1, 0);testHarness.open();return testHarness;}
}
http://www.lryc.cn/news/228528.html

相关文章:

  • 【pytorch深度学习】torch-张量Tensor
  • odoo16前端框架源码阅读——rpc_service.js
  • Nat. Med. | 成年人的城市生活环境对心理健康的影响
  • stm32 WIFI模块_8266使用
  • 【C/C++】malloc 或者 new 动态分配内存
  • 如果让你重新开始学 C/C++,你的学习路线会是怎么选择?
  • PCL安装与使用
  • 力扣刷题-二叉树-对称二叉树
  • 常见面试题-计算机网络相关
  • leetcode做题笔记231. 2 的幂
  • AI主播“败走”双11,想用AI省成本的商家醒醒吧,程序员不必担心失业,发展空间依旧很大
  • ◢Django 自写分页与使用
  • 某城高速综合管控大数据大屏可视化【可视化项目案例-04】
  • 如何在Linux下进行文件查看
  • OSG练习:模仿Ventsim制作三维矿井智能通风系统
  • 【数据结构】非递归实现二叉树的前 + 中 + 后 + 层序遍历(听说面试会考?)
  • 32 Feign性能优化
  • 星岛专栏|从Web3发展看金融与科技的融合之道
  • 什么是网络爬虫?
  • 酷柚易汛ERP - 商品库存余额表操作指南
  • 第27期 | GPTSecurity周报
  • 大数据-玩转数据-Flume
  • 【Linux】进程概念IV 进程地址空间
  • Flink在汽车行业的应用【面试加分系列】
  • 智慧工地源码:助力数字建造、智慧建造、安全建造、绿色建造
  • Spring Boot(二)
  • 上海亚商投顾:沪指缩量调整跌 高位强势股继续退潮
  • 药理学试卷
  • SpringBoot3-快速入门
  • 具名挂载和匿名挂载