当前位置: 首页 > news >正文

【博学谷学习记录】超强总结,用心分享|Spark的RDD算子分类

概念

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合,它是一种抽象的数据模型,本身并不存储数据,仅仅是一个数据传输的管道,作为使用者,只需要告知RDD,数据从哪里读,中间需要进行什么样的转换逻辑,以及最后需要将结果输出到什么位置即可,RDD启动后,会根据用户设置的规则,完成整个处理操作

分类

所有的RDD算子,共分为2大类

  1. Transformation(转换算子)
    1. 所有的转换算子执行后,都会返回一个新的RDD
    2. 所有转换算子是惰性的,不会立即执行,可以认为只是此时只是定义了RDD的计算规则
    3. 转换算子必须遇到动作算子都会触发执行
    4. 常见转换算子
      1. map, filter, flatMap, mapPartitions, mapPartitionsWithIndex
  2. Action(动作算子)
    1. 动作算子执行后,不会返回一个RDD,要么没有返回值,要么返回其它的
    2. 动作算子都是立即执行,一个动作算子会产生一个Jo任务,运行动作算子所依赖的所有RDD
    3. 常见动作算子
      1. collect, count, first, take, reduce

转换算子

值类型的算子

map算子
  • 格式:rdd.map(fn)
  • 说明:根据传入的函数,对数据进行一对一的转换操作,传入一行,返回一行
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])# 需求: 请对每一个元素进行 +1 返回
rdd_collect = rdd.map(lambda num: num + 1).collect()
print(rdd_collect)结果:
[2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

groupBy算子
  • 格式:rdd.groupBy(fn)
  • 说明:根据传入的函数对数据进行分组操作
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])# 需求: 请将数据分为奇数和偶数二部分
rdd_collect = rdd.groupBy(lambda num: 'o' if num % 2 == 0 else 'j').mapValues(list).collect()
print(rdd_collect)结果:[('j', [1, 3, 5, 7, 9]), ('o', [2, 4, 6, 8, 10])]

filter算子
  • 格式:rdd.filter(fn)
  • 说明:过滤算子, 可以根据函数中指定的过滤条件, 对数据进行过滤操作, 条件返回True表示保留, 返回False表示过滤掉
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])# 需求: 请将 <=3的数据过滤掉
rdd_collect = rdd.filter(lambda num: num > 3).collect()
print(rdd_collect)结果:[4, 5, 6, 7, 8, 9, 10]

flatMap算子
  • 格式:rdd.flatMap(fn)
  • 说明:在map算子的基础上, 在加入一个压扁的操作, 主要适用于一行中包含多个内容的操作, 实现一转多的操作
rdd = sc.parallelize(['张三 李四 王五 赵六','田七 周八 李九'])# 需求: 将其转换为一个个的姓名
rdd_collect = rdd.flatMap(lambda line: line.split()).collect()
print(rdd_collect)结果:['张三', '李四', '王五', '赵六', '田七', '周八', '李九']

双值类型的算子

union算子
  • 格式:rdd1.union(rdd2)
  • 说明:取两组数据的并集
rdd1 = sc.parallelize([3,1,5,7,9])
rdd2 = sc.parallelize([5,8,2,4,0])# 需求: 取两组数据的并集
rdd1.union(rdd2).collect()结果:[3, 1, 5, 7, 9, 5, 8, 2, 4, 0]# 去重操作: 
rdd1.union(rdd2).distinct().collect()结果:[8, 4, 0, 1, 5, 9, 2, 3, 7]

intersection算子
  • 格式:rdd1.intersection(rdd2)
  • 说明:取两组数据的交集
rdd1.intersection(rdd2).collect()结果:[5]

KV类型的算子

groupByKey算子:
  • 格式: groupByKey()
  • 说明: 根据key进行分组操作
rdd = sc.parallelize([('c01','张三'),('c02','李四'),('c02','王五'),('c03','赵六'),('c02','田七'),('c02','周八'),('c03','李九')])# 需求: 根据班级分组统计
rdd_collect = rdd.groupByKey().mapValues(list).collect()
print(rdd_collect)结果:[('c01', ['张三']), ('c02', ['李四', '王五', '田七', '周八']), ('c03', ['赵六', '李九'])]

reduceByKey()
  • 格式:  reduceByKey(fn)
  • 说明: 根据key进行分组, 将一个组内的value数据放置到一个列表中, 对这个列表基于 传入函数进行聚合计算操作
rdd = sc.parallelize([('c01','张三'),('c02','李四'),('c02','王五'),('c03','赵六'),('c02','田七'),('c02','周八'),('c03','李九')])# 需求: 统计每个班级有多少个人
rdd_collect = rdd.map(lambda kv: (kv[0],1)).reduceByKey(lambda agg, curr: agg + curr).collect()
print(rdd_collect)结果:[('c01', 1), ('c02', 4), ('c03', 2)]# 如果不转为1:
rdd.reduceByKey(lambda agg,curr: agg + curr).collect()    
结果: [('c01', '张三'), ('c02', '李四王五田七周八'), ('c03', '赵六李九')]

sortByKey()算子
  • 格式: sortByKey(ascending = True|False)
  • 说明: 根据key进行排序操作, 默认按照key进行升序排序, 如果需要倒序, 设置 ascending  为False
rdd = sc.parallelize([('c03','张三'),('c05','李四'),('c011','王五'),('c09','赵六'),('c02','田七'),('c07','周八'),('c06','李九')])# 根据班级序号排序
rdd.sortByKey().collect()结果: 字典序 如果key是字符串[('c011', '王五'), ('c02', '田七'), ('c03', '张三'), ('c05', '李四'), ('c06', '李九'), ('c07', '周八'), ('c09', '赵六')]

动作算子

collect() 算子

  • 格式: collect()
  • 作用: 收集各个分区的数据, 将数据汇总到一个大的列表返回

reduce() 算子

  • 格式: reduce(fn)
  • 作用: 根据传入的函数对数据进行聚合操作
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])# 求第1数累加到最后一个数的和
rdd.reduce(lambda agg,curr: agg + curr)结果:55

first()算子

  • 格式: first()
  • 说明: 获取第一个元素
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])# 获取数据集中的第一个元素
rdd.first()结果:1

take() 算子

  • 格式: take(N)
  • 说明: 获取前N个元素, 类似于limit操作
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])# 获取数据集中的前5个元素
rdd.take(5)结果
[1, 2, 3, 4, 5]

top() 算子

  • 格式: top(N, [fn])
  • 说明: 对数据集进行倒序排序操作, 如果是kv类型, 默认是针对key进行排序, 获取前N个元素
  • fn: 可以自定义排序, 根据谁来排序
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])rdd.top(3)
结果:
[10, 9, 8]rdd = sc.parallelize([('c03','张三'),('c05','李四'),('c011','王五'),('c09','赵六'),('c02','田七'),('c07','周八'),('c06','李九')])rdd.top(3)
结果:
[('c09', '赵六'), ('c07', '周八'), ('c06', '李九')]rdd = sc.parallelize([('c03',5),('c05',9),('c011',2),('c09',6),('c02',80),('c07',12),('c06',10)])rdd.top(3,lambda kv: kv[1])
结果:
[('c02', 80), ('c07', 12), ('c06', 10)]

count()算子

  • 格式: count()
  • 说明: 统计多少个
rdd = sc.parallelize([('c03',5),('c05',9),('c011',2),('c09',6),('c02',80),('c07',12),('c06',10)])rdd.count()
结果:7

foreach()算子

  • 格式: foreach(fn)
  • 说明: 对数据集进行遍历操作, 遍历后做什么, 取决于传入的函数
rdd = sc.parallelize([('c03',5),('c05',9),('c011',2),('c09',6),('c02',80),('c07',12),('c06',10)])rdd.foreach(lambda kv: print(kv))
结果:('c03', 5)('c05', 9)('c011', 2)('c09', 6)('c02', 80)('c07', 12)('c06', 10)

takeSample()算子

  • 格式: takeSample(True|False, N,seed(种子值))
    • 参数1: 是否允许重复采样
    • 参数2: 采样多少个, 如果允许重复采样, 采样个数不限制, 否则最多等于本身数量个数
    • 参数3: 设置种子值, 值可以随便写, 一旦写死了, 表示每次采样的内容也是固定的(可选的) 如果没有特殊需要, 一般不设置
  • 作用: 数据抽样
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])rdd.takeSample(True,5)
[9, 9, 4, 8, 9]
rdd.takeSample(True,5)
[3, 8, 1, 3, 9]
rdd.takeSample(False,5)
[6, 1, 8, 7, 3]
rdd.takeSample(False,5)
[5, 7, 6, 3, 8]
rdd.takeSample(False,20)
[2, 10, 7, 5, 8, 9, 3, 4, 6, 1]
rdd.takeSample(False,5) 
[8, 3, 10, 7, 9]rdd.takeSample(False,5,2)  
[6, 10, 4, 5, 7]
rdd.takeSample(False,5,2)
[6, 10, 4, 5, 7]
rdd.takeSample(False,5,2)
[6, 10, 4, 5, 7]
rdd.takeSample(False,3,2)
[6, 10, 4]
http://www.lryc.cn/news/985.html

相关文章:

  • 云原生系列之使用 prometheus监控远程主机实战
  • 2023年地方两会政府工作报告汇总(各省市23年重点工作)
  • 第一章 企业管理概论
  • 独立图片服务器有什么突出之处
  • Linux驱动开发基础__mmap
  • 若依框架---为什么把添加和更新分成两个接口
  • 图论算法:Floyd算法
  • 回顾 | .NET MAUI 跨平台应用开发 - 用 .NET MAUI 开发一个无人机应用(下)
  • 部署有多个仓库的svn服务
  • Mapper文件注入问题
  • 基于微信小程序的国产动漫论坛小程序
  • 常用限流算法
  • 前端面经详解
  • 网页CAD开发快速入门
  • C#开发的OpenRA的mod.yaml文件
  • 【ESP32+freeRTOS学习笔记-(七)中断管理】
  • 【总结】1591- 从入门到精通:使用 TypeScript 开发超强的 CLI 工具
  • 【Java】int和Integer的区别?为什么有包装类?
  • 【LeetCode】石子游戏 IV [H](动态规划)
  • 修改Vue项目运行的IP和端口
  • 【C++提高编程】map/ multimap 容器详解(附测试用例与结果图)
  • laravel操作redis和缓存操作
  • 目标检测论文阅读:GaFPN算法笔记
  • 【转】Generative Pretrained Transformer
  • day34|343. 整数拆分、96.不同的二叉搜索树
  • WeNet - 初识
  • 为什么各个企业都在创建FAQ、常见问题页面?
  • 【React-Router】路由传参,路由嵌套,手动导航,路由文件配置
  • 面向对象分析与设计(OOAD)
  • 数据库调优