当前位置: 首页 > news >正文

Flink多流处理之join(关联)

Flink的API中只提供了join的算子,并没有left join或者right join,这里我们就介绍一下join算子的使用,其实join算子底层调用的就是coGroup,具体原理这里就不过多介绍了,如果感兴趣可以看我前面发布的文章Flink多流操作之coGroup.

  • 数据源
    ➜  ~ nc -lk 1111
    101,A
    102,B
    103,C
    104,D
    105,E
    106,F
    
    ➜  ~ nc -lk 2222
    101,A,,程序员
    102,B,,程序员
    103,C,,会计
    104,D,,安全工程师
    106,K,,程序员
    108,,本科,人事
    
  • 代码
    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;/*** @Author: J* @Version: 1.0* @CreateTime: 2023/8/10* @Description: 多流操作-join**/
    public class FlinkJoin {public static void main(String[] args) throws Exception {// 构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置并行度env.setParallelism(3);// 数据源1,以socket作为数据源DataStreamSource<String> socketStream1 = env.socketTextStream("localhost", 1111);SingleOutputStreamOperator<String[]> mapStream1 = socketStream1.map(str -> str.split(",")).returns(new TypeHint<String[]>() {});// 数据源2,以socket作为数据源DataStreamSource<String> socketStream2 = env.socketTextStream("localhost", 2222);SingleOutputStreamOperator<String[]> mapStream2 = socketStream2.map(str -> str.split(",")).returns(new TypeHint<String[]>() {});// 关联数据流DataStream<String> joinedStream = mapStream1.join(mapStream2).where(arr -> arr[0]) // mapStream1以数组中的第一个字段作为关联字段.equalTo(arr -> arr[0]) // mapStream2以数组中的第一个字段作为关联字段.window(TumblingProcessingTimeWindows.of(Time.seconds(20))) // 以20秒作为一个窗口.apply(new JoinFunction<String[], String[], String>() {// 这里是写关联后的具体逻辑@Overridepublic String join(String[] first, String[] second) throws Exception {String result = first[0] + "," + second[1] + "," + second[2] + "," + second[3];return result;}});// 打印结果数据joinedStream.print();env.execute("Flink join");}
    }
    
  • 结果
    3> 103,C,男,会计
    2> 106,K,男,程序员
    2> 101,A,男,程序员
    3> 104,D,男,安全工程师
    3> 102,B,男,程序员
    
    这个API使用起来还是比较简单的,如果想实现left join或者right join的功能就需要通过coGroup来实现了.
http://www.lryc.cn/news/127198.html

相关文章:

  • LeetCode Top100 Liked 题单(序号34~51)
  • 视觉slam十四讲---第一弹三维空间刚体运动
  • 手把手教你配置Jenkins自动化邮件通知
  • Arcgis连续数据的分类(求不同值域的面积)
  • C++ 函数
  • 关于如何创建一个windows窗口的exe文件
  • re学习(33)攻防世界-secret-galaxy-300(动态调试)
  • springboot工程集成前端编译包,用于uni-app webView工程,解决其需独立部署带来的麻烦,场景如页面->画布->图片->pdf
  • NeuralNLP-NeuralClassifier的使用记录(二),训练预测自己的【中文文本多分类】
  • express学习笔记8 - 文件上传 下载以及预览
  • Python系统学习1-9-类(一)
  • 什么是公网、私网、内网、外网?
  • 一篇文章教会你搭建私人kindle图书馆,并内网穿透实现公网访问
  • 好用的安卓手机投屏到mac分享
  • df -h
  • 彻底卸载Android Studio
  • QT 5.12配置OpenCV3.4.10
  • Qt应用开发(基础篇)——选项卡窗口 QTabWidget
  • Socks5代理在多线程爬虫中的应用
  • 机器学习笔记:主动学习(Active Learning)初探
  • linux github 仓库管理常用操作
  • IT运维:使用数据分析平台监控深信服防火墙
  • 深入解析 Axios Blob 的使用方法及技巧
  • 爬虫逆向实战(十三)--某课网登录
  • 4.SpringCloud
  • OLED透明屏采购指南:如何选择高质量产品?
  • 机器学习编译系列
  • MySQL 数据库巡检系统的设计与应用
  • 工程项目管理系统源码+功能清单+项目模块+spring cloud +spring boot em
  • 前端笔试+面试分享