当前位置: 首页 > news >正文

Spark SQL函数定义【博学谷学习记录】

1 如何使用窗口函数

窗口函数格式:

分析函数 over(partition by xxx order by xxx [asc|desc] [rows between xxx and xxx])

学习的相关分析函数有那些?

第一类: row_number() rank() dense_rank() ntile()

第二类: 和聚合函数组合使用 sum() avg() max() min() count()

第三类: lag() lead() first_value() last_value()

SQL中: 与HIVE中应用基本没啥区别, 更多关注的是DSL写法

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql import Window as win
import os# 锁定远端环境, 确保环境统一
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':print("演示: 如何在Spark SQL中使用窗口函数...")# 1- 创建SparkSession对象spark = SparkSession.builder.appName('df_write').master('local[*]').getOrCreate()# 2-读取外部文件的数据df = spark.read.csv(path='file:///export/data/workspace/ky06_pyspark/_03_SparkSql/data/pv.csv',header=True,inferSchema=True)df.createTempView('t1')# 3- 执行相关的操作# 需要: 统计每个cookie中, pv数量排名前二内容是哪些? (分组TOPN 问题)# SQLspark.sql("""with t2 as(select*,row_number() over (partition by cookieid order by pv desc) as rank1from t1 )select  * from  t2 where rank1 <=2""").show()# DSL:df.select('*',F.row_number().over(win.partitionBy('cookieid').orderBy(F.desc('pv'))).alias('rank1')).where('rank1 <= 2').show()

2 SQL函数的分类说明

整个SQL函数, 主要是分为以下三大类:

  • UDF函数: 用户自定义函数

  • 表示: 一进一出

  • 整个函数中, 大多数的函数都是属于一进一出的函数: split() substr()

  • UDAF函数: 用户自定义聚合函数

  • 表示: 多进一出

  • 例如: sum() avg() count() ….

  • UDTF函数: 用户自定义表生成函数

  • 表示: 一进多出

  • 指的: 进去一行数据, 最终产生多行 或者多列的数据

  • 例如: explode

在SQL中提供的内置函数, 都是属于以上三类中某一类函数

思考: 提供了那么多的函数, 为啥还需要自定义函数呢?

扩充函数. 在实际使用中, 并不能保证所有的操作可能用的函数都已经提前的内置好, 尤其是很多具有特殊业务处理功能, 其实并没有相对应函数 , 提供的函数更多是以公共的功能为主函数, 此时需要进行自定义, 从而扩充新的功能

在Spark SQL中, 对于自定义函数, 原生支持的粒度并不是特别好, 目前原生的PY方案仅支持自定义UDF函数, 无法自定义UDAF函数和UDTF函数, 在1.6版本之后, Java 和scala语言支持了自定义UDAF函数,但是Python并不支持,Spark官方提供了解决的方案: 基于pandas来自定义UDF 和 UDAF函数. 但是对于UDTF函数, Spark是不支持自定义,但是Spark支持HIVE的函数定义, 所以可以通过HIVE自定义函数来解决

虽然Python支持自定义UDF函数, 但是其效率并不是特别的高效, 因为在使用的时候, 传递一行处理一行, 返回一行的操作, 这样会带来非常大的序列化开销问题, 以及网络开销问题, 导致原生UDF函数效率不好

早期解决方案: 基于 scala/Java来编写自定义UDF函数, 然后基于Python使用即可

目前主要解决方案: 引入Arrow框架, 可以基于内存来完成数据传输工作, 可以大大降低了序列化开销问题, 提供传输的效率, 解决了原生问题, 同时还可以基于pandas的自定义函数, 利用pandas函数优势, 完成各种处理操作

所以后期主推的方案: 基于pandas 自定义函数, 然后底层基于arrow完成数据传输工作

3 Spark SQL原生自定义函数

第一步: 在Python中创建一个python的函数, 在这个函数中书写自定义函数的功能逻辑代码即可

第二步: 将Python函数注册到Spark SQL中, 成为Spark SQL的函数

注册方式一: udf对象 = sparkSession.udf.register(参数1,参数2,参数3)

参数1: UDF函数的名称, 此名称用于后续在SQL语法中使用 , 可以任意定义名称, 但是要符合定义名称规范

参数2: python函数的名称, 表示将哪个python的函数注册为Spark SQL的函数

参数3: 返回值的类型, 用于表示当前这个Python的函数返回的类型对应的Spark SQL的数据类型

udf对象: 此对象主要是用于DSL中

注册方式二: udf对象 = F.udf(参数1,参数2)

参数1: python函数的名称, 表示将哪个python的函数注册为Spark SQL的函数

参数2: 返回值的类型, 用于表示当前这个Python的函数返回的类型对应的Spark SQL的数据类型

udf对象: 此对象主要是用于DSL中

说明: 此种方式还支持语法糖写法: @F.udf(returnType=返回值类型) 需要放置到对应函数上面

第三步: 在Spark SQL的DSL/SQL中进行使用即可

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
import os# 锁定远端环境, 确保环境统一
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'if __name__ == '__main__':print("演示原生的自定义函数:")# 1- 创建SparkSession对象spark = SparkSession.builder.appName('df_write').master('local[*]').getOrCreate()# 2- 初始化一些数据df = spark.createDataFrame(data=[(1,'张三','北京'),(2,'李四','上海'),(3,'王五','广州'),(4,'赵六','深圳'),(5,'田七','杭州')],schema='id int,name string,address string')df.createTempView('t1')# 3- 执行相关的操作:# 请自定义一个函数, 完成对数据统一添加一个后缀名的操作# 3.1 定义一个Python的函数, 接收一个数据, 给数据添加一个后缀返回@F.udf(returnType=StringType())def add_post(data):return data+'_boxuegu'# 3.2 对函数进行注册操作# 注册方式一# 当采用注解方式注册函数的使用, 如果依然想在SQL中使用, 可以再次使用方式一注册,但是不需要设置返回值类型了spark.udf.register('add_post_sql',add_post)# 注册方式二: 还有一种语法糖模式#add_post_dsl = F.udf(add_post,StringType())# 3.3 使用自定义函数# SQLspark.sql("""select*,add_post_sql(address) as r1from t1""").show()# DSLdf.select('*',add_post('address').alias('r1')).show()
http://www.lryc.cn/news/41091.html

相关文章:

  • 模拟实现STL容器之vector
  • ChatGPT-4.0 : 未来已来,你来不来
  • Java反射(详细学习笔记)
  • 学习 Python 之 Pygame 开发魂斗罗(十二)
  • Linux下字符设备驱动开发以及流程介绍
  • Web自动化框架断言方法实现
  • 8大核心语句,带你深入python
  • 【批处理】- 批处理自动安装Mysql与Redis
  • 聊聊华为的工作模式
  • 燕山大学-面向对象程序设计实验-实验6 派生与继承:多重派生-实验报告
  • 分割两个字符串得到回文串[抽象--去除具体个性取共性需求]
  • 【LeetCode】1609. 奇偶树、1122. 数组的相对排序
  • 【C++初阶】4. Date类的实现
  • ES6新特性--变量声明
  • 【Django】缓存机制
  • 我的创作纪念日——一年的时间可以改变很多
  • Jetson Nano驱动机器人的左右两路电机
  • 如何通过openssl生成公钥和私钥?
  • Verilog的If语句和Case语句
  • HJ31 单词倒排
  • leetcode——203.移除链表元素
  • GPT-4来袭:开启人工智能新时代
  • 芯微电子IPO终止:业绩开始大幅下滑,王日新、王苟新兄弟不同命
  • 【C++】用手搓的红黑树手搓set和map
  • 【C++】空指针弃NULL用nullptr
  • 【selenium学习】数据驱动测试
  • 嵌入式硬件电路设计的基本技巧
  • Spring MVC 图片的上传和下载
  • 远程工具神器之MobaXterm (小白必看)
  • VRIK+Unity XR Interaction Toolkit 实现VR上半身的追踪(附带VRM模型导入Unity方法和手腕扭曲的解决方法)