当前位置: 首页 > news >正文

Spark数据源的读取与写入、自定义函数

1. 数据源的读取与写入

1.1 数据读取

  • 读文件
    • read.json
    • read.csv
      • csv文件由两个部分组成:头部数据(也就是字段数据)、行数据。
    • read.orc
  • 读数据库
    • read.jdbc(jdbc连接地址,table=‘表名’,properties={‘user’=用户名,‘password’=密码,‘driver’=‘驱动信息’})
      数据库创建测试数据:
create database itcast charset=utf8;create table itcast.tb_user(id int,name varchar(20),age int,gender varchar(20)
);insert into  itcast.tb_user values (1,'张三',20,'男');

表查看:
在这里插入图片描述
读取数据库数据:

# 读取数据源,将数据转为DF
from pyspark.sql import SparkSessionss = SparkSession.builder.getOrCreate()# read读取数据库数据
# 使用jdbc方法通过jdbc读取数据库数据,在读取数据库之前,需要现将数据库连接驱动放入spark的jars目录下
#
df = ss.read.jdbc('jdbc:mysql://192.168.88.100:3306/itcast',table='tb_user',properties={'user':'root','password':'123456','driver':'com.mysql.jdbc.Driver'})
df.show()

运行结果:
在这里插入图片描述

1.2 数据写入

因为数据是在df中存储,所以使用DataFrame进行数据写入

使用DataFrame的的write方法


写入文件有个模式,覆盖和追加两种方式,用mode参数指定
覆盖 overwrite
追加 append

  • 写入文件
    • write.json
    • write.csv
    • write.orc
  • 写入数据库
    • write.jdbc(jdbc连接地址,table=‘表名’,properties={‘user’=用户名,‘password’=密码,‘driver’=‘驱动信息’},mode=‘写入方式’)
# 数据写入
from pyspark.sql import SparkSession,Row
ss = SparkSession.builder.getOrCreate()df = ss.createDataFrame([Row(id=1,name='张三',age=20),Row(id=2,name='李四',age=20),Row(id=3,name='王五',age=20)],schema='id int,name string,age int'
)# 将df数据写入hdfs文件中  mode='overwrite' 覆盖写入   append 追加写入
df.write.json('hdfs://node1:8020/data_json',mode='overwrite')# 写入数据库
# create table itcast.tb_stu(
#     id int,
#     name varchar(20),
#     age int
# );
# 在jdbc连接中指定编码字符集为utf-8
df.write.jdbc('jdbc:mysql://192.168.88.100:3306/itcast?characterEncoding=utf8',table='tb_stu',mode='overwrite',properties={'user':'root','password':'123456','driver':'com.mysql.jdbc.Driver'})

运行结果:
在这里插入图片描述

2. 自定义函数

在这里插入图片描述

2.1 函数分类

  • udf
    • 自定义
    • 一进一出
  • udaf
    • 聚合
    • 自定义
    • 多进一出
  • udtf
    • 爆炸
    • 一进多出

2.2 UDF函数

对每一行数据依次进行计算,返回每一行的结果。

#UDF函数
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import *ss = SparkSession.builder.getOrCreate()#读取文件数据转为df
df = ss.read.csv('hdfs://node1:8020/data/students.csv',header=True,sep=',')df.show()#自定义字符串长度计算函数
def len_func(field):if field is None:return 0else:data = len(field)return data
#将自定义的函数注册到spark中使用
len_func = ss.udf.register('len_func', len_func,returnType=IntegerType())#在spark中使用
df2 = df.select('id','name','gender',len_func('name'))
df2.show()#sql语句中使用
df.createTempView('stu')
df3= ss.sql('select *,len_func(name) from stu')
df3.show()

2.3 UDAF函数

多进一出 主要是聚合
使用pandas中的series实现,可以读取一列数据存储在pandas的series中进行数据的聚合。

#UDAF函数
from pyspark.sql import SparkSession,functions as F
from pyspark.sql.types import *
import pandas as pdss = SparkSession.builder.getOrCreate()#读取文件数据转为df
df = ss.read.csv('hdfs://node1:8020/data/students.csv', header=True,sep=',',schema = 'id int,name string,age int,gender string,cls string')df.show()#对某个字段的整列数据进行计算,自定义udaf函数
# 第一步,装饰器注册
@F.pandas_udf(returnType=IntegerType())
def sub(field:pd.Series) -> int:n=field[0] #取出第一个值作为初始值for i in field[1::]:n-=ireturn n
#第二步,register方法注册
sub = ss.udf.register('sub', sub)df2 = df.select(sub('age'))
df2.show()
http://www.lryc.cn/news/465961.html

相关文章:

  • LeetCode 每日一题 2024/10/14-2024/10/20
  • 接口测试(六)jmeter——参数化(配置元件 --> 用户定义的变量)
  • 【学习笔记】网络流
  • 【鸡翅Club】项目启动
  • python+大数据+基于热门视频的数据分析研究【内含源码+文档+部署教程】
  • 【电子电力】基于PMU相量测量单元的电力系统状态评估
  • ubuntu修改默认开机模式(图形/终端)
  • LaMI-DETR:基于GPT丰富优化的开放词汇目标检测 | ECCV‘24
  • AI大模型是否有助于攻克重大疾病?
  • 【渗透测试】-红日靶场-获取web服务器权限
  • python 深度学习 项目调试 图像分割 segment-anything
  • 【GO实战课】第六讲:电子商务网站(6):支付和订单处理
  • 专题十三_记忆化搜索_算法专题详细总结
  • 已发布金融国家标准目录(截止2024年3月)
  • 【论文#快速算法】Fast Intermode Decision in H.264/AVC Video Coding
  • Git核心概念图例与最常用内容操作(reset、diff、restore、stash、reflog、cherry-pick)
  • 【人工智能在医疗企业个人中的应用】
  • IPv4头部和IPv6头部
  • 从零开始手把手带你训练LLM保姆级教程,草履虫都能学会!零基础看完这篇就足够了~
  • strcat函数追加字符串
  • 每月洞察:App Store 和 Google Play 的主要更新
  • 【python openai function2json小工具】
  • super()和super().__init__()的解释
  • 【C++】—— 多态(下)
  • idea 2023 配置 web service
  • MYSQL数据库SQL+DQL
  • Java中的异常Throwable
  • Day4顺序表c++代码实现
  • 将图片转换成base64格式
  • 征服ES(ElasticSearch)的慢查询实战