当前位置: 首页 > article >正文

SparkSQL基本操作

以下是 Spark SQL 的基本操作总结,涵盖数据读取、转换、查询、写入等核心功能:

一、初始化 SparkSession

scala

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()

  .appName("Spark SQL Demo")

  .master("local[*]") // 本地模式(集群用 `spark://host:port`)

  .getOrCreate()

// 导入隐式转换(用于 DataFrame 与 RDD 互转)

import spark.implicits._

 

二、数据读取

1. 读取文件(CSV/JSON/Parquet等)

scala

// 读取 CSV(带表头)

val csvDF = spark.read

  .option("header", "true")

  .option("inferSchema", "true") // 自动推断数据类型

  .csv("路径/文件.csv")

// 读取 JSON

val jsonDF = spark.read.json("路径/文件.json")

// 读取 Parquet(Spark 原生格式,高效)

val parquetDF = spark.read.parquet("路径/文件.parquet")

 

2. 读取数据库(如 MySQL)

scala

val jdbcDF = spark.read.format("jdbc")

  .option("url", "jdbc:mysql://host:port/db")

  .option("dbtable", "表名")

  .option("user", "用户名")

  .option("password", "密码")

  .load()

 

3. 从 RDD 创建 DataFrame

scala

// 示例:RDD 转 DataFrame(通过 case class 推断 Schema)

case class Person(id: Int, name: String, age: Int)

val peopleRDD = spark.sparkContext.parallelize(Seq(Person(1, "Alice", 25), Person(2, "Bob", 30)))

val peopleDF = peopleRDD.toDF() // 自动使用 case class 字段作为列名

 

三、基本数据操作

1. 查看数据

scala

df.show() // 打印前20行(默认)

df.show(false) // 不截断长字符串

df.printSchema() // 查看表结构

df.describe().show() // 统计摘要(均值、计数等)

 

2. 列操作

scala

// 选择列

df.select("name", "age").show()

// 新增列(表达式计算)

import org.apache.spark.sql.functions._

val dfWithNewColumn = df.withColumn("age_plus_1", col("age") + 1)

// 重命名列

val renamedDF = df.withColumnRenamed("old_name", "new_name")

// 删除列

val filteredDF = df.drop("column_to_drop")

 

3. 行过滤与排序

scala

// 过滤行(where/filter 等价)

df.filter(col("age") > 18).show()

df.where("age > 18 AND name LIKE 'A%'").show()

// 排序(asc/desc)

df.orderBy(col("age").desc, "name").show() // 按年龄降序、姓名升序

 

4. 分组与聚合

scala

import org.apache.spark.sql.functions._

// 分组统计(如计算每个年龄段的人数)

df.groupBy("age")

  .agg(

    count("*").alias("count"), // 计数

    avg("score").alias("avg_score") // 平均值

  ).show()

// 窗口函数(如按年龄分区排序)

import org.apache.spark.sql.window.Window

val windowSpec = Window.partitionBy("age").orderBy(col("score").desc)

df.withColumn("rank", rank().over(windowSpec)).show()

 

四、Spark SQL 查询(SQL 语法)

1. 注册临时视图

scala

df.createOrReplaceTempView("people") // 注册为临时视图(会话级)

 

2. 执行 SQL 查询

scala

val sqlResult = spark.sql("""

  SELECT name, age

  FROM people

  WHERE age > 25

  ORDER BY age DESC

""")

sqlResult.show()

 

3. 全局临时视图(跨会话)

scala

df.createGlobalTempView("global_people") // 全局视图,需用 `global_temp.表名` 访问

spark.sql("SELECT * FROM global_temp.global_people").show()

 

五、数据写入

1. 保存为文件

scala

// 保存为 CSV(覆盖模式)

df.write.mode("overwrite") // 模式:overwrite/append/ignore/replace

  .option("header", "true")

  .csv("路径/输出.csv")

// 保存为 Parquet(压缩高效)

df.write.parquet("路径/输出.parquet")

 

2. 写入数据库(如 MySQL)

scala

df.write.format("jdbc")

  .option("url", "jdbc:mysql://host:port/db")

  .option("dbtable", "表名")

  .option("user", "用户名")

  .option("password", "密码")

  .mode("append") // 追加数据

  .save()

 

3. 保存为 Hive 表

scala

df.write.saveAsTable("hive_table") // 需提前启用 Hive 支持(spark.sql.catalogImplementation = hive)

 

六、数据类型与转换

1. 常用数据类型

- 基础类型: IntegerType 、 StringType 、 DoubleType 、 TimestampType 

- 复杂类型: ArrayType 、 MapType 、 StructType (嵌套结构)

 

2. 类型转换

scala

import org.apache.spark.sql.functions._

// 字符串转整数

val castDF = df.withColumn("age_str", col("age").cast("string"))

// 时间格式转换

val timestampDF = df.withColumn("date", to_date(col("timestamp_col"), "yyyy-MM-dd"))

 

七、性能优化技巧

 

1. 使用 Parquet 格式:列式存储,压缩率高,查询更快。

2. 分区表:按日期/类别分区( partitionBy ),减少数据扫描范围。

3. 缓存数据: df.cache()  避免重复计算(适用于多次查询的数据集)。

4. 广播小表: spark.sql.autoBroadcastJoinThreshold  设置小表广播阈值(默认 10MB)。

 

八、停止 SparkSession

scala

spark.stop() // 释放资源

 

通过以上操作,可实现数据的读取、处理、分析和存储。实际应用中可结合业务需求灵活组合函数,或通过 Spark UI( http://localhost:4040 )监控作业执行情况。

http://www.lryc.cn/news/2379477.html

相关文章:

  • Web 架构之动静分离
  • 20250515配置联想笔记本电脑IdeaPad总是使用独立显卡的步骤
  • sparkSQL读入csv文件写入mysql
  • 大涡模拟实战:从区域尺度到街区尺度的大气环境模拟
  • centos安装方式的aarch64架构下的kylinv10安装docker23.0.0
  • 单目测距和双目测距 bev 3D车道线
  • 鸿蒙OSUniApp 实现一个精致的日历组件#三方框架 #Uniapp
  • 【爬虫】DrissionPage-3
  • Web开发-JavaEE应用SpringBoot栈SnakeYaml反序列化链JARWAR构建打包
  • 项目复习(2)
  • UE 材质基础 第一天
  • 短剧小程序系统开发源码上架,短剧项目市场分析
  • 学习FineBI
  • Oracle日期计算跟Mysql计算日期差距问题-导致两边计算不一致
  • 深入剖析某App视频详情逆向:聚焦sig3参数攻克
  • Java求职面试揭秘:从Spring到微服务的技术挑战
  • 【Linux】Linux安装并配置MongoDB
  • HANA数据库死锁
  • STC32G12K128实战:串口通信
  • Kotlin Multiplatform与Flutter、Compose共存:构建高效跨平台应用的完整指南
  • ElasticSearch深入解析(十二):聚合——分桶聚合、指标聚合、管道子聚合
  • spark小任务
  • Ubuntu 20.04 报错记录: Matplotlib 无法使用 OpenCV 的 libqxcb.so
  • JS 高级程序设计 设计模式
  • 新电脑软件配置二:安装python,git, pycharm
  • 数据仓库:企业数据管理的核心引擎
  • MCU开发学习记录17* - RTC学习与实践(HAL库) - 日历、闹钟、RTC备份寄存器 -STM32CubeMX
  • C++中的四种强制转换
  • YOLOv2目标检测算法:速度与精度的平衡之道
  • Quic如何实现udp可靠传输