当前位置: 首页 > news >正文

SparkSql---用户自定义函数UDFUDAF

文章目录

  • 1.UDF
  • 2.UDAF
    • 2.1 UDF函数实现原理
    • 2.2需求:计算用户平均年龄
      • 2.2.1 使用RDD实现
      • 2.2.2 使用UDAF弱类型实现
      • 2.2.3 使用UDAF强类型实现

1.UDF

用户可以通过 spark.udf 功能添加自定义函数,实现自定义功能。

如:实现需求在用户name前加上"Name:"字符串,并打印在控制台

  def main(args: Array[String]): Unit = {//创建上下文环境配置对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo03")//创建 SparkSession 对象val sc: SparkSession = SparkSession.builder().config(conf).getOrCreate()import sc.implicits._//创建DataFrameval dataRDD: RDD[(String,Int)] = sc.sparkContext.makeRDD(List(("zhangsan",21),("lisi",24)))val dataframe = dataRDD.toDF("name","age")//注册udf函数sc.udf.register("addName",(x:String)=>"Name:"+x)//创建临时视图dataframe.createOrReplaceTempView("people")//对临时视图使用udf函数sc.sql("select addName(name) from people").show()sc.stop()}

在这里插入图片描述

2.UDAF

强类型的 Dataset 和弱类型的 DataFrame 都提供了相关的聚合函数, 如 count(),countDistinct(),avg(),max(),min()。除此之外,用户可以设定自己的自定义聚合函数。**通过继承 UserDefinedAggregateFunction 来实现用户自定义弱类型聚合函数。**从 Spark3.0 版本后,UserDefinedAggregateFunction 已经不推荐使用了。可以统一采用强类型聚合函数Aggregator。

2.1 UDF函数实现原理

在这里插入图片描述
在Spark中,UDF(用户自定义函数)在对表中的数据进行处理时,通常会将数据放入缓冲区中以便进行计算。这种缓冲策略可以提高数据处理的效率,特别是对于大数据集。

2.2需求:计算用户平均年龄

2.2.1 使用RDD实现

    val dataRDD: RDD[(String,Int)] = sc.sparkContext.makeRDD(List(("zhangsan",21),("lisi",24),("wangwu",26)))val reduceResult: (Int, Int) = dataRDD.map({case (name, age) => {(age, 1)}}).reduce((t1, t2) => {(t1._1 + t2._1, t1._2 + t2._2)})println(reduceResult._1/reduceResult._2)

在这里插入图片描述

2.2.2 使用UDAF弱类型实现

需要用户自定义类实现UserDefinedAggregateFunction,并重写其中的方法,当前已不推荐使用。

package bigdata.wordcount.udfimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StructField, StructType}
import org.apache.spark.util.AccumulatorV2/*** 用户自定义函数*/
object UDF_Demo02 {def main(args: Array[String]): Unit = {//创建上下文环境配置对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo03")//创建 SparkSession 对象val sc: SparkSession = SparkSession.builder().config(conf).getOrCreate()import sc.implicits._val dataRDD: RDD[(String, Int)] = sc.sparkContext.makeRDD(List(("zhangsan", 19), ("lisi", 21), ("wangwu", 22)))val dataFrame: DataFrame = dataRDD.toDF("name","age")dataFrame.createOrReplaceTempView("user")//创建聚合函数var myAvg=new MyAverageUDAF()//在Spark中注册自定义的聚合函数sc.udf.register("avgMy",myAvg)sc.sql("select avgMy(age) from user").show()sc.stop()}case class User(var name:String,var age:Int)}class MyAverageUDAF extends UserDefinedAggregateFunction{//输入的要进行聚合的参数的类型override def inputSchema: StructType = StructType(Array(StructField("age",IntegerType)))//聚合函数缓冲区中的值的数据类型override def bufferSchema: StructType = StructType(Array(StructField("sum",LongType),StructField("count",LongType)))//函数返回的值的数据类型override def dataType: DataType = DoubleType//判断函数的稳定性//对于相同类型的输入是否有相同类型的输出override def deterministic: Boolean = true//聚合函数缓冲区中值的初始化//因为数据是弱类型的,函数缓冲区中是根据索引来找到对应的变量override def initialize(buffer: MutableAggregationBuffer): Unit = {//年龄的总和buffer(0)=0L//年龄的个数buffer(1)=0L}//更新缓冲区中的数据(执行操作步骤)override def update(buffer: MutableAggregationBuffer, input: Row): Unit ={//第0个索引值是否为空if(!input.isNullAt(0)) {//更新年龄sum的值buffer(0)=buffer.getLong(0)+input.getInt(0)//更新年龄个数buffer(1)=buffer.getLong(1)+1;}}//合并缓冲区override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1(0)=buffer1.getLong(0)+buffer2.getLong(0)buffer1(1)=buffer1.getLong(1)+buffer2.getLong(1)}//计算最终结果override def evaluate(buffer: Row): Double = {buffer.getLong(0).toDouble / buffer.getLong(1)}
}

在这里插入图片描述

2.2.3 使用UDAF强类型实现

Spark3.0 版本可以采用强类型的 Aggregator 方式代替 UserDefinedAggregateFunction

package bigdata.wordcount.udfimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Encoder, Encoders, Row, SparkSession, TypedColumn}
import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, DoubleType, IntegerType, LongType, StructField, StructType}
import org.apache.spark.util.AccumulatorV2/*** 用户自定义函数*/
object UDF_Demo03 {def main(args: Array[String]): Unit = {//创建上下文环境配置对象val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo03")//创建 SparkSession 对象val sc: SparkSession = SparkSession.builder().config(conf).getOrCreate()import sc.implicits._val dataRDD: RDD[(String, Int)] = sc.sparkContext.makeRDD(List(("zhangsan", 19), ("lisi", 21), ("wangwu", 22)))val dataFrame: DataFrame = dataRDD.toDF("name","age")val dataset: Dataset[User01] = dataFrame.as[User01]//创建聚合函数var myAvg=new MyAverageUDAF01()//将聚合函数转换为查询的列val col: TypedColumn[User01, Double] = myAvg.toColumn//执行查询操作dataset.select(col).show()sc.stop()}case class User(var name:String,var age:Int)}//输入数据类型
case class User01(var name:String,var age:Int)
//缓存中的数据类型
case class AgeBuffer(var sum:Long,var count:Long)class MyAverageUDAF01 extends Aggregator[User01,AgeBuffer,Double]{//设置初始值override def zero: AgeBuffer = {AgeBuffer(0L,0L)}//缓冲区实现聚合override def reduce(b: AgeBuffer, a: User01): AgeBuffer = {b.sum = b.sum + a.ageb.count = b.count + 1b}//合并缓冲区override def merge(b1: AgeBuffer, b2: AgeBuffer): AgeBuffer = {b1.sum+=b2.sumb1.count+=b2.countb1}//计算最终结果override def finish(buff: AgeBuffer): Double = {buff.sum.toDouble/buff.count}//设置编码器和解码器//自定义类型就是 product 自带类型根据类型选择override def bufferEncoder: Encoder[AgeBuffer] = {Encoders.product}override def outputEncoder: Encoder[Double] = {Encoders.scalaDouble}
}

在这里插入图片描述

http://www.lryc.cn/news/289862.html

相关文章:

  • 系统架构15 - 软件工程(3)
  • 两个近期的计算机领域国际学术会议(软件工程、计算机安全):欢迎投稿
  • (二十一)Flask之上下文管理第二篇(细细扣一遍源码)
  • Java项目:基于SSM框架实现的企业员工岗前培训管理系统(ssm+B/S架构+源码+数据库+毕业论文)
  • 深入了解Redis:选择适用于你的场景的持久化方案
  • 【Git配置代理】Failed to connect to github.com port 443 问题解决方法
  • python提取word文档内容的示例
  • MarkDown快速入门-以Obsidian编辑器为例
  • 【计算机网络】协议,电路交换,分组交换
  • 加速应用开发:低代码云SaaS和源码交付模式如何选
  • ATT汇编
  • java split 拆分字符串
  • 【InternLM 大模型实战】作业与笔记汇总
  • 解析PreMaint在石油化工设备预测性维护领域的卓越表现
  • C++面试宝典第25题:阶乘末尾零的个数
  • PCIE 4.0 Equalizaiton(LTSSM 均衡流程)
  • [libwebsockets]lighttpd+libwebsockets支持ws和wss配置方法说明
  • 常用软件安装
  • 翻译: GPT-4 Vision静态图表转换为动态数据可视化 升级Streamlit 三
  • Qt QPlainTextEdit高亮显示当前行
  • Linux编辑器vim(含vim的配置)
  • Oracle DG环境下的秘钥管理
  • 【Sql Server】新手一分钟看懂在已有表基础上增加字段和说明
  • 亚信安慧AntDB打造开放创新的数据库生态
  • 在Mixamo网站上,下载的动画导入unity给自己的模型添加后出错怎么解决
  • java servlet运输公司管理系统Myeclipse开发mysql数据库web结构java编程计算机网页项目
  • React中antd的使用技巧
  • 2024年第一篇博客
  • Nginx负载均衡下的webshell连接
  • JAVA编程语言单词汇总