当前位置: 首页 > news >正文

【大数据面试知识点】Spark中的累加器

Spark累加器

累加器用来把Executor端变量信息聚合到Driver端,在driver程序中定义的变量,在Executor端的每个task都会得到这个变量的一份新的副本,每个task更新这些副本的值后,传回driver端进行merge。

累加器一般是放在行动算子中进行操作的。

Spark累加器有哪些特点?

1)累加器在全局唯一的,只增不减,记录全局集群的唯一状态

2)在Executor中修改它,在Driver读取

3)executor级别共享的,广播变量是task级别的共享两个application不可以共享累加器,但是同一个app不同的job可以共享

应用举例

不经过Shuffle实现词频统计

object Spark06_Accumulator {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")val sc = new SparkContext(conf)val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("a", 3), ("b", 4)))// 声明累加器val sumAcc: LongAccumulator = sc.longAccumulator("sumAcc")rdd.foreach {case (word, count) => {// 使用累加器sumAcc.add(count)}}// 累加器的toString方法//println(sumAcc)//取出累加器中的值println(sumAcc.value)sc.stop()}
}

不经过shuffle,计算以H开头的单词出现的次数。

object Spark07_MyAccumulator {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")val sc = new SparkContext(conf)val rdd: RDD[String] = sc.makeRDD(List("Hello", "HaHa", "spark", "scala", "Hi", "Hello", "Hi"))// 创建累加器val myAcc = new MyAccumulator//注册累加器sc.register(myAcc, "MyAcc")rdd.foreach{datas => {// 使用累加器myAcc.add(datas)}}// 获取累加器的结果println(myAcc.value)sc.stop()}
}// 自定义累加器
// 泛型分别为输入类型和输出类型
class MyAccumulator extends AccumulatorV2[String, mutable.Map[String, Int]] {// 定义输出数据变量var map: mutable.Map[String, Int] = mutable.Map[String, Int]()// 累加器是否为初始状态override def isZero: Boolean = map.isEmpty// 复制累加器override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {val MyAcc = new MyAccumulator// 将此累加器中的数据赋值给新创建的累加器MyAcc.map = this.mapMyAcc}// 重置累加器override def reset(): Unit = {map.clear()}// 累加器添加元素override def add(v: String): Unit = {if (v.startsWith("H")) {// 判断map集合中是否已经存在此元素map(v) = map.getOrElse(v, 0) + 1}}// 合并累加器中的元素override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {val map1: mutable.Map[String, Int] = this.mapval map2: mutable.Map[String, Int] = other.value// 合并两个mapmap = map1.foldLeft(map2) {(m, kv) => {m(kv._1) = m.getOrElse(kv._1, 0) + kv._2m}}}// 获取累加器中的值override def value: mutable.Map[String, Int] = {map}
}

参考:Spark累加器的作用和使用-CSDN博客

http://www.lryc.cn/news/270924.html

相关文章:

  • 深度学习核心技术与实践之深度学习基础篇
  • Kafka安装及简单使用介绍
  • 20231229在Firefly的AIO-3399J开发板的Android11使用挖掘机的DTS配置单前后摄像头ov13850
  • 九台虚拟机网站流量分析项目启动步骤
  • 迅软科技助力高科技防泄密:从华为事件中汲取经验教训
  • 数据结构期末复习(2)链表
  • Hive中支持毫秒级别的时间精度
  • 【深度学习:Recurrent Neural Networks】循环神经网络(RNN)的简要概述
  • HTML 基础
  • 大学物理II-作业1【题解】
  • Unity引擎有哪些优点
  • 【华为机试】2023年真题B卷(python)-猴子爬山
  • 【Harmony OS - Stage应用模型】
  • Java 8 中的 Stream 轻松遍历树形结构!
  • Openwrt修改Dropbear ssh root密码
  • js 对象
  • 【SpringBoot】常用注解
  • 【模拟电路】软件Circuit JS
  • 从入门到精通,30天带你学会C++【第十天:猜数游戏】
  • 使用ASP.NET MiniAPI 调试未匹配请求路径
  • 数据结构: 位图
  • Nginx 反向代理负载均衡
  • SAP FIORI 初步了解
  • chrome浏览器记录不住网站登录状态,退出后再打开就需要重新登陆的解决办法
  • Linux lpd命令教程:打印服务管理技巧全解析(附实例教程和注意事项)
  • 利用STM32和可控硅控制220V加热电路
  • 在高并发场景下,缓存“雪崩”了怎么办
  • 本地git服务器的使用
  • Mybatis Java API - SqlSessionFactoryBuilder
  • 【动态规划】 LCR 099. 最小路径和