当前位置: 首页 > news >正文

如何使用SparkSql

一、SparkSql的前世今生

Hive->Shark->Spark SQL

二、SparkSql依赖

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.11</artifactId>

<version>2.1.2</version>

</dependency>

三、SparkSql DataFrame

DataFrame,以列的形式组织的,分布式的数据集合。它其实和关系型数据库中的表非常类似,但是底层做了很多的优化。DataFrame可以通过很多来源进行构建,包括:结构化的数据文件,Hive中的表,外部的关系型数据库,以及RDD。

四、创建SQLContext

val sqlContext = SparkSession.builder().appName("RDD2DataFrameReflection").master("local").getOrCreate;

五、SparkSql创建DataFrame

val properties = new Properties();

properties.setProperty("user", "root");

properties.setProperty("password", "root");

val testDF = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/test", "user", properties);

val testDF =sqlContext.read

.format("jdbc")

.option("url", "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8")

.option("user", "root")

.option("password", "root")

.option("dbtable","user")

.option("dbtable","(select id from user where id < 50) as t") // sql语句必须是子查询

.load()

六、SparkSql DataFrame的操作

// 查询

testDF.show()

testDF.printSchema()

testDF.select("username").show()

testDF.select(testDF("username"), testDF("id") + 1).show()

testDF.filter(testDF("id") % 2 === 0).show()

testDF.groupBy("sex").count().show

七、RDD转换为DataFrame

1、使用反射来推断包含了特定数据类型的RDD的元数据

case class Student(id: Int, name: String, age: Int) // 定义模式类,属性名对应数据表的字段名

// RDD.toDF()转换为DataFrame

val studentDF = sc.textFile("data/students.txt", 1)

.map { line => line.split(",") }

.map { arr => Student(arr(0).trim().toInt, arr(1).trim(), arr(2).trim().toInt) }

.toDF()

//.map { arr => (arr(0).trim().toInt, arr(1).trim(), arr(2).trim().toInt) }

//.toDF("id", "name", "age")

val properties = new Properties()

properties.setProperty("user", "root")

properties.setProperty("password", "root")

studentDF.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8","student",properties);

2、编程方式使用Row将RDD转换为DataFrame

// 第一步,构造Row数据

val studentRDD = sc.textFile("data/students.txt", 1)

.map { line => Row(line.split(",")(0).toInt, line.split(",")(1), line.split(",")(2).toInt) }

// 第二步,构造元数据

val structType = StructType(Array(

StructField("id", IntegerType, true),

StructField("name", StringType, true),

StructField("age", IntegerType, true)))

// 第三步,转换RDD为DataFrame

val studentDF = sqlContext.createDataFrame(studentRDD, structType)

八、读入每个数据源,生成一个临时视图,通过一个sql去操作这些视图

val studentScoreDF = sqlContext.read.json("data/student_score.json");

studentScoreDF.createOrReplaceTempView("student_score");

val studentInfoDF = sqlContext.read.json("data/student_info.json")

studentInfoDF.createOrReplaceTempView("student_info")

val goodStudentInfoDF = sqlContext.sql("select s.name, s.score, i.gender from student_score s, student_info i where s.score>80 and s.name=i.name");

goodStudentInfoDF.rdd.collect().foreach(row => println(row(0) + " " + row(1) + " " + row(2)))

goodStudentInfoDF.write.json("data/result/goodstudents")

九、SparkSql UDF用户自定义函数

UDAF:User Defined Aggregate Function。用户自定义聚合函数

http://www.lryc.cn/news/517992.html

相关文章:

  • YOLOv8实战人员跌倒检测
  • QT-TCP-server
  • 【STM32+QT项目】基于STM32与QT的智慧粮仓环境监测与管理系统设计(完整工程资料源码)
  • robot 仿真环境安装测试 [持续更新]
  • 【FlutterDart】 拖动边界线改变列宽类似 vscode 那种拖动改变编辑框窗口大小(11 /100)
  • R语言的循环实现
  • Web应用安全-漏洞扫描器设计与实现
  • 视频生成Sora的全面解析:从AI绘画、ViT到ViViT、TECO、DiT、VDT、NaViT等
  • 【已解决】如何让容器内的应用程序使用代理?
  • DC/AC并网逆变器模型与仿真MATLAB
  • P10424 [蓝桥杯 2024 省 B] 好数
  • 【Word_笔记】Word的修订模式内容改为颜色标记
  • oracle位运算、左移右移、标签算法等
  • spring boot学习第二十三篇:Spring Boot集成RocketMQ
  • 去掉el-table中自带的边框线
  • C语言gdb调试
  • Spring项目创建流程及配置文件bean标签参数简介
  • reactor中的并发
  • 太速科技-418-基于AD9361 +ZYNQ7020 的软件无线电 SDR 套件
  • 监控易:一体化智能运维的扩展性优势深度解析
  • 朴素贝叶斯算法:从生活到数学的完整解析
  • Echarts的认识和基本用法
  • Linux文件系统的安全保障---Overlayroot!
  • 【Linux 之一 】Linux常用命令汇总
  • 【线性代数】通俗理解特征向量与特征值
  • Unity 热更新基础知识
  • 安全基础-互联网技术基础
  • 深度学习从入门到实战——卷积神经网络原理解析及其应用
  • React快速上手到项目实战总篇
  • HTMLHTML5革命:构建现代网页的终极指南 - 0. 课程目录设计