当前位置: 首页 > news >正文

【Spark征服之路-3.6-Spark-SQL核心编程(五)】

自定义函数:

UDF:

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SQLDemo")
//创建SparkSession对象
val spark :SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

import spark.implicits._
//读取json文件
val df : DataFrame = spark.read.json("Spark-SQL/input/user.json")

spark.udf.register("addName",(x:String)=>"Name:"+x)

df.createOrReplaceTempView("people")
spark.sql("select addName(username),age from people").show()

spark.stop()

UDAF(自定义聚合函数)

强类型的 Dataset 和弱类型的 DataFrame 都提供了相关的聚合函数, 如 count(),

countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。Spark3.0之前我们使用的是UserDefinedAggregateFunction作为自定义聚合函数,从 Spark3.0 版本后可以统一采用强类型聚合函数 Aggregator

实验需求:计算平均工资

实现方式一:RDD

val sparkconf: SparkConf = new SparkConf().setAppName("app").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val resRDD: (Int, Int) = sc.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu",40))).map {
case (name, salary) => {
    (salary, 1)
  }
}.reduce {
  (t1, t2) => {
    (t1._1 + t2._1, t1._2 + t2._2)
  }
}
println(resRDD._1/resRDD._2)
// 关闭连接
sc.stop()

实现方式二:弱类型UDAF

class MyAverageUDAF extends UserDefinedAggregateFunction{
def inputSchema: StructType =
StructType(Array(StructField("salary",IntegerType)))
// 聚合函数缓冲区中值的数据类型(salary,count)
  def bufferSchema: StructType = {

StructType(Array(StructField("sum",LongType),StructField("count",LongType)))
  }
// 函数返回值的数据类型
  def dataType: DataType = DoubleType
// 稳定性:对于相同的输入是否一直返回相同的输出。
  def deterministic: Boolean = true
  // 函数缓冲区初始化
  def initialize(buffer: MutableAggregationBuffer): Unit = {
// 存薪资的总和
    buffer(0) = 0L
// 存薪资的个数
    buffer(1) = 0L
}
// 更新缓冲区中的数据
  def update(buffer: MutableAggregationBuffer,input: Row): Unit = {
if (!input.isNullAt(0)) {
      buffer(0) = buffer.getLong(0) + input.getInt(0)
      buffer(1) = buffer.getLong(1) + 1
}
  }
// 合并缓冲区
  def merge(buffer1: MutableAggregationBuffer,buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }
// 计算最终结果
  def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble /
    buffer.getLong(1)
}

val sparkconf: SparkConf = new SparkConf().setAppName("app").setMaster("local[*]")
val spark:SparkSession = SparkSession.builder().config(conf).getOrCreate()

import spark.implicits._
val res :RDD[(String,Int)]= spark.sparkContext.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu",40)))

val df :DataFrame = res.toDF("name","salary")
df.createOrReplaceTempView("user")
var myAverage = new MyAverageUDAF
// spark 中注册聚合函数
spark.udf.register("avgSalary",myAverage)
spark.sql("select avgSalary(salary) from user").show()

// 关闭连接
spark.stop()

实现方式三:强类型UDAF

case class Buff(var sum:Long,var cnt:Long)
class MyAverageUDAF extends Aggregator[Long,Buff,Double]{
override def zero: Buff = Buff(0,0)
override def reduce(b: Buff, a: Long): Buff = {
    b.sum += a
    b.cnt += 1
b
  }
override def merge(b1: Buff, b2: Buff): Buff = {
    b1.sum += b2.sum
    b1.cnt += b2.cnt
    b1
  }
override def finish(reduction: Buff): Double = {
    reduction.sum.toDouble/reduction.cnt
  }
override def bufferEncoder: Encoder[Buff] = Encoders.product
  override def outputEncoder: Encoder[Double] = Encoders.scalaDouble

}

val sparkconf: SparkConf = new SparkConf().setAppName("app").setMaster("local[*]")
val spark:SparkSession = SparkSession.builder().config(conf).getOrCreate()

import spark.implicits._
val res :RDD[(String,Int)]= spark.sparkContext.makeRDD(List(("zhangsan", 20), ("lisi", 30), ("wangwu",40)))

val df :DataFrame = res.toDF("name","salary")
df.createOrReplaceTempView("user")
var myAverage = new MyAverageUDAF
// spark 中注册聚合函数
spark.udf.register("avgSalary",functions.udaf(myAverage))
spark.sql("select avgSalary(salary) from user").show()

// 关闭连接
spark.stop()

http://www.lryc.cn/news/594715.html

相关文章:

  • Linux 文件操作详解:结构、系统调用、权限与实践
  • 第二阶段-第二章—8天Python从入门到精通【itheima】-134节(SQL——DQL——分组聚合)
  • leetcode-sql-627变更性别
  • 深入解析IP协议:组成、地址管理与路由选择
  • Tomato靶机通关教程
  • 安装docker可视化工具 Portainer中文版(ubuntu上演示,所有docker通用) 支持控制各种容器,容器操作简单化 降低容器门槛
  • 板凳-------Mysql cookbook学习 (十二--------4)
  • 技能学习PostgreSQL中级专家
  • 借助AI学习开源代码git0.7之六write-cache
  • 基于 STM32 的数字闹钟系统 Proteus 仿真设计与实现
  • 从一开始的网络攻防(六):php反序列化
  • 金仓数据库:融合进化,智领未来——2025年数据库技术革命的深度解析
  • STM32 USB键盘实现指南
  • 最严电动自行车新规,即将实施!
  • FreeSwitch通过Websocket(流式双向语音)对接AI实时语音大模型技术方案(mod_ppy_aduio_stream)
  • 朝歌智慧盘古信息:以IMS MOM V6重构国产化智能终端新生态
  • 【初识数据结构】CS61B中的最小生成树问题
  • Car Kit重构车机开发体验,让车载应用开发驶入快车道
  • 【PTA数据结构 | C语言版】拓扑排序
  • OR条件拆分:避免索引失效的查询重构技巧
  • 【web自动化】-5- fixture集中管理和项目重构
  • 2024年ASOC SCI2区TOP,基于Jaya算法的粒子滤波器用于非线性模型贝叶斯更新,深度解析+性能实测
  • 代码随想录算法训练营第二十七天
  • 为什么 tcp_syncookies 不能取代半连接队列?
  • 【前端】jszip+file-saver:多个视频url下载到zip、页面预加载视频、预览视频、强制刷新视频
  • Python并发编程:突破GIL枷锁,高效利用多核CPU
  • 服务器系统时间不准确怎么办?
  • PHP反序列化漏洞详解
  • 4 种更新的方法将消息从安卓传输到 Mac
  • 2025三掌柜赠书活动第二十五期 网络安全应急响应实战