当前位置: 首页 > news >正文

Flink Flink中的合流

一、Flink中的基本合流操作

在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以 Flink 中合流的操作会更加普遍,对应的 API 也更加丰富。

二、联合(Union)

最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union)。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。
在这里插入图片描述
在代码中,我们只要基于 DataStream 直接调用.union()方法,传入其他 DataStream 作为参数,就可以实现流的联合了;得到的依然是一个 DataStream:

stream1.union(stream2, stream3, ...)

注意:union()的参数可以是多个 DataStream,所以联合操作可以实现多条流的合并。

代码实现:我们可以用下面的代码做一个简单测试:

package com.flink.DataStream.UnionStream;import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class FlinkUnionStream {public static void main(String[] args) throws Exception {StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();streamExecutionEnvironment.setParallelism(1);SingleOutputStreamOperator<Integer> source1 = streamExecutionEnvironment.socketTextStream("localhost", 1111).map(a -> Integer.parseInt(a));SingleOutputStreamOperator<Integer> source2 = streamExecutionEnvironment.socketTextStream("localhost", 2222).map(a -> Integer.parseInt(a));DataStreamSource<String> source3 = streamExecutionEnvironment.fromElements("3", "4", "5");DataStream<Integer> unionResult = source1.union(source2, source3.map(Integer::valueOf));unionResult.print();streamExecutionEnvironment.execute();}
}

在这里插入图片描述
在这里插入图片描述

三、连接(Connect)

为了处理更加灵活,连接操作允许流的数据类型不同。但我们知道一个DataStream中的数据只能有唯一的类型,所以连接得到的结果并不是DataStream,而是一个“连接流”。连接流可以看成是两条流形式上的“统一”,被放在了一个同一个流中;事实上内部仍保持各自的数据形式不变,彼此之间是相互独立的。要想得到新的DataStream,还需要进一步定义一个“同处理”(co-process)转换操作,用来说明对于不同来源、不同类型的数据,怎样分别进行处理转换、得到统一的输出类型。所以整体上来,两条流的连接就像是“一国两制”,两条流可以保持各自的数据类型、处理方式也可以不同,不过最终还是会统一到同一个DataStream中。
在这里插入图片描述

http://www.lryc.cn/news/246550.html

相关文章:

  • 工业园区重金属废水深度处理工程项目,稳定出水0.1mg/l
  • element table滚动条失效
  • 代码随想录算法训练营 ---第四十六天
  • MySQL-02-InnoDB存储引擎
  • Qt路径和Anaconda中QT路径冲突(ubuntu系统)
  • vue2.js添加水印
  • Eureka简单使用做微服务模块之间动态请求
  • 竞赛选题 题目:基于深度学习卷积神经网络的花卉识别 - 深度学习 机器视觉
  • css-tricks网站图例
  • Scrapy框架内置管道之图片视频和文件(一篇文章齐全)
  • Linux文件与路径
  • 【Qt】获取当前系统用户名:9种获取方式
  • ECMAScript2023你学习了吗?
  • 【从删库到跑路 | MySQL总结篇】数据库基础(增删改查的基本操作)
  • 【JMeter】配置元件
  • 数据采集静态存储SRAM芯片EMI7064
  • 网络运维与网络安全 学习笔记2023.11.27
  • ansible学习
  • 使用Kibana让es集群形象起来
  • 机器学习调参指南:提升模型性能的关键步骤
  • 图书管理系统源码,图书管理系统开发,图书借阅系统源码四TuShuManager应用程序MVC视图View
  • Visual Studio2010保姆式安装教程(VS2010 旗舰版),以及如何运行第一个C语言程序,超详细
  • 第四节HarmonyOS 熟知开发工具DevEco Studio
  • 安防视频监控/视频融合/云存储EasyCVR页面数据显示不全该如何解决?
  • vatee万腾的数字化奇点:Vatee科技的前沿创新之路
  • C#,《小白学程序》第六课:队列(Queue)其二,队列的应用,编写《实时叫号系统》
  • 打造数字人偶像的意义与影响
  • Spring加载Bean的多种方式
  • minio分布式存储系统
  • Kafka 如何保证消息消费的全局顺序性