当前位置: 首页 > news >正文

详解 Spark 核心编程之 RDD 分区器

一、RDD 分区器简介

  • Spark 分区器的父类是 Partitioner 抽象类
  • 分区器直接决定了 RDD 中分区的个数、RDD 中每条数据经过 Shuffle 后进入哪个分区,进而决定了 Reduce 的个数
  • 只有 Key-Value 类型的 RDD 才有分区器,非 Key-Value 类型的 RDD 分区的值是 None
  • 每个 RDD 的分区索引的范围:0~(numPartitions - 1)

二、HashPartitioner

默认的分区器,对于给定的 key,计算其 hashCode 并除以分区个数取余获得数据所在的分区索引

class HashPartitioner(partitions: Int) extends Partitioner {require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")def numPartitions: Int = partitionsdef getPartition(key: Any): Int = key match {case null => 0case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)}override def equals(other: Any): Boolean = other match {case h: HashPartitioner => h.numPartitions == numPartitionscase _ => false}override def hashCode: Int = numPartitions
}

三、RangePartitioner

将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序

class RangePartitioner[K: Ordering: ClassTag, V](partitions: Int, rdd: RDD[_ <: Product2[K, V]], private var ascending: Boolean = true) extends Partitioner {// We allow partitions = 0, which happens when sorting an empty RDD under the default settings.require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")private var ordering = implicitly[Ordering[K]]// An array of upper bounds for the first (partitions - 1) partitionsprivate var rangeBounds: Array[K] = {...}def numPartitions: Int = rangeBounds.length + 1private var binarySearch: ((Array[K], K) => Int) =  CollectionsUtils.makeBinarySearch[K]def getPartition(key: Any): Int = {val k = key.asInstanceOf[K]var partition = 0if (rangeBounds.length <= 128) {// If we have less than 128 partitions naive searchwhile(partition < rangeBounds.length && ordering.gt(k, rangeBounds(partition))) {partition += 1}} else {// Determine which binary search method to use only once.partition = binarySearch(rangeBounds, k)// binarySearch either returns the match location or -[insertion point]-1if (partition < 0) {partition = -partition-1}if (partition > rangeBounds.length) {partition = rangeBounds.length}}if (ascending) {partition} else {rangeBounds.length - partition}}override def equals(other: Any): Boolean = other match {...}override def hashCode(): Int = {...}@throws(classOf[IOException])private def writeObject(out: ObjectOutputStream): Unit =  Utils.tryOrIOException {...}@throws(classOf[IOException])private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {...}
}

四、自定义 Partitioner

/**1.继承 Partitioner 抽象类2.重写 numPartitions: Int 和 getPartition(key: Any): Int 方法
*/
object TestRDDPartitioner {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[*]").setAppName("partition")val sc = new SparkContext(conf)val rdd = sc.makeRDD(List(("nba", "xxxxxxxxxxx"),("cba", "xxxxxxxxxxx"),("nba", "xxxxxxxxxxx"),("ncaa", "xxxxxxxxxxx"),("cuba", "xxxxxxxxxxx")))val partRdd = rdd.partitionBy(new MyPartitioner)partRdd.saveAsTextFile("output")}
}class MyPartitioner extends Partitioner {// 重写返回分区数量的方法override def numPartitions: Int = 3// 重写根据数据的key返回数据所在的分区索引的方法override def getPartition(key: Any): Int = {key match {case "nba" => 0case "cba" => 1case _ => 2}}}
http://www.lryc.cn/news/360269.html

相关文章:

  • Selenium番外篇文本查找、元素高亮、截图、无头运行
  • Java 22的FFM API,比起Java 21的虚拟线程
  • 用c语言实现简易三子棋
  • 2024年华为OD机试真题-执行时长-Python-OD统一考试(C卷D卷)
  • 对未知程序所创建的 PDF 文档的折叠书签层级全展开导致丢签的一种解决方法
  • 计算机系统结构之FORK和JOIN
  • Yocto - virtual/kernel介绍
  • 如何在 DigitalOcean 云服务器上创建自定义品牌名称服务器
  • 心链6----开发主页以及后端数据插入(多线程并发)定时任务
  • 【Linux】日志管理
  • AI 绘画爆火背后:扩散模型原理及实现
  • 详解智慧互联网医院系统源码:开发医院小程序教学
  • 【技术实操】银河高级服务器操作系统实例分享,数据库日志文件属主不对问题分析
  • 函数的创建和调用
  • 数模混合芯片设计中的修调技术是什么?
  • MySQL 自定义函数(实验报告)
  • 一次职业院校漏洞挖掘
  • 洪师傅代驾系统开发 支持公众号H5小程序APP 后端Java源码
  • View->Bitmap缩放到自定义ViewGroup的任意区域(Matrix方式绘制Bitmap)
  • Centos 7部署NTP
  • 【前缀和】42. 接雨水
  • 我的名字叫大数据
  • 数据库漫谈-infomix
  • 【Qt】Qt界面美化指南:深入理解QSS样式表的应用与实践
  • 七彩云南文化旅游网站的设计
  • 7-zip安装教程
  • oracle 12c DB卸载流程
  • Docker学习笔记 - 创建自己的image
  • java web爬虫
  • MySQL开发教程和具体应用案例