当前位置: 首页 > news >正文

Pyspark读写csv,txt,json,xlsx,xml,avro等文件

1. Spark读写txt文件

读:

df = spark.read.text("/home/test/testTxt.txt").show()
+-------------+
|        value|
+-------------+
|      a,b,c,d|
|123,345,789,5|
|34,45,90,9878|
+-------------+

2. Spark读写csv文件

读:

# 文件在hdfs上的位置
file_path = r"/user/lanyue/data.csv"
# 方法一
# 推荐这种,指定什么文件格式都可以,只需要修改参数format即可
# 不同的格式其load函数会有不同,用的时候请自行搜索。
df = spark.read.format("csv").load(file_path, header=True, inferSchema=True, encoding="utf-8", sep=',') 
# sep=',',表示指定分隔符为逗号,同参数delimiter。
# header=TRUE,表示数据的第一行为列名
# inferSchema,表示是否对字段类型进行推测。=False,默认读取后都按照文本字符处理。=True表示自动推断schema。# 或者下面这种形式。这两种形式都可以
df = spark.read.format("csv").option("encoding","utf-8").option("header",True).load(file_path, schema=schema)  # 使用指定的schema# 方法二
df = spark.read.csv(file_path, encoding='utf-8', header=True, inferSchema=True) 
df = spark.read.csv(file_path, encoding='utf-8', header=True, schema=schema) 
# 如果想指定文件格式是json,那就是spark.read.json,其他类似

写:

# 保存在【hdfs上】,以csv文件的格式。指定什么文件格式都可以,只需要修改参数format即可
df.repartition(1).write.mode('append').format("csv").option("encoding","utf-8").option("header",True).save("/lanyue/data.csv") 
# mode,保存模式:ovewriter重写、append文件末尾追加、error如果文件存在抛出异常、ignore如果文件存在忽略不更新
# repartition, 在yarn模式下,Spark会根据hdfs文件的块数据大小来划分默认的分区数目,但是我们也可以自己设置分区数目,使用参数repartition。=1表示只保存成一个数据块# 或者
df.write.csv("/lanyue/data.csv", sep="\t", encoding="utf-8", mode='overwrite') 
# 如果想指定文件格式是json,那就是df.write.json,其他类似
# 通过指定参数sep,来指定分隔符,可以是",", "\t","\x01"等。同参数delimiter。

3. Spark读写parquet文件

读:

file = "/user/muzili/data.parquet"
spark_df=spark.read.parquet(file)
df.show()

写:

spark_df.write.parquet(path=file,mode='overwrite')

4. Spark读写json文件

读:

file = "/user/muzili/data.json"
df = spark.read.json(file)
df.show()

写:

df.repartition(1).write.mode('append').format("json").option("encoding","utf-8").option("header",True).save("/user/muzili/data.json")

5. Spark读写excel文件

读:

写:

6. Spark读写xml文件

读:

写:

7. Spark读写orc文件

读:

写:

8. Spark读写avro文件

读:

写:

9. Spark读写mysql中的表

读:

url="jdbc:mysql://host:port/database"
table="table_name"
driver="com.mysql.jdbc.Driver"
user="XXX"
password="XXX"df = spark.read.format("jdbc").option("url",url) # database地址,格式为jdbc:mysql://主机:端口/数据库.option("dbtable",table) # 表名.option("user",user).option("password",password).option("driver",driver).load()# 或者以下形式
df = spark.read.format('jdbc').options(url="jdbc:mysql://host:port/database", # database地址driver="com.mysql.jdbc.Driver",dbtable="table_name", user="XXX",password="XXX").load()# 或者以下形式
# mysql的相关配置
prop = {'user': 'xxx', 'password': 'xxx', 'driver': 'com.mysql.jdbc.Driver'}
url = 'jdbc:mysql://host:port/database' # database地址
df = spark.read.jdbc(url=url, table='mysql_table_name', properties=prop)

写:

# 会自动对齐字段,也就是说,spark_df 的列不一定要全部包含MySQL的表的全部列才行
prop = {'user': 'xxx', 'password': 'xxx', 'driver': 'com.mysql.jdbc.Driver'}
url = 'jdbc:mysql://host:port/database' # database地址
df.write.jdbc(url=url, table='table_name', mode='append', properties=prop)
# append 追加方式# 或者以下形式
df.write.format("jdbc").option("url","jdbc:mysql://host:port/database") # database地址.option("dbtable","table_name").option("user",user).option("password",password).option("driver",driver).option("batchsize","1000").mode("overwrite") # overwrite 清空表再导入.save()

http://www.lryc.cn/news/177220.html

相关文章:

  • LeetCode 接雨水 双指针
  • 【Linux】【网络】传输层协议:UDP
  • 数字音频工作站FL Studio 21中文版下载及电音编曲要用乐理吗 电音编曲步骤
  • 金蝶云星空与旺店通·企业奇门对接集成其他出库查询打通创建其他出库单
  • Visual Studio 如何删除多余的空行,仅保留一行空行
  • java spring cloud 企业电子招标采购系统源码:营造全面规范安全的电子招投标环境,促进招投标市场健康可持续发展
  • 112. 路径总和
  • 国货疯抢流量,B站接连爆发800万播放实现破圈
  • (高阶) Redis 7 第14讲 数据统计分析 实战篇
  • SpringCloud nacos1.x.x版本升级到2.2.3版本并开启鉴权踩坑
  • 软件测试/测试开发丨探索AI与测试报告的完美结合,提升工作效率
  • Ubuntu 设置开机自动执行脚本
  • 【笔记】Splay
  • opencv英文识别tesseract-orc安装
  • JNA封装C/C++动态库在flink内使用记录
  • Android gradle dependency tree change(依赖树变化)监控实现
  • 5个流程图模板网站,帮你轻松绘制专业流程图
  • 【AI视野·今日Robot 机器人论文速览 第四十二期】Wed, 27 Sep 2023
  • 后端面试关键问题大总结
  • uni-app:实现图片周围的图片按照圆进行展示
  • Django之视图
  • 【软件工程_设计模式】——为什么要使用设计模式?
  • 大数据之Kafka
  • 灵活运用OSI模型提升排错能力
  • 【最新!企知道AES加密分析】使用Python实现完整解密算法
  • 前端架构师之11_JavaScript事件
  • 文本过滤工具:grep
  • 【Linux】生产者和消费者模型
  • 开发APP的费用是多少
  • start()方法源码分析