当前位置: 首页 > news >正文

详解 Spark SQL 代码开发之数据读取和保存

一、通用操作

/**
基本语法:1.读取:SparkSession.read[.format("format")[.option("...")]].load("path")2.保存:DataFrame.write[.format("format")[.option("...")]][.mode("SaveMode")].save("path")说明:1.默认读取和保存的文件格式为 parquet2."format"包含:"csv"、"jdbc"、"json"、"orc"、"parquet" 和 "textFile"3.option("…") 是在 "jdbc" 格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable4."SaveMode" 指定保存模式,SaveMode 是一个枚举类,其中的常量包括:4.1 "error":默认值,如果文件已经存在则抛出异常4.2 "append":如果文件已经存在则追加4.3 "overwrite":如果文件已经存在则覆盖4.4 "ignore":如果文件已经存在则忽略
*/
object TestSparkSqlRead {def main(args: Array[String]): Unit = {// 创建 sparksql 环境对象val conf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val spark = SparkSession.builder().config(conf).getOrCreate()// 引入环境对象中的隐式转换import spark.implicits._// val df = spark.read.load("data/user.json") // errorval df = spark.read.format("json").load("data/user.json")df.show()// 直接查询文件:文件格式.`文件路径`spark.sql("select * from json.`data/user.json`").show()// df.write.save("output") // 默认保存为 parquet 格式df.write.format("json").save("output")// df.write.format("json").mode("overwrite").save("output") // 覆盖保存// 关闭环境spark.close()}}

二、parquet

/**SparkSQL默认的读取保存数据源为 Parquet 格式Parquet 是一种能够有效存储嵌套数据的列式存储格式基本语法:1.读取:1.1 SparkSession.read.load("path") 1.2 SparkSession.read.parquet("path")2.保存:2.1 DataFrame.write[.mode("SaveMode")].save("path") 2.2 DataFrame.write[.mode("SaveMode")].parquet("path")
*/
object TestSparkSqlRead {def main(args: Array[String]): Unit = {// 创建 sparksql 环境对象val conf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val spark = SparkSession.builder().config(conf).getOrCreate()// 引入环境对象中的隐式转换import spark.implicits._val df = spark.read.load("data/user.parquet")val df1 = spark.read.parquet("data/user.parquet")df.show()df.write.save("output") df1.write.parquet("output1") // 关闭环境spark.close()}}

三、json

/**基本语法:1.读取:SparkSession.read.json("path") 2.保存:DataFrame.write[.mode("SaveMode")].json("path") 注意:Spark 读取的 JSON 文件不是传统的 JSON 文件,每一行都应该是一个 JSON 串
*/
object TestSparkSqlRead {def main(args: Array[String]): Unit = {// 创建 sparksql 环境对象val conf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val spark = SparkSession.builder().config(conf).getOrCreate()// 引入环境对象中的隐式转换import spark.implicits._val df = spark.read.json("data/user.json")df.show()df.write.json("output") // 关闭环境spark.close()}}

四、csv

/**基本语法:1.读取:1.1 SparkSession.read.format("csv")[.option(...)].load("path") 1.2 SparkSession.read.csv("path") 2.保存:2.1 DataFrame.write.format("csv")[.mode("SaveMode")].save("path") 2.2 DataFrame.write[.mode("SaveMode")].csv("path") 说明:csv 是默认以逗号为分隔符的文件格式
*/
object TestSparkSqlRead {def main(args: Array[String]): Unit = {// 创建 sparksql 环境对象val conf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val spark = SparkSession.builder().config(conf).getOrCreate()// 引入环境对象中的隐式转换import spark.implicits._val df = spark.read.format("csv") // 指定读取文件格式.option("sep", ";") // 指定分隔符.option("inferSchema", "true") .option("header", "true") // 指定第一行是否为表头.load("data/user.csv")df.show()df.write.format("csv").save("output") // 关闭环境spark.close()}}

五、mysql

  • 导入 mysql 依赖

    <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.27</version>
    </dependency>
    
  • 读取和保存数据

    /**
    基本语法:1.读取:1.1 SparkSession.read.format("jdbc").option("url", "..")…….load() 1.2 SparkSession.read.jdbc("url", "table", prop: Properties)2.保存:2.1 DataFrame.write.format("jdbc").option("url", "..")……[.mode("SaveMode")].save()2.2 DataFrame.write[.mode("SaveMode")].jdbc("url", "table", prop: Properties)*/
    object TestSparkSqlRead {def main(args: Array[String]): Unit = {// 创建 sparksql 环境对象val conf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")val spark = SparkSession.builder().config(conf).getOrCreate()// 引入环境对象中的隐式转换import spark.implicits._// 1. 读取// 1.1 方式一:val df = spark.read.format("jdbc") // 指定读取文件格式.option("url", "jdbc:mysql://linux:3306/spark") // 连接url.option("driver", "com.mysql.jdbc.Driver") // 驱动.option("user", "root") // 用户名.option("password", "123123") // 密码.option("dbtable", "user") // 表名.load()// 1.2 方式二:val df1 = spark.read.format("jdbc").options(Map("url" -> "jdbc:mysql://linux:3306/spark?user=root&password=123123","dbtable" -> "user","driver" -> "com.mysql.jdbc.Driver")).load()// 1.3 方式三:val props: Properties = new Properties()props.setProperty("user", "root")props.setProperty("password", "123123")val df2 = spark.read.jdbc("jdbc:mysql://linux:3306/spark", "user", props)df.show()// 2. 保存// 2.1 方式一df.write.format("jdbc") // 指定读取文件格式.option("url", "jdbc:mysql://linux:3306/spark") // 连接url.option("driver", "com.mysql.jdbc.Driver") // 驱动.option("user", "root") // 用户名.option("password", "123123") // 密码.option("dbtable", "user1") // 表名.mode(SaveMode.Append).save() // 2.2 方式二df2.write.mode("append").jdbc("jdbc:mysql://linux:3306/spark", "user2", props)// 关闭环境spark.close()}}
    

六、hive

1. 内置 hive

Spark 在安装编译后内部已经可以支持 Hive 表访问、 UDF (用户自定义函数) 以及 Hive 查询语言(HiveQL/HQL) 等

// 内置 Hive 的元数据存储在 derby 中,默认仓库地址为 $SPARK_HOME/spark-warehouse
// 进入 spark-shell// 1. 创建 hive 表
spark.sql("create table user(username string, age bigint)")// 2. 加载数据到表中
spark.sql("load data local inpath 'data/user.txt' into table user")// 3. 展示所有表
spark.sql("show tables").show// 4. 查询表数据
spark.sql("select * from user").show

2. 外部 hive

  • 配置连接外部 Hive

    • 将外部 Hive 的安装目录下的 hive-site.xml 配置文件拷贝到 Spark 安装目录的 conf 目录下

    • 将 Mysql 连接的驱动 jar 包拷贝到 Spark 安装目录的 jars 目录下(外部 Hive 的元数据库使用 MySQL)

    • 如果访问不到 hdfs,则需要把 core-site.xmlhdfs-site.xml 两个配置文件拷贝到 Spark 安装目录的 conf 目录下

    • 启动 spark-shell,执行 spark.sql("show tables").show 检查是否可以连接外部 Hive

  • 程序代码操作外部 Hive

    • 引入依赖

      <dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.12</artifactId><version>3.0.0</version>
      </dependency>
      <dependency><groupId>org.apache.hive</groupId><artifactId>hive-exec</artifactId><version>1.2.1</version>
      </dependency>
      <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.27</version>
      </dependency>
      
    • hive-site.xml 文件拷贝到项目的 resources 目录中,同时确保 target/classes 目录下也有该文件

    • 编码

      object TestSparkSqlRead {def main(args: Array[String]): Unit = {// 若出现用户无权限的错误,可在首行添加// System.setProperty("HADOOP_USER_NAME", "root")// 创建 sparksql 环境对象,并开启 Hive 支持val conf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")// 配置修改数据库仓库的地址// conf.set("spark.sql.warehouse.dir", "hdfs://linux:8020/user/hive/warehouse")val spark = SparkSession.builder().enableHiveSupport() // 启用 hive 支持.config(conf).getOrCreate()// 使用 sparksql 操作 hivespark.sql("show tables").show()// 关闭环境spark.close()}}
      
  • 其他连接方式

    • spark-sql cli

      #进入 spark-sql CLI
      bin/spark-sql#编写HQL
      show tables;
      
    • spark beeline

      #启动 Thrift Server
      sbin/start-thriftserver.sh#使用 beeline 连接 Thrift Server
      bin/beeline -u jdbc:hive2://linux:10000 -n root#编写HQL
      show tables;
      
http://www.lryc.cn/news/362253.html

相关文章:

  • Pulsar 社区周报 | No.2024-05-30 | BIGO 百页小册《Apache Pulsar 调优指南》
  • 第二证券股票杠杆:4分钟直线涨停!这一赛道,AH股集体爆发!
  • JavaScript 进阶征途:解锁Function奥秘,深掘Object方法精髓
  • 斜拉桥智慧施工数字孪生
  • 【chatGPT API】Function Calling:将自然语言转换为API调用或数据库查询
  • Oracle Hint /*+APPEND*/插入性能总结
  • 正邦科技(day3)
  • mac电脑多协议远程管理软件:Termius 8.4.0激活版下载
  • 网络攻击的常见形式
  • ReactDOM 18版本 使用createRoot 替换render详解
  • 【赠书活动】好书推荐—《详解51种企业应用架构模式》
  • SpringBoot启动时使用外置yml文件
  • 【开源三方库】Fuse.js:强大、轻巧、零依赖的模糊搜索库
  • vue从入门到精通(六):数据代理
  • 【C++修行之道】类和对象(二)类的6个默认成员函数、构造函数、析构函数
  • 【LeetCode热题100总结】239. 滑动窗口最大值
  • 【YOLOv9改进[Conv]】使用YOLOv10的空间通道解耦下采样SCDown模块替换部分CONv的实践 + 含全部代码和详细修改内容
  • 简单小游戏制作
  • Delphi
  • Linux的shell脚本中的比大小
  • 每日复盘-20240603
  • adb server version (22000) doesn‘t match this client (41); killing...
  • 如何使用 Connector API 将数据提取到 Elasticsearch Serverless 中
  • 体育赛事直播系统开发源码搭建
  • 使用Jmeter进行性能测试
  • AI技术的发展,会让你工作轻松吗
  • Spring-DI入门案例
  • ubuntu18.04 报错:fatal error: execution
  • 开源大模型与大模型api的使用优缺点
  • 小红书前端2轮面试期望22K,全程问低代码设计