当前位置: 首页 > news >正文

Flink之Partitioner(分区规则)

Flink之Partitioner(分区规则)

方法注释
global()全部发往1个task
broadcast()广播(前面的文章讲解过,这里不做阐述)
forward()上下游并行度一致时一对一发送,和同一个算子连中算子的OneToOne是一回事
shuffle()随机分配(只是随机,同Spark的shuffle不同)
rebalance()轮询分配,默认机制就是rebalance()
recale()一般是下游task是上游task的并行度的倍数时,在生成job时,会将下游中的某几个subtask和上游的某个subtask绑成一组,然后在组内上游subtask以轮询的方式将数据发送给下游的subtask.
partitionCustom自定义分区器(这里不做演示,后续会单独写一个自定义分区器的内容)
keyBy()根据数据key的HashCode进行Hash分配
  • global

    global在实际业务场景中使用的不是很多,一般都是需要全局数据汇总的时候才会用到.global就是将上游的数据全部发往下游的第一个subtask中,也就是说下游设置再多的并行度是没意义的,所以使用global的时候,下游的task的并行度都是1.
    在这里插入图片描述
    这里结合代码看一下:

    public class FlinkPartitioner {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();conf.setInteger("rest.port", 8081);// 开启本地WebUI,构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);// 添加数据源,SocketDataStreamSource<String> sourceStream = env.socketTextStream("localhost", 9999);// 转大写,设置并行度为3,且设置数据分区方式为globalDataStream<String> upperCaseMapStream = sourceStream.map(s -> s.toUpperCase()).setParallelism(3).global();// 切分字符串,设置并行度为1SingleOutputStreamOperator<String> splitFlatMapStream = upperCaseMapStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] split = value.split(",");for (String s : split) {out.collect(s);}}}).setParallelism(1);//......env.execute("Flink Partitioner");}
    }
    

    WebUI界面查看代码中upperCaseMapStreamsplitFlatMapStream之间数据的发送方式
    在这里插入图片描述

  • forward

    forward其实就是一对一发送数据,和之前讲解Task的文章中提到的算子之间OneToOne的模式是一样的,就是可以将forward理解为同一个task chain[算子链]中算子之间的数据传输方式,但是使用forward的前提是上下游的算子并行度是一致的也就是上下游的subtask数量保持一致,图解如下:
    在这里插入图片描述

    代码内容如下:

    public class FlinkPartitioner {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();conf.setInteger("rest.port", 8081);// 开启本地WebUI,构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);// 添加数据源,SocketDataStreamSource<String> sourceStream = env.socketTextStream("localhost", 9999);// 转大写,设置为forward分区方式DataStream<String> upperCaseMapStream = sourceStream.map(s -> s.toUpperCase()).setParallelism(3).forward();// 切分字符串SingleOutputStreamOperator<String> splitFlatMapStream = upperCaseMapStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] split = value.split(",");for (String s : split) {out.collect(s);}}}).setParallelism(3).startNewChain(); // 这里加上.startNewChain是为了在WebUI能看到效果,因为upperCaseMapStream和splitFlatMapStream的并行度是一致的,不加startNewChain默认的机制会将两者划分到同一个算子链中,就看不到实际的效果了.// ...env.execute("Flink Partitioner");}
    }
    

    WebUI界面查看upperCaseMapStreamsplitFlatMapStream的数据发送方式,如下:
    在这里插入图片描述

  • shuffle

    通过前面WebUI的图片我们可以看到,从Socket数据源将数据发送到第一个map的时候使用的是默认的rebalance方式,也就是轮询发送的方式,而这里说的shuffle虽然也是一对多的发送方式,但是发送数据时是随机的,举个例子,上游有3subtask,下游有5subtask,数据源有5条数据,上游中的某一个subtask向下游发送数据时,是随机发送的,下游的5subtask并不是每个都一定能接受到数据,可能有的接收到1条,有的接收到2条,有的接收到3条数据,这就是shuffle发送数据的方式.

    如果说上两个operator并行度一致,上游选择了shuffle发送数据的方式,那么两个operator会绑定成一个task chain么?不会,因为shuffle的数据发送方式就已经导致两个operator不是OneToOne的模式了.
    在这里插入图片描述
    代码示例:

    public class FlinkPartitioner {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();conf.setInteger("rest.port", 8081);// 开启本地WebUI,构建流环境StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);// 添加数据源,SocketDataStreamSource<String> sourceStream = env.socketTextStream("localhost", 9999);// 转大写,设置为shuffle分区方式DataStream<String> upperCaseMapStream = sourceStream.map(s -> s.toUpperCase()).setParallelism(3).shuffle();// 切分字符串SingleOutputStreamOperator<String> splitFlatMapStream = upperCaseMapStream.flatMap(new FlatMapFunction<String, String>() {@Overridepublic void flatMap(String value, Collector<String> out) throws Exception {String[] split = value.split(",");for (String s : split) {out.collect(s);}}}).setParallelism(7)// ...env.execute("Flink Partitioner");}
    }
    

    WebUI界面查看upperCaseMapStreamsplitFlatMapStream的数据发送方式,如下:
    在这里插入图片描述

  • Rebalance

    rebalance就是Flink默认的数据分发机制,直白的讲就是给每个小朋友一人一块糖果,直到发完为止,不偏不倚,这个不细说了,没什么可讲的.
    在这里插入图片描述

  • recale

    关于recale前面说到了是组内的方式进行轮询分发数据,这里就以图解的方式进行讲解,便于理解.

    Flink任务启动时,如果发现上下游中使用了recale分发数据的方式就会将上下游的subtask进行分组绑定,如上游有2个subtask,下游有四个subtask,就会将上游的一个subtask和下游的两个subtask进行绑定,如下图:
    在这里插入图片描述

    当上下游对应的subtask分组后,上下游组内的subtak就会以组内轮询的方式发送数据,如下图:
    在这里插入图片描述

  • keyBy

    keyBy使用的HASH分区方式,实际是hashCode() + murmurHash()的组合方式,这个在源码的KeyGroupRangeAssignment类中是可以看到的,简单来说根据keyhash值模除以下游的最大并行度(return MathUtils.murmurHash(keyHash) % maxParallelism;).

    关于keyBy的使用应该都很熟悉了,这里直接给大家看演示结果吧,如下图:
    在这里插入图片描述

以上就是对Flink中分区规则的讲解.

http://www.lryc.cn/news/126559.html

相关文章:

  • tk切换到mac的code分享
  • spark的standalone 分布式搭建
  • 浅析基于视频汇聚与AI智能分析的新零售方案设计
  • SpringMVC之异常处理
  • 保险龙头科技进化论:太保的六年
  • 升级STM32电机PID速度闭环编程:从F1到F4的移植技巧与实例解析
  • GaussDB 实验篇+openGauss的4种1级分区案例
  • Ruby软件外包开发语言特点
  • 《系统架构设计师教程》重点章节思维导图
  • mac录屏工具,录屏没有声音的解决办法
  • 神经网络基础-神经网络补充概念-33-偏差与方差
  • 单片机第一季:零基础13——AD和DA转换
  • 小区外卖跑腿,解决最后100米配送难题
  • ZooKeeper的应用场景(命名服务、分布式协调通知)
  • 网络套接字
  • 对话 4EVERLAND:Web3 是云计算的新基建吗?
  • iOS申请证书(.p12)和描述文件(.mobileprovision)
  • Java:PO、VO、BO、DO、DAO、DTO、POJO
  • c语言每日一练(8)
  • 周期 角频率 频率 振幅 初相角
  • 根据一棵树的两种遍历构造二叉树
  • stack 、 queue的语法使用及底层实现以及deque的介绍【C++】
  • 没学C++,如何从C语言丝滑过度到python【python基础万字详解】
  • haproxy负载均衡
  • 【数据结构】顺序队列模拟实现
  • TiDB数据库从入门到精通系列之六:使用 TiCDC 将 TiDB 的数据同步到 Apache Kafka
  • Spring对象装配
  • bigemap如何添加mapbox地图?
  • python爬虫6:lxml库
  • Linux查找命令