当前位置: 首页 > news >正文

【大数据学习 | Spark-SQL】SparkSQL读写数据

我们使用sparksql进行编程,编程的过程我们需要创建dataframe对象,这个对象的创建方式我们是先创建RDD然后再转换rdd变成为DataFrame对象。

但是sparksql给大家提供了多种便捷读取数据的方式。

//原始读取数据方式
sc.textFile().toRDD
sqlSc.createDataFrame(rdd,schema)
//更便捷的使用方式
sqlSc.read.text|orc|parquet|jdbc|csv|json
df.write.text|orc|parquet|jdbc|csv|json

write写出存储数据的时候也是文件夹的,而且文件夹不能存在。

  • csv是一个介于文本和excel之间的一种格式,如果是文本打开用逗号分隔的。
  • text文本普通文本,但是这个文本必须只能保存一列内容。

以上两个文本都是只有内容的,没有列的。

  • json是一种字符串结构,本质就是字符串,但是存在kv,例子 {"name":"zhangsan","age":20}

多平台解析方便,带有格式信息。

  • orc格式一个列式存储格式,hive专有的。
  • parquet列式存储,顶级项目

以上都是列式存储问题,优点(1.列式存储,检索效率高,防止冗余查询 2.带有汇总信息,查询特别快 3.带有轻量级索引,可以跳过大部分数据进行检索),他们都是二进制文件,带有格式信息。

jdbc 方式,它是一种协议,只要符合jdbc规范的服务都可以连接,mysql,oracle,hive,sparksql

整体代码:

package com.hainiu.sparkimport org.apache.spark.sql.SQLContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.{SparkConf, SparkContext}import java.util.Propertiesobject TestMovieWithSql {def main(args: Array[String]): Unit = {//??movie???//1.id  middle=name  last=typeval conf = new SparkConf()conf.setAppName("movie")conf.setMaster("local[*]")conf.set("spark.shuffle.partitions","20")val sc = new SparkContext(conf)val sqlSc = new SQLContext(sc)import sqlSc.implicits._//deal dataval df = sc.textFile("data/movies.txt").flatMap(t => {val strs = t.split(",")val mid = strs(0)val types = strs.reverse.headval name = strs.tail.reverse.tail.reverse.mkString(" ")types.split("\\|").map((mid, name, _))}).toDF("mid", "mname", "type")df.limit(1).show()val df1 = sc.textFile("data/ratings.txt").map(t=>{val strs = t.split(",")(strs(0),strs(1),strs(2).toDouble)}).toDF("userid","mid","score")df1.limit(1).show()import org.apache.spark.sql.functions._val df11 = df.join(df1, "mid").groupBy("userid", "type").agg(count("userid").as("cnt")).withColumn("rn", row_number().over(Window.partitionBy("userid").orderBy($"cnt".desc))).where("rn = 1").select("userid", "type")val df22 = df.join(df1, "mid").groupBy("type", "mname").agg(avg("score").as("avg")).withColumn("rn", row_number().over(Window.partitionBy("type").orderBy($"avg".desc))).where("rn<4").select("type", "mname")val df33 = df11.join(df22, "type")//spark3.1.2?? spark2.x//    df33.write.csv()df33.write.format("csv").save("data/csv")//    df33.write.
//      csv("data/csv")
//    df33.write.json("data/json")//    df33.write.parquet("data/parquet")
//    df33.write.orc("data/orc")
//    val pro = new Properties()
//    pro.put("user","root")
//    pro.put("password","hainiu")
//    df33.write.jdbc("jdbc:mysql://11.99.173.24:3306/hainiu","movie",pro)}
}

为了简化存储的计算方式:

package com.hainiu.sparkimport org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}object TestSink {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("test sink")conf.setMaster("local[*]")val sc = new SparkContext(conf)val sqlSc = new SQLContext(sc)import sqlSc.implicits._import org.apache.spark.sql.functions._val df = sc.textFile("data/a.txt").map(t=>{val strs = t.split(" ")(strs(0),strs(1),strs(2),strs(3))}).toDF("id","name","age","gender").withColumn("all",concat_ws(" ",$"id",$"name",$"age",$"gender")).select("all")
//    df.write.csv("data/csv")
//    df.write.format("org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2")
//      .save("data/csv")
//    df.write.parquet("data/parquet")
//    df.write.format("org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2")
//      .save("data/parquet")
//    df.write.format("org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2")
//      .save("data/json")df.write.format("org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2").save("data/text")}
}

读取数据代码:

package com.hainiu.sparkimport org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContextimport java.util.Propertiesobject TestReadData {def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("movie")conf.setMaster("local[*]")conf.set("spark.shuffle.partitions", "20")val sc = new SparkContext(conf)val sqlSc = new SQLContext(sc)
//    sqlSc.read.text("data/text").show()
//    sqlSc.read.csv("data/csv").show()
//  
//    sqlSc.read.parquet("data/parquet").show()
//    sqlSc.read.json("data/json").show()sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2").load("data/text").show()sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2").load("data/csv").show()sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2").load("data/json").show()sqlSc.read.format("org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2").load("data/parquet").show()sqlSc.read.orc("data/orc").show()val pro = new Properties()pro.put("user","root")pro.put("password","hainiu")sqlSc.read.jdbc("jdbc:mysql://11.99.173.24:3306/hainiu","movie",pro).show()}
}
http://www.lryc.cn/news/492606.html

相关文章:

  • AI赋能公共服务转型升级 | 第十届中国行业互联网大会暨腾讯云TVP行业大使三周年庆典公共服务专场圆满举办!
  • 关于按天切割Tomcat的catalina.out日志文件的配置
  • 【人工智能】深入解析GPT、BERT与Transformer模型|从原理到应用的完整教程
  • 彻底理解如何保证ElasticSearch和数据库数据一致性问题
  • 2024-2025热门留学趋势
  • 寻找视频特效素材的优质网站推荐 轻松提升作品魅力
  • 【英特尔IA-32架构软件开发者开发手册第3卷:系统编程指南】2001年版翻译,2-36
  • 信息安全实验--密码学实验工具:CrypTool
  • python的class 类创建、方法调用以及属性赋值
  • Angular v19 (二):响应式当红实现signal的详细介绍:它擅长做什么、不能做什么?以及与vue、svelte、react等框架的响应式实现对比
  • IMX 平台UART驱动情景分析:write篇--从 TTY 层到硬件驱动的写操作流程解析
  • 网络安全拟态防御技术
  • 灵活开源低代码平台——Microi吾码(一)
  • frida_hook_libart(简单解释)
  • 计算机网络八股整理(二)
  • 强化学习off-policy进化之路(PPO->DPO->KTO->ODPO->ORPO->simPO)
  • Linux 如何创建逻辑卷并使用
  • java实现将图片插入word文档
  • 初识java(3)
  • coqui-ai TTS 初步使用
  • matlab代码--卷积神经网络的手写数字识别
  • Scala—Map用法详解
  • 极狐GitLab 17.6 正式发布几十项与 DevSecOps 相关的功能【六】
  • ES6 、ESNext 规范、编译工具babel
  • DeepSpeed 配置文件(DeepSpeed Configuration Files)详解:中英文解释
  • 前端JavaScript(一)---基本介绍
  • 文本处理之sed
  • uniapp在App端定义全局弹窗,当打开关闭弹窗会触发onShow、onHide生命周期怎么解决?
  • 计算机网络 实验七 NAT配置实验
  • 数据结构——排序算法第二幕(交换排序:冒泡排序、快速排序(三种版本) 归并排序:归并排序(分治))超详细!!!!