
【Big Data Learning | Spark-Core】How join Works in Spark

A join connects two result sets, which requires matching records by key.

Let's demonstrate whether join produces a shuffle in each of the following cases.

1. If neither RDD has a partitioner and the partition counts are the same, a shuffle still occurs, but the number of partitions stays the same.

scala> val arr = Array(("zhangsan",300),("lisi",400),("wangwu",350),("zhaosi",450))
arr: Array[(String, Int)] = Array((zhangsan,300), (lisi,400), (wangwu,350), (zhaosi,450))

scala> val arr1 = Array(("zhangsan",22),("lisi",24),("wangwu",30),("guangkun",5))
arr1: Array[(String, Int)] = Array((zhangsan,22), (lisi,24), (wangwu,30), (guangkun,5))

scala> sc.makeRDD(arr,3)
res116: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[108] at makeRDD at <console>:27

scala> sc.makeRDD(arr1,3)
res117: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[109] at makeRDD at <console>:27

scala> res116 join res117
res118: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[112] at join at <console>:28

scala> res118.collect
res119: Array[(String, (Int, Int))] = Array((zhangsan,(300,22)), (wangwu,(350,30)), (lisi,(400,24)))
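To confirm where the shuffle sits, you can inspect the lineage. A minimal sketch reusing res116 and res117 from the transcript above (this assumes the same spark-shell session; toDebugString marks each shuffle boundary with an indentation break):

scala> res116.partitioner
// expected: None -- neither input carries a partitioner, so join has to hash-redistribute both sides

scala> (res116 join res117).toDebugString
// the printed lineage should show the join's MapPartitionsRDD over a CoGroupedRDD,
// with both ParallelCollectionRDD inputs sitting behind a shuffle boundary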

2. If the partition counts differ, there is a shuffle, and the resulting RDD takes the larger of the two partition counts.
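The post gives no transcript for this case; here is a minimal sketch of how to check it, reusing arr and arr1 and assuming spark.default.parallelism is not explicitly set (when it is unset, join falls back to the largest upstream partition count):

scala> val small = sc.makeRDD(arr, 3)   // 3 partitions, no partitioner
scala> val big = sc.makeRDD(arr1, 4)    // 4 partitions, no partitioner
scala> (small join big).partitions.size
// expected: 4 -- the join shuffles both sides into the larger partition count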

3. If the partition counts are the same and the partitioners are the same, there is no shuffle.

scala> sc.makeRDD(arr,3)
res128: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[118] at makeRDD at <console>:27

scala> sc.makeRDD(arr1,3)
res129: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[119] at makeRDD at <console>:27

scala> res128.reduceByKey(_+_)
res130: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[120] at reduceByKey at <console>:26

scala> res129.reduceByKey(_+_)
res131: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[121] at reduceByKey at <console>:26

scala> res130 join res131
res132: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[124] at join at <console>:28

scala> res132.collect
res133: Array[(String, (Int, Int))] = Array((zhangsan,(300,22)), (wangwu,(350,30)), (lisi,(400,24)))

scala> res132.partitions.size
res134: Int = 3
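The reason no shuffle is needed: reduceByKey installed a HashPartitioner with 3 partitions on both sides, and HashPartitioner compares equal when the partition counts match, so join can consume both inputs through narrow dependencies. A minimal check reusing res130 and res131 from the transcript above:

scala> res130.partitioner
// expected: Some(...) holding the 3-partition HashPartitioner installed by reduceByKey

scala> res130.partitioner == res131.partitioner
// expected: true -- identical partitioning, so join reuses the existing layout instead of shuffling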

4. If both RDDs have a partitioner but the partition counts differ, a shuffle still occurs.

scala> val arr = Array(("zhangsan",300),("lisi",400),("wangwu",350),("zhaosi",450))
arr: Array[(String, Int)] = Array((zhangsan,300), (lisi,400), (wangwu,350), (zhaosi,450))

scala> val arr1 = Array(("zhangsan",22),("lisi",24),("wangwu",30),("guangkun",5))
arr1: Array[(String, Int)] = Array((zhangsan,22), (lisi,24), (wangwu,30), (guangkun,5))

scala> sc.makeRDD(arr,3)
res0: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at <console>:27

scala> sc.makeRDD(arr1,4)
res1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[1] at makeRDD at <console>:27

scala> res0.reduceByKey(_+_)
res2: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at reduceByKey at <console>:26

scala> res1.reduceByKey(_+_)
res3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[3] at reduceByKey at <console>:26

scala> res2 join res3
res4: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[6] at join at <console>:28

scala> res4.collect
res5: Array[(String, (Int, Int))] = Array((zhangsan,(300,22)), (wangwu,(350,30)), (lisi,(400,24)))

scala> res4.partitions.size
res6: Int = 4

Why, in the Spark UI DAG, are the reduceByKey and the join pipelined together inside stage 3? Because the RDD with more partitions does not need to be shuffled: each record is already in the partition where it belongs. It is the RDD with fewer partitions that has to be scattered again to take part in the join.

The resulting partition count follows the larger side.
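Internally, join picks its output partitioner through Partitioner.defaultPartitioner. A minimal probe reusing res2 and res3 from the transcript above (in recent Spark versions, the existing partitioner with the most partitions wins as long as its count is of the same order of magnitude as the inputs'):

scala> import org.apache.spark.Partitioner
scala> Partitioner.defaultPartitioner(res2, res3).numPartitions
// expected: 4 -- res3's 4-partition HashPartitioner wins, so res3 stays in place and only res2 is reshuffled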

5. If one RDD has a partitioner and the other does not, the result takes the partition count of the RDD that has the partitioner, and a shuffle still occurs.

scala> arr
res7: Array[(String, Int)] = Array((zhangsan,300), (lisi,400), (wangwu,350), (zhaosi,450))

scala> arr1
res8: Array[(String, Int)] = Array((zhangsan,22), (lisi,24), (wangwu,30), (guangkun,5))

scala> sc.makeRDD(arr,3)
res9: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at makeRDD at <console>:27

scala> sc.makeRDD(arr,4)
res10: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[8] at makeRDD at <console>:27

scala> res9.reduceByKey(_+_)
res11: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:26

scala> res10 join res11
res12: org.apache.spark.rdd.RDD[(String, (Int, Int))] = MapPartitionsRDD[12] at join at <console>:28

scala> res12.partitions.size
res13: Int = 3

scala> res12.collect
res14: Array[(String, (Int, Int))] = Array((zhangsan,(300,300)), (wangwu,(350,350)), (lisi,(400,400)), (zhaosi,(450,450)))

(Note that the second RDD here was built from arr rather than arr1, which is why every joined value is a duplicated pair and zhaosi survives the join; the partitioning behavior it demonstrates is unchanged.) By the same reasoning, the reduceByKey and the join are pipelined together inside stage 6: the RDD that already has a partitioner needs no shuffle and its data stays where it is, while the RDD without a partitioner must be scattered for the join. That shuffle is the edge from stage 4 to stage 6 in the DAG.
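The same case can be forced with an explicit partitioner on just one side; a minimal sketch (partitionBy installs a HashPartitioner; the variable names are illustrative):

scala> import org.apache.spark.HashPartitioner
scala> val withPart = sc.makeRDD(arr, 3).partitionBy(new HashPartitioner(3))  // carries a partitioner
scala> val withoutPart = sc.makeRDD(arr1, 4)                                  // no partitioner
scala> (withPart join withoutPart).partitions.size
// expected: 3 -- the join adopts the existing 3-partition HashPartitioner, and only withoutPart is shuffled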
