当前位置: 首页 > news >正文

Flink多流处理之connect拼接流

Flink中的拼接流connect的使用其实非常简单,就是leftStream.connect(rightStream)的方式,但是有一点我们需要清楚,使用connect后并不是将两个流给串联起来了,而是将左流和右流建立一个联系,作为一个大的流,并且这个大的流可以使用相同的逻辑处理leftStreamrightStream,也可以使用不同的逻辑处理leftStreamrightStream.
如下图:
在这里插入图片描述

下面的演示代码也可以通过这个图结合来看,其实connect算子最主要的作用就是共享状态,如常用的广播状态.

  • 代码
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;import java.util.Arrays;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/7* @Description: 多流操作-流连接**/
public class FlinkConnect {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(3);// 添加数据源1DataStreamSource<String> sourceStream1 = env.fromCollection(Arrays.asList("a", "b", "c", "d"));// 添加数据源2DataStreamSource<Double> sourceStream2 = env.fromCollection(Arrays.asList(22.2, 11.0, 6.0, 98.0, 100.0));// 拼接数据流ConnectedStreams<String, Double> connectedStream = sourceStream1.connect(sourceStream2);// 这里使用map算子作为演示SingleOutputStreamOperator<String> resultStream = connectedStream.map(new CoMapFunction<String, Double, String>() {/*** map1作为左流**/@Overridepublic String map1(String value) throws Exception {return "字符串: " + value;}/*** map2作为右流**/@Overridepublic String map2(Double value) throws Exception {return "数字: " + (value * 100);}});// 打印结果resultStream.print();env.execute("Connect Operator");}
}
  • 结果
3> 字符串: b
1> 数字: 600.0
2> 字符串: a
3> 数字: 1100.0
2> 数字: 2220.0
2> 字符串: d
2> 数字: 9800.0
3> 数字: 10000.0
1> 字符串: c
http://www.lryc.cn/news/115080.html

相关文章:

  • 对任意类型数都可以排序的函数:qsort函数
  • vue数据更新table内容不更新解决方法
  • 合宙Air724UG LuatOS-Air script lib API--record
  • 基于Vgg16和Vgg19深度学习网络的步态识别系统matlab仿真
  • Java分布式微服务3——Docker
  • js字符串替换
  • 网络防御(2)
  • [RCTF2019]DontEatMe
  • 6. CSS(三)
  • 计算机网络—HTTP
  • Tomcat线程池原理
  • 踩坑 视觉SLAM 十四讲第二版 ch13 编译及运行问题
  • 【设计模式】-装饰器模式
  • 七月学习总结
  • Camunda 7.x 系列【6】Spring Boot 集成 Camunda 7.19
  • Kubernetes —调度器配置
  • 【微信小程序】申请蓝牙、位置和数据库等相关权限
  • ORB-SLAM2学习笔记6之D435i双目IR相机运行ROS版ORB-SLAM2并发布位姿pose的rostopic
  • 【数据结构与算法——TypeScript】哈希表
  • JavaScript 中常用简写语法技巧总结
  • 漫画算法做题笔记
  • JDBC学习笔记
  • 亚信科技AntDB数据库与库瀚存储方案完成兼容性互认证,联合方案带来约20%性能提升
  • 【MySQL】基础知识(一)
  • Ansible专栏目录
  • 【locust】使用locust + boomer实现对接口的压测
  • 亿欧智库:2023中国宠物行业新趋势洞察报告(附下载)
  • 时序数据库 TDengine 与 WhaleStudio 完成相互兼容性测试认证
  • Spring-1-深入理解Spring XML中的依赖注入(DI):简化Java应用程序开发
  • 负载均衡–HAProxy安装及搭建tidb数据库负载服务