当前位置: 首页 > news >正文

Spark_SQL-DataFrame数据写出以及读写数据库(以MySQl为例)

                  一、数据写出

        (1)SparkSQL统一API写出DataFrame数据

二、写出MySQL数据库


一、数据写出

        (1)SparkSQL统一API写出DataFrame数据

        统一API写法:

       常见源写出:

# cording:utf8from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType
import pyspark.sql.functions as F
if __name__ == '__main__':spark = SparkSession.builder.\appName('write').\master('local[*]').\getOrCreate()sc = spark.sparkContext# 1.读取文件schema = StructType().add('user_id', StringType(), nullable=True).\add('movie_id', IntegerType(), nullable=True).\add('rank', IntegerType(), nullable=True).\add('ts', StringType(), nullable=True)df = spark.read.format('csv').\option('sep', '\t').\option('header', False).\option('encoding', 'utf-8').\schema(schema=schema).\load('../input/u.data')# write text 写出,只能写出一个列的数据,需要将df转换为单列dfdf.select(F.concat_ws('---', 'user_id', 'movie_id', 'rank', 'ts')).\write.\mode('overwrite').\format('text').\save('../output/sql/text')# write csvdf.write.mode('overwrite').\format('csv').\option('sep',';').\option('header', True).\save('../output/sql/csv')# write jsondf.write.mode('overwrite').\format('json').\save('../output/sql/json')# write parquetdf.write.mode('overwrite').\format('parquet').\save('../output/sql/parquet')

二、写出MySQL数据库

        API写法:

        注意:

        ①jdbc连接字符串中,建议使用useSSL=false 确保连接可以正常连接( 不使用SSL安全协议进行连接)

        ②jdbc连接字符串中,建议使用useUnicode=true 来确保传输中不出现乱码

        ③save()不要填参数,没有路径,是写出数据库

        ④dbtable属性:指定写出的表名

# cording:utf8from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType
import pyspark.sql.functions as F
if __name__ == '__main__':spark = SparkSession.builder.\appName('write').\master('local[*]').\getOrCreate()sc = spark.sparkContext# 1.读取文件schema = StructType().add('user_id', StringType(), nullable=True).\add('movie_id', IntegerType(), nullable=True).\add('rank', IntegerType(), nullable=True).\add('ts', StringType(), nullable=True)df = spark.read.format('csv').\option('sep', '\t').\option('header', False).\option('encoding', 'utf-8').\schema(schema=schema).\load('../input/u.data')# 2.写出df到MySQL数据库df.write.mode('overwrite').\format('jdbc').\option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=false&useUnicode=true&serverTimezone=GMT%2B8').\option('dbtable', 'movie_data').\option('user', 'root').\option('password', '123456').\save()# 读取   df2 = spark.read.format('jdbc'). \option('url', 'jdbc:mysql://pyspark01:3306/bigdata?useSSL=false&useUnicode=true&serverTimezone=GMT%2B8'). \option('dbtable', 'movie_data'). \option('user', 'root'). \option('password', '123456'). \load()# 查看读取结果df2.printSchema()df2.show()'''JDBC写出,会自动创建表的因为DataFrame中的有表结构信息,StructType记录的 各个字段的名称 类型 和是否运行为空'''

        保存结果:

        读取结果:

http://www.lryc.cn/news/207101.html

相关文章:

  • Linux进程终止
  • 0036【Edabit ★☆☆☆☆☆】【让我加油】Let‘s Fuel Up!
  • React 中常用的几种路由跳转方式
  • C++内存管理:其七、标准库中的allocator
  • 【机器学习合集】人脸表情分类任务Pytorch实现TensorBoardX的使用 ->(个人学习记录笔记)
  • Maven - 国内 Maven 镜像仓库(加速包,冲冲冲~)
  • 【Solidity】智能合约案例——③版权保护合约
  • Cisco IOS XE Web UI 命令执行漏洞
  • qwen大模型,推理速度慢,单卡/双卡速度慢,flash-attention安装,解决方案
  • 3.SpringSecurity基于数据库的认证与授权
  • 【软件测试】自动化测试selenium
  • ​​​​​​​如何解决Google play开发者新注册账号,身份验证的地址证明问题?
  • Gin vs Beego: Golang的Web框架之争
  • javascript IP地址正则表达式
  • 【Bash】记录一个长命令换行的BUG
  • 【.net core】yisha框架imageupload组件多图上传修改
  • vscode markdown 使用技巧 -- 如何快速打出一个Tab 或多个空格
  • I/O 模型学习笔记【全面理解BIO/NIO/AIO】
  • 【Python学习笔记】字符编码
  • 华为昇腾NPU卡 大模型LLM ChatGLM2模型推理使用
  • Git 拉取远程更新报错
  • 腾讯云国际站服务器端口开放失败怎么办?
  • 一句话解释什么是出口IP
  • 深入理解强化学习——强化学习的历史:试错学习
  • 分享一个用HTML、CSS和jQuery构建的漂亮的登录注册界面
  • Java学习 习题 1.
  • 第六节——Vue中的事件
  • 设置GridView单选
  • [Python从零到壹] 七十二.图像识别及经典案例篇之OpenGL入门及绘制基本图形和3D图
  • 论文-分布式-并发控制-Lamport逻辑时钟