当前位置: 首页 > news >正文

Spark常见算子汇总

创建RDD

在Spark中创建RDD的方式分为三种:

  1. 从外部存储创建RDD
  2. 从集合中创建RDD
  3. 从其他RDD创建

textfile

调用SparkContext.textFile()方法,从外部存储中读取数据来创建 RDD

parallelize

调用SparkContext 的 parallelize()方法,将一个存在的集合,变成一个RDD

makeRDD

方法一

/** Distribute a local Scala collection to form an RDD.** This method is identical to `parallelize`.*/def makeRDD[T: ClassTag](seq: Seq[T],numSlices: Int = defaultParallelism): RDD[T] = withScope {parallelize(seq, numSlices)}

方法二:分配一个本地Scala集合形成一个RDD,为每个集合对象创建一个最佳分区。

/*** Distribute a local Scala collection to form an RDD, with one or more* location preferences (hostnames of Spark nodes) for each object.* Create a new partition for each collection item.*/def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope {assertNotStopped()val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMapnew ParallelCollectionRDD[T](this, seq.map(_._1), math.max(seq.size, 1), indexToPrefs)}

 举例

scala> val rdd = sc.parallelize(1 to 6, 2)
val rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:1scala> rdd.collect()
val res4: Array[Int] = Array(1, 2, 3, 4, 5, 6)scala> val seq = List(("American Person", List("Tom", "Jim")), ("China Person", List("LiLei", "HanMeiMei")), ("Color Type", List("Red", "Blue")))
val seq: List[(String, List[String])] = List((American Person,List(Tom, Jim)), (China Person,List(LiLei, HanMeiMei)), (Color Type,List(Red, Blue)))scala> val rdd2 = sc.makeRDD(seq)
val rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at makeRDD at <console>:1scala> rdd2.partitions.size
val res0: Int = 3scala> rdd2.foreach(println)
American Person
Color Type
China Personscala> val rdd1 = sc.parallelize(seq)
val rdd1: org.apache.spark.rdd.RDD[(String, List[String])] = ParallelCollectionRDD[1] at parallelize at <console>:1scala> rdd1.partitions.size
val res1: Int = 2scala> rdd2.collect()
val res2: Array[String] = Array(American Person, China Person, Color Type)scala> rdd1.collect()
val res3: Array[(String, List[String])] = Array((American Person,List(Tom, Jim)), (China Person,List(LiLei, HanMeiMei)), (Color Type,List(Red, Blue)))scala> var lines = sc.textFile("/root/tmp/a.txt",3)
var lines: org.apache.spark.rdd.RDD[String] = /root/tmp/a.txt MapPartitionsRDD[4] at textFile at <console>:1scala> lines.collect()
val res6: Array[String] = Array(a,b,c)scala> lines.partitions.size
val res7: Int = 3

转换算子

flatMap

map

reduceByKey

groupByKey

举例

scala> var lines = sc.textFile("/root/tmp/a.txt",3)
var lines: org.apache.spark.rdd.RDD[String] = /root/tmp/a.txt MapPartitionsRDD[13] at textFile at <console>:1scala> lines.flatMap(x=>x.split(",")).map(x=>(x,1)).reduceByKey((a,b)=>a+b).foreach(println)
(c,2)
(b,1)
(d,1)
(a,2)scala> lines.collect()
val res27: Array[String] = Array(a,b,c, c, a,d)scala> lines.map(_.split(",")).collect()
val res25: Array[Array[String]] = Array(Array(a, b, c), Array(c), Array(a, d))scala> lines.flatMap(_.split(",")).collect()
val res26: Array[String] = Array(a, b, c, c, a, d)

行动算子

http://www.lryc.cn/news/254305.html

相关文章:

  • 【华为数据之道学习笔记】3-1 基于数据特性的分类管理框架
  • 电脑版便签软件怎么设置在桌面上显示?
  • 【华为数据之道学习笔记】2-建立企业级数据综合治理体系
  • 【IC前端虚拟项目】git和svn项目托管平台的简单使用说明
  • C++ IO库
  • Springboot 项目关于版本升级到 3.x ,JDK升级到17的相关问题
  • QGraphicsView实现简易地图7『异步加载-多瓦片-无底图』
  • Spring Boot学习(三十三):集成kafka
  • MOSFET
  • DriveWorks——参数化设计非标定制利器
  • DevEco Studio集成ArkUI-X
  • 网络视频服务器的作用是什么?
  • 解决vue3使用iconpark控制台预警提示问题
  • VMware 虚拟机 NAT 模式网络配置
  • 5-redis高级-哨兵
  • 鸿蒙HarmonyOS4.0开发应用学习笔记
  • 联通宽带+老毛子Padavan固件 开启IP v6
  • 唯创知音WT2003Hx系列单片机语音芯片:家庭理疗产品的智能声音伴侣
  • 2023_Spark_实验二十七:Linux中Crontab(定时任务)命令详解及使用教程
  • Java动态代理实现与原理详细分析
  • [实践总结] 使用Apache HttpClient 4.x进行进行一次Http请求
  • 易宝OA 两处任意文件上传漏洞复现
  • echart饼图高亮颜色设置,数据为0时候,labelLine不显示
  • Kafka 的消息格式:了解消息结构与序列化
  • 装箱 Box 数据类型
  • 多传感器融合SLAM在自动驾驶方向的初步探索的记录
  • ffmpeg与opencv-python处理视频
  • java 操作git
  • Linux 导入、导出 MySQL 数据库命令
  • 华为数通---BFD多跳检测示例