当前位置: 首页 > news >正文

flink1.18 广播流 The Broadcast State Pattern 官方案例scala版本

对应官网

https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/

测试数据

 * 广播流 官方案例 scala版本* 广播状态*    https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/* 事件流:*    red,4side*    red,5side*    red,1side*    red,4side** 规则流:*    rule1,4side,1side* 广播规则流,使用mapstate存储每种规则和对应的事件流数据 eg: {rule1 -> [4side,4side]} 遇到1side到来,则全部输出.* map可存储多个规则

完整scala版本代码

package com.yy.state.operatorStateDemoimport org.apache.flink.api.common.state.MapStateDescriptor
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeHint, TypeInformation}
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.typeutils.ListTypeInfo
import org.apache.flink.streaming.api.datastream.KeyedStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
import org.apache.flink.util.Collectorimport java.time.ZoneId
import scala.collection.JavaConverters.iterableAsScalaIterableConverter
import scala.collection.mutable.ListBuffer
import scala.collection.JavaConverters._/*** 广播流 官方案例 scala版本* 广播状态*    https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/* 事件流:*    red,4side*    red,5side*    red,4side*    red,1side*    red,4side** 规则流:*    rule1,4side,1side* 广播规则流,使用mapstate存储每种规则和对应的事件流数据 eg: {rule1 -> [4side,4side]} 遇到1side到来,则全部输出.* map可存储多个规则*/
object BroadcastStateV1 {case class Item(color:Color,shape: Shape){def getShape()={shape}}case class Rule(name:String,first:Shape,second:Shape)case class Color(color:String)case class Shape(shape:String)def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentval tEnv = StreamTableEnvironment.create(env)// 指定国内时区tEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))val itemStream = env.socketTextStream("localhost", 9999).map(_.split(",")).map(arr => Item(Color(arr(0)), Shape(arr(1))))val ruleStream = env.socketTextStream("localhost", 9998).broadcast().map(s => Rule(s.split(",")(0), Shape(s.split(",")(1)), Shape(s.split(",")(2))))val ruleStateDescriptor = new MapStateDescriptor("RulesBroadcastState",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint[Rule](){}));val ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor)val colorPartitionedStream: KeyedStream[Item, Color] = itemStream.keyBy(new KeySelector[Item, Color] {override def getKey(value: Item): Color = value.color})colorPartitionedStream.connect(ruleBroadcastStream).process(// type arguments in our KeyedBroadcastProcessFunction represent://   1. the key of the keyed stream//   2. the type of elements in the non-broadcast side//   3. the type of elements in the broadcast side//   4. the type of the result, here a stringnew KeyedBroadcastProcessFunction[Color, Item, Rule, String]() {val  mapStateDesc =new MapStateDescriptor("items",BasicTypeInfo.STRING_TYPE_INFO,new ListTypeInfo(classOf[Item]))val ruleStateDescriptor =new MapStateDescriptor("RulesBroadcastState",BasicTypeInfo.STRING_TYPE_INFO,TypeInformation.of(new TypeHint[Rule]() {}))override def processElement(value: Item, ctx: KeyedBroadcastProcessFunction[Color, Item, Rule, String]#ReadOnlyContext, out: Collector[String]): Unit = {val state = getRuntimeContext().getMapState(mapStateDesc)val shape = value.getShape()// 遍历广播的 rulectx.getBroadcastState(ruleStateDescriptor).immutableEntries().asScala.foreach{entry =>val ruleName = entry.getKey()val rule = entry.getValue()val stored: ListBuffer[Item] = {if (state.contains(ruleName)) {state.get(ruleName).asScala.to[ListBuffer]} else {new ListBuffer[Item]()}}//if (shape == rule.second && stored.nonEmpty) {stored.foreach { i =>out.collect("MATCH: " + i + " - " + value);}stored.clear();}// there is no else{} to cover if rule.first == rule.secondif (shape.equals(rule.first)) {stored.append(value);}if (stored.isEmpty) {// 规则已经匹配输出 清理状态state.remove(ruleName)} else {// 没输出则更新状态state.put(ruleName, stored.asJava)}}}override def processBroadcastElement(value: Rule, ctx: KeyedBroadcastProcessFunction[Color, Item, Rule, String]#Context, out: Collector[String]): Unit = {ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);}}).print("sink --> ")env.execute("flink-broadcast-state")}
}

http://www.lryc.cn/news/287988.html

相关文章:

  • vueRouter中scrollBehavior实现滚动固定位置
  • 解决WinForms跨线程操作控件的问题
  • 从零开始:Git 上传与使用指南
  • Docker compose部署Golang服务
  • Day36 435无重叠区间 763划分字母区间
  • 【Servlet】如何编写第一个Servlet程序
  • 读懂比特币—bitcoin代码分析(五)
  • uniapp使用uQRCode插件生成二维码的简单使用
  • 【寒假每日一题·2024】AcWing 4965. 三国游戏(补)
  • docker 安装mongodb 数据库
  • 整数反转算法(leetcode第7题)
  • 微信小程序(十)表单组件(入门)
  • xcode 设置 ios苹果图标,为Flutter应用程序配置iOS图标
  • 什么是IDE?新手用哪个IDE比较好?
  • 【数据库学习】pg安装与运维
  • 第二篇【传奇开心果短博文系列】Python的OpenCV库技术点案例示例:图像处理
  • 【vue oidc-client】invalid_requestRequest Id: 0HN0OOPFRLSF2:00000002
  • 什么是中间人攻击? ssh 连接出现 Host key verification failed 解决方法
  • 数据结构系统刷题
  • 【RabbitMQ】延迟队列之死信交换机
  • 2024交通运输工程与土木建筑工程国际会议(ICTECCE2024)
  • 搜索引擎Elasticsearch了解
  • 【操作系统基础】【CPU访存原理】:寄存 缓存 内存 外存、内存空间分区、虚拟地址转换、虚拟地址的映射
  • windows下git pull超时,ping不通github
  • springboot快速写接口
  • 数据结构排序算详解(动态图+代码描述)
  • 2024-01-25 力扣高频SQL50题目1174. 即时食物配送
  • java web 校园健康管理系统Myeclipse开发mysql数据库web结构java编程计算机网页项目
  • 回归预测 | Matlab基于SSA-SVR麻雀算法优化支持向量机的数据多输入单输出回归预测
  • Java转成m3u8,hls格式