当前位置: 首页 > news >正文

Spark高级用法-自定义函数

用户可以根据需求自己封装计算的逻辑,对字段数据进行计算

内置函数,是spark提供的对字段操作的方法 ,split(字段) 对字段中的数进行切割,F.sum(字段) 会将该字段下的数据进行求和

实际业务中又能内置函数不满足计算需求,此时就需要自定义行数,完成字段数据的业务处

 函数分类

  • udf
    • 自定义
    • 一进一出
  • udaf
    • 聚合
    • 自定义
    • 多进一出
  • udtf
    • 爆炸
    • 一进多出

UDF函数

对每一行数据以此进行计算,返回每一行的结果 

1)不带装饰器

# UDF函数
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import *ss = SparkSession.builder.getOrCreate()# 读取文件数据转为df
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',')df.show()# 自定义字符串长度计算函数
# @F.udf(returnType=IntegerType())  # 使用装饰器注册函数,只能在DSL方法中使用,不能在SQL中使用
def len_func(field):"""自定义函数,函数名可以自己指定:param field: 是用来结构处理的字段数据,可以定义多个。根据实际处理的字段数量决定定义多少个接收参数:return: 返回处理后的数据"""if field is None:return 0else:data = len(field)return data# 将自定义的函数注册到spark中使用
# 参数一 指定spark中使用函数的名
# 参数二  指定自定义函数的名
# 参数三  指定函数的返回值类型
# 接收参数  定义和函数名一样的值
len_func = ss.udf.register('len_func',len_func,returnType = IntegerType())# 在spark中使用
df2 = df.select('id','name','gender',len_func('name'))
df2.show()# 使用sql语句
df.createTempView('stu')df3 = ss.sql('select * ,len_FUNC(name) from stu')
df3.show()

2)带有装饰器

# UDF函数
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import *ss = SparkSession.builder.getOrCreate()# 读取文件数据转为df
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',')df.show()# 自定义字符串长度计算函数
@F.udf(returnType=IntegerType())  # 使用装饰器注册函数,只能在DSL方法中使用,不能在SQL中使用
def len_func(field):"""自定义函数,函数名可以自己指定:param field: 是用来结构处理的字段数据,可以定义多个。根据实际处理的字段数量决定定义多少个接收参数:return: 返回处理后的数据"""if field is None:return 0else:data = len(field)return data# 在spark中使用
df2 = df.select('id','name','gender',len_func('name'))
df2.show()

装饰器注册

  • 只能在DSL方法中使用,在sql语句中无法使用

UDAF函数

多进一出 主要是聚合

使用pandas中的series实现,可以读取一列数据存储在pandas的seriess中进行数据的聚合

# 读取文件数据转为df
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',',schema='id int,name string,gender string,age int,cls string')df.show()# 自定义udaf函数
# 装饰器注册
@F.pandas_udf(returnType=IntegerType())
# 自定义udaf函数
# fileds:pd.Series 给数据字段指定一个类型
#  -> float 指定返回值类型
# udaf函数注册需要两步,第一步现指定装饰器
def sub(filed:pd.Series) -> int:"""自定义udaf函数,实现累减:param field: 接收的字段列数据  pd.Series声明字段数据的类型,接收一列数据可以使用pandas的series类型:return:"""# field是series,就按照series方式操作n = filed[0] # 取出第一个值作为初始值for i in filed[1::]:n-=ireturn n# regidter方法注册
sub = ss.udf.register('sub',sub)# 使用udaf函数  缺少  PyArrow  pandas中series类型交个spark程序无法识别,spark是有scala实现,scala中没有对应的series类型
# 可以使用 PyArrow框架将series转为scale能识别的数据类型
df2 = df.select(sub('age'))
df2.show()

  • arrow框架 pyarrow
    • Apache Arrow 是一种内存中的列式数据格式,用于Spark中,以在JVM和Python进程之间有效地传输数据。目前这对使用 Pandas/NumPy 数据的 Python 用户最有益,提升传输速度。

    • 在线安装 三台机器安装

      • 进入虚拟环境 conda activate base

      • 在线安装 pip install pyspark[sql] -i Verifying - USTC Mirrors

    • 离线安装 三台机器安装

      • pip install pyarrow-10.0.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl

 

 安装pyarrow

conda activate base
pip install pyspark[sql] -i  https://pypi.mirrors.ustc.edu.cn/simple/

http://www.lryc.cn/news/459803.html

相关文章:

  • 『Mysql进阶』Mysql explain详解(五)
  • 【工具】音视频翻译工具基于Whisper+ChatGPT
  • 学成在线——关于nacos配置优先级的坑
  • Nginx在Windows Server下的启动脚本
  • 【国科大】C++程序设计秋季——五子棋
  • Docker 环境下多节点服务器监控实战:从 Prometheus 到 Grafana 的完整部署指南
  • 【动手学深度学习】6.3 填充与步幅(个人向笔记)
  • 【宝可梦】游戏
  • docker启动的rabbitmq如何启动其SSL功能
  • 易基因: cfMeDIP-seq揭示cfDNA甲基化高效区分原发性和转移性前列腺|Nat Commun
  • CMake 教程跟做与翻译 4
  • MySQL面试题分享
  • vue路由缓存问题
  • RabbitMQ中如何解决消息堆积问题,如何保证消息有序性
  • python爬虫案例——selenium爬取淘宝商品信息,实现翻页抓取(14)
  • 在VSCode中使用Excalidraw
  • 25中国投资中投笔试测评秋招校招SHL笔试题型分享
  • 【LeetCode热题100】分治-快排
  • Docker 教程四 (Docker 镜像加速)
  • 各类排序详解
  • 【c语言——指针详解(4)】
  • C# (.net6)实现Redis发布和订阅简单案例
  • 【golang】gorm 使用map实现in 条件查询用法
  • 理论篇| 移动端爬虫
  • systemd实现seatunnel自动化启停
  • MySQL-08.DDL-表结构操作-创建-案例
  • 完成Sentinel-Dashboard控制台数据的持久化-同步到Nacos
  • RocketMq详解:三、RocketMq通用生产和消费方法改造
  • 基于SpringBoot+Vue+Uniapp的仓库点单小程序的详细设计和实现
  • R语言从多波段tif数据中逐个提取单波段数据