当前位置: 首页 > news >正文

SparkSQL---编程模型的操作,数据加载与落地及自定义函数的使用

一、SparkSQL编程模型的创建与转化

1、DataFrame的构建

people.txt数据:
1 zhangsan 20
2 lisi 29
3 wangwu 25
4 zhaoliu 30
5 tianqi 35
6 kobe 40
people.json数据:在SparkSQL—简介及RDD V.S DataFrame V.S Dataset编程模型详解里

1、从Spark数据源进行创建

 	//创建程序入口val spark = SparkSession.builder().appName("dataFrame").master("local[*]").getOrCreate()val sc = spark.sparkContext//设置日志级别sc.setLogLevel("WARN")//加载数据val dataFrame = spark.read.format("json").load("F:\\test\\people.json")//展示数据dataFrame.show()

2、从RDD进行转换

	//创建程序入口val spark = SparkSession.builder().appName("dataFrame").master("local[*]").getOrCreate()val sc = spark.sparkContext//设置日志级别sc.setLogLevel("WARN")//导包import spark.implicits._//加载文件val file :RDD[String] = sc.textFile("F:\\test\\person.txt")//按照分隔符进行切分val filemap :RDD[Array[String]] = file.map(_.split(" "))//指定数据类型val tran :RDD[(Int,String,Int)] = filemap.map(x=>(x(0).toInt,x(1),x(2).toInt))//带参数的是指定表头名字val dataFrame2=tran.toDF("id","name","age")

3、通过反射创建DataFrame

	case class Person(id:Int,name:String,age:Int)//样例类反射获取列名创建DataFrame//加载文件val file :RDD[String] = sc.textFile("F:\\test\\person.txt")//按照分隔符进行切分val filemap :RDD[Array[String]] = file.map(_.split(" "))//指定数据类型val tran= filemap.map(x1=>Person(x1(0).toInt,x1(1),x1(2).toInt))//将rdd转换为DataFrameval dataFrame1 = tran.toDF()

4、动态编程

    //数据和结构分离加载的方式动态创建dataFrame//加载数据val row:RDD[Row] = sc.parallelize(List(Row(1, "李伟", 1, 180.0),Row(2, "汪松伟", 2, 179.0),Row(3, "常洪浩", 1, 183.0),Row(4, "麻宁娜", 0, 168.0)))//指定schema/*val structType = StructType(List(StructField("id", DataTypes.IntegerType, false),StructField("name", DataTypes.StringType, false),StructField("sex", DataTypes.IntegerType, false),StructField("height", DataTypes.DoubleType, false)))*/val structType = new StructType().add("id","Int").add("name","string").add("sex","Int").add("height","Double")//创建DataFrameval dataFrame3 = spark.createDataFrame(row,structType)//Row:代表的是二维表中的一行记录,或者就是一个Java对象//StructType:是该二维表的元数据信息,是StructField的集合//StructField:是该二维表中某一个字段/列的元数据信息(主要包括,列名,类型,是否可以为null)
2、Dataset的构建
case class Student(id: Int, name: String, sex: Int, age: Int)
object Create_DataSet {def main(args: Array[String]): Unit = {//创建程序入口val spark = SparkSession.builder().appName("dataSet").master("local[*]").getOrCreate()//设置日志级别val sc = spark.sparkContextsc.setLogLevel("WARN")//导包import spark.implicits._//加载数据val list = List(new Student(1, "王盛芃", 1, 19),new Student(2, "李金宝", 1, 49),new Student(3, "张海波", 1, 39),new Student(4, "张文悦", 0, 29))//创建DataSetval ds = spark.createDataset[Student](list)//展示输出ds.show()}

注:在创建Dataset的时候,需要注意数据的格式,必须使用case class,或者基本数据类型,同时需要通过import spark.implicts._来完成数据类型的编码,而抽取出对应的元数据信息,否则编译无法通过。

3、RDD和DataFrame以及DataSet的互相转换
//创建程序入口val spark = SparkSession.builder().appName("transform").master("local[*]").getOrCreate()//调用sparkContextval sc = spark.sparkContext//设置日志级别sc.setLogLevel("WARN")//导包import spark.implicits._//加载数据val file = sc.textFile("F:\\test\\person.txt")//切分val fileMap = file.map(_.split(" "))//指定数据类型val tran = fileMap.map(x=>(x(0).toInt,x(1),x(2).toInt))//----------三者之间的转换--------//rdd=>DFval dataFrame = tran.toDF("id","name","age")//rdd=>DSval dataSet = tran.toDS()//DS=>rdddataSet.rdd//DF=>rdddataFrame.rdd//DF=>DSval ds = dataFrame.as[(Int,String,Int)]//DS=>DFval df = ds.toDF()

二、SparkSQL统一数据加载与落地

1、数据加载
    //创建程序入口val spark = SparkSession.builder().appName("load").master("local[*]").getOrCreate()//调用sparkContextval sc = spark.sparkContext//设置日志级别sc.setLogLevel("WARN")//加载数据//第一种方式:spark.read.format(数据文件格式).load(path),默认加载的文件格式为parquetspark.read.format("parquet").load("F:\\test\\parquet").show()spark.read.format("json").load("F:\\test\\json").show()spark.read.format("csv").load("F:\\test\\csv").show()//加载数据库中的表的数据spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/mydata").option("user","root").option("password","root").option("dbtable","person").load().show()//第二种方式spark.read.parquet("F:\\test\\parquet").show()spark.read.json("F:\\test\\json").show()spark.read.csv("F:\\test\\csv").show()//加载数据库中的表val pro =new Properties()pro.put("user","root")pro.put("password","root")spark.read.jdbc("jdbc:mysql://localhost:3306/mydata","person",pro).show()
2、数据落地
	//创建程序入口val spark = SparkSession.builder().appName("save").master("local[*]").getOrCreate()//调用sparkContextval sc = spark.sparkContext//设置日志级别sc.setLogLevel("WARN")//加载数据val dataFrame = spark.read.json("F:\\test\\people.json")//数据落地//第一种方式,save的默认格式也是parquetdataFrame.write.format("parquet").save("F:\\test\\parquet")dataFrame.write.format("json").save("F:\\test\\json")dataFrame.write.format("csv").save("F:\\test\\csv")*///将数据保存到数据库dataFrame.write.format("jdbc").option("url","jdbc:mysql://localhost:3306/mydata").option("user","root").option("password","root").option("dbtable","person").save()//第二种方式dataFrame.write.parquet("F:\\test\\parquet")dataFrame.write.json("F:\\test\\json")dataFrame.write.csv("F:\\test\\csv")//保存到数据库val pro = new Properties()pro.setProperty("user","root")pro.setProperty("password","root")dataFrame.write.jdbc("jdbc:mysql://localhost:3306/mydata","person",pro)

三、自定义函数的使用

	val spark = SparkSession.builder().appName("UDF").master("local[*]").getOrCreate()val sc = spark.sparkContextsc.setLogLevel("WARN")//案例//加载文件val dataFrame = spark.read.json("file:\\F:\\test\\people.json")//sql查询风格//首先将数据注册为一张表dataFrame.createOrReplaceTempView("people")//赋予函数功能val fun=(x:String)=>{x.toUpperCase()}//注册函数spark.udf.register("upper",fun)//使用sql风格查询spark.sql("select name, upper(name) from people").show()
http://www.lryc.cn/news/410707.html

相关文章:

  • 文件解析漏洞--IIS--Vulhub
  • 你知道缓存的这个问题到底把多少程序员坑惨了吗?
  • 飞创直线模组桁架机械手优势及应用领域
  • TongHttpServer 简介
  • 回溯法---组合总和
  • 将Android Library项目发布到JitPack仓库
  • JAVAWeb实战(后端篇)
  • 【vs】实用调试技巧——学会写优秀的代码!
  • 数组声明方式
  • Docker中Docker网络-理解Docker0与自定义网络的使用示例
  • 领域驱动大型结构之SYSTEM METAPHOR(系统隐喻)
  • web前端开发一、VScode环境搭建
  • DiAD代码use_checkpoint
  • nginx出现Refused to apply inline style because it violates
  • 【中项第三版】系统集成项目管理工程师 | 第 11 章 规划过程组⑥ | 11.15 - 11.17
  • 基础警务互联网app
  • 为了方便写CURD代码,我在UTools写了个插件SqlConvert来生成代码!
  • 在国产芯片上实现YOLOv5/v8图像AI识别-【2.2】RK3588上C++开发环境准备及测试更多内容见视频
  • 2024数据资产入表财务实操手册
  • react.16+
  • 如何实现MySQL对某一张表的binlog日志进行记录
  • 使用requests库进行网络爬虫:入门指南
  • 实验5-1 使用函数计算两点间的距离
  • 免费!OpenAI发布最新模型GPT-4o mini,取代GPT-3.5,GPT-3.5退出历史舞台?
  • 目标检测损失计算部分(YOLO)
  • Swift 方法
  • 哪些牌子充电宝性价比比较高?目前公认比较好用充电宝都在这儿!
  • 计算机网络必会面经
  • 深入理解 Go 数组、切片、字符串
  • STM32下的HAL库U盘数据读写