当前位置: 首页 > news >正文

Flink 侧输出流(SideOutput)

🌸在平时大部分的 DataStream API 的算子的输出是单一输出,也就是某一种或者说某一类数据流,流向相同的地方。

🌸在处理不同的流中,除了 split 算子,可以将一条流分成多条流,这些流的数据类型也都相同。ProcessFunction 的 side outputs 功能可以产生多条流,并且这些流的数据类型可以不一样。一个 side output 可以定义为 OutputTag[X]对象,X 是输出流的数据类型。process function 可以通过 Context 对象发射一个事件到一个或者多个 side outputs。

当使用旁路输出时,首先需要定义一个OutputTag来标识一个旁路输出流

val OutPut=OutputTag[String]("side-output")

注意:OutputTag是如何根据旁路输出流包含的元素类型typed的    

 ✨可以通过以下几种函数发射数据到旁路输出

        ProcessFunction

        CoProcessFunction

        ProcessWindowFunction

        ProcessAllWindowFunction

//将含有特殊字符串的流区分开,数据由两个定义好的工具类向Kafka灌入不同内容的数据,
//然后通过侧输出流(SideOutput)将不同的流进行分离,得到不同的输出import com.alibaba.fastjson.JSON
import com.tech.bean.Person_t
import com.tech.util.KafkaSourceUtil
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collectorobject sideOutputPerson_t {def main(args: Array[String]): Unit = {// UI地址访问:http://localhost:8081/#/job/runningval env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration())val ksu = new KafkaSourceUtil("person_t", "test-consumer-group")val dstream = env.addSource(ksu.getSouceInfo())// 首先需要定义一个OutputTag来标识一个旁路输出流val outputTag = new OutputTag[String]("person_t_side-output")val mainDataStream = dstream.map(line => {JSON.parseObject(line, classOf[Person_t])})val sideOutput = mainDataStream.process(new ProcessFunction[Person_t, String] {override def processElement(value: Person_t,ctx: ProcessFunction[Person_t, String]#Context,out: Collector[String]): Unit = {if (!value.getName.contains("_side")) {out.collect(value.toString)} else {// 测输出流输出的部分ctx.output(outputTag, "sideOutput-> 带有_side标识的数据名称" + value.getName)}}})val sideOutputStream: DataStream[String] = sideOutput.getSideOutput(outputTag)// 测输出流处理sideOutputStream.print("测输出流")// 常规数据处理sideOutput.print("常规数据")env.execute("outSideput")}
}

http://www.lryc.cn/news/304512.html

相关文章:

  • C语言中关于#include的一些小知识
  • DSP芯片 机器码下载方法 【主要 “扯” 用Uniflash下载的方法】
  • 速盾网络:CDN用几天关了可以吗?安全吗?
  • MR混合现实情景实训教学系统在高空作业课堂中的应用
  • Windows系统中定时执行python脚本
  • HashMap 源码学习-jdk1.8
  • WebStorm 2023:让您更接近理想的开发环境 mac/win版
  • java面试题:数字与字母的映射表
  • Jmeter教程-JMeter 环境安装及配置
  • 十大基础排序算法
  • IP协议及相关技术协议
  • 小红书x-s算法及补环境 单旋转验证码
  • 代码检测规范和git提交规范
  • Elasticsearch:什么是搜索引擎?
  • 人工智能几个关键节点:深蓝,AlphaGo,ChatGPT,Sora
  • WordPres Bricks Builder 前台RCE漏洞复现(CVE-2024-25600)
  • 代码随想录算法训练营总结 | 慢慢总结,想起啥就先写上
  • 基于开源模型对文本和音频进行情感分析
  • SQL中为什么不要使用1=1
  • python 几种常见的音频数据读取、保存方式
  • 关于msvcr120.dll丢失怎样修复的详细解决步骤方法分享,msvcr120.dll文件的相关内容
  • 简单几步通过DD工具把云服务器系统Linux改为windows
  • 使用 package.json 配置代理解决 React 项目中的跨域请求问题
  • 生成 Let‘s Encrypt 免费证书
  • int128的实现(基本完成)
  • 【linux】使用 acme.sh 实现了 acme 协议生成免费的SSL 证书
  • MACOS上面C/C++获取网卡索引,索引获取网卡接口名
  • 解决SSH远程登录开饭板出现密码错误问题
  • 什么时候用ref和reactive
  • Java实战:Spring Boot实现邮件发送服务