当前位置: 首页 > news >正文

spark代码

RDD


Tom,DataBase,80

Tom,Algorithm,50

Tom,DataStructure,60

Jim,DataBase,90

Jim,Algorithm,60

Jim,DataStructure,80

该系总共有多少学生;

val lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt") 
val par = lines.map(row=>row.split(",")(0)) 
val distinct_par = par.distinct() //去重操作 
distinct_par.count //取得总数 

该系共开设来多少门课程;

val lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt") 
val par = lines.map(row=>row.split(",")(1)) 
val distinct_par = par.distinct() 
distinct_par.count 

Tom 同学的总成绩平均分是多少;

val lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt") 
val pare = lines.filter(row=>row.split(",")(0)=="Tom") 
pare.foreach(println) 
Tom,DataBase,26 
Tom,Algorithm,12 
Tom,OperatingSystem,16 
Tom,Python,40 
Tom,Software,60 
pare.map(row=>(row.split(",")(0),row.split(",")(2).toInt)).mapValues(x=>(x,1)).reduceByKey((x,y0) => (x._1+y._1,x._2 + y._2)).mapValues(x => (x._1 / x._2)).collect() 
//res9: Array[(String, Int)] = Array((Tom,30)) 

求每名同学的选修的课程门数

val lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt") 
val pare = lines.map(row=>(row.split(",")(0),row.split(",")(1))) 
pare.mapValues(x => (x,1)).reduceByKey((x,y) => (" ",x._2 + y._2)).mapValues(x => x._2).foreach(println)

各门课程的平均分是多少;

val lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt") 
val pare = lines.map(row=>(row.split(",")(1),row.split(",")(2).toInt)) 
pare.mapValues(x=>(x,1)).reduceByKey((x,y) => (x._1+y._1,x._2 + y._2)).mapValues(x => (x._1 / x._2)).collect() 
res0: Array[(String, Int)] = Array((Python,57), (OperatingSystem,54), (CLanguage,50), 

使用累加器计算共有多少人选了 DataBase 这门课

val lines = sc.textFile("file:///usr/local/spark/sparksqldata/Data01.txt") 
val pare = lines.filter(row=>row.split(",")(1)=="DataBase").map(row=>(row.split(",")(1),1)) 
val accum = sc.longAccumulator("My Accumulator") 
pare.values.foreach(x => accum.add(x)) 
accum.value 
res19: Long = 126 

DateFrame

源文件内容如下(包含 id,name,age),将数据复制保存到 ubuntu 系统/usr/local/spark 下, 命名为 employee.txt,实现从 RDD 转换得到 DataFrame,并按 id:1,name:Ella,age:36 的格式 打印出 DataFrame 的所有数据。请写出程序代码。

1,Ella,36

2,Bob,29

3,Jack,29

方法一:

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder 
import org.apache.spark.sql.Encoder 
import spark.implicits._ object RDDtoDF { def main(args: Array[String]) { case class Employee(id:Long,name: String, age: Long) val employeeDF = spark.sparkContext.textFile("file:///usr/local/spark/employee.txt").map(_.split(",")).map(attributes => Employee(attributes(0).trim.toInt,attributes(1), attributes(2).trim.toInt)).toDF() employeeDF.createOrReplaceTempView("employee") val employeeRDD = spark.sql("select id,name,age from employee") employeeRDD.map(t => "id:"+t(0)+","+"name:"+t(1)+","+"age:"+t(2)).show() 	} 
} 

方法二:

import org.apache.spark.sql.types._import org.apache.spark.sql.Encoder 
import org.apache.spark.sql.Row object RDDtoDF { def main(args: Array[String]) { val employeeRDD = 
spark.sparkContext.textFile("file:///usr/local/spark/employee.txt") 
val schemaString = "id name age" 
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, 
StringType, nullable = true)) 
val schema = StructType(fields) 
val rowRDD = employeeRDD.map(_.split(",")).map(attributes => 
Row(attributes(0).trim, attributes(1), attributes(2).trim)) 
val employeeDF = spark.createDataFrame(rowRDD, schema) 
employeeDF.createOrReplaceTempView("employee") 
val results = spark.sql("SELECT id,name,age FROM employee") 
results.map(t => "id:"+t(0)+","+"name:"+t(1)+","+"age:"+t(2)).show() } 
} 

{ “id”:1 ,“name”:" Ella",“age”:36 }

{ “id”:2,“name”:“Bob”,“age”:29 }

{ “id”:3 ,“name”:“Jack”,“age”:29 }

{ “id”:4 ,“name”:“Jim”,“age”:28 }

{ “id”:5 ,“name”:“Damon” }

{ “id”:5 ,“name”:“Damon” }

创建 DataFrame

scala> import org.apache.spark.sql.SparkSession  
scala> val spark=SparkSession.builder().getOrCreate()  
scala> import spark.implicits._  
scala> val df = spark.read.json("file:///usr/local/spark/employee.json") 

(1) 查询 DataFrame 的所有数据

答案:

scala> df.show()  

(2) 查询所有数据,并去除重复的数据

答案:

scala> df.distinct().show()  

(3) 查询所有数据,打印时去除 id 字段

答案:

scala> df.drop("id").show()  

(4) 筛选 age>20 的记录

答案:

scala> df.filter(df("age") > 30 ).show() 

(5) 将数据按 name 分组

答案:

scala> df.groupBy("name").count().show() 

(6) 将数据按 name 升序排列

答案:

scala> df.sort(df("name").asc).show() 

(7) 取出前 3 行数据

答案:

scala> df.take(3) 或 scala> df.head(3) 

(8) 查询所有记录的 name 列,并为其取别名为 username

答案:

scala> df.select(df("name").as("username")).show() 

(9) 查询年龄 age 的平均值

答案:

scala> df.agg("age"->"avg") 

(10) 查询年龄 age 的最小值

答案:

scala> df.agg("age"->"min")

编程实现利用 DataFrame 读写 MySQL 的数据

(1) 在 MySQL 数据库中新建数据库 sparktest,再建表 employee,包含下列两行数据;

1 employee 表原有数据

id name gender age

1 Alice F 22 2 John M 25

假设当前目录为/usr/local/spark/mycode/testmysql,在当前目录下新建一个目录 mkdir -p src/main/scala , 然 后 在 目 录 /usr/local/spark/mycode/testmysql/src/main/scala 下 新 建 一 个testmysql.scala

import java.util.Properties 
import org.apache.spark.sql.types._ 
import org.apache.spark.sql.Row object TestMySQL { def main(args: Array[String]) { val employeeRDD = spark.sparkContext.parallelize(Array("3 Mary F 26","4 Tom M 23")).map(_.split(" ")) 
val schema = StructType(List(StructField("id", IntegerType, true),StructField("name", StringType, true),StructField("gender", StringType, true),StructField("age", IntegerType, true))) 
val rowRDD = employeeRDD.map(p => Row(p(0).toInt,p(1).trim, p(2).trim,p(3).toInt)) 
val employeeDF = spark.createDataFrame(rowRDD, schema) 
val prop = new Properties() 
prop.put("user", "root") 
prop.put("password", "hadoop") 
prop.put("driver","com.mysql.jdbc.Driver") 
employeeDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/sparktest", 
sparktest.employee", prop) 
val jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/sparktest").option("driver","com.mysql.jdbc.Driver").option("dbtable","employee").option("user","root").option("password", "hadoop").load() 
jdbcDF.agg("age" -> "max", "age" -> "sum") } 
} 

启动spark-shell

cd /usr/local/spark
./bin/spark-shell

退出spark-shell

:quit

集群里安装Spark

sudo tar -zxf ~/下载/spark-2.1.0-bin-without-hadoop.tgz -C /usr/local
cd /usr/local
sudo mv ./spark-2.1.0-bin-without-hadoop ./spark
sudo chow -R hadoop:hadoop ./spark

RDD转换操作API

操作含义
filter(func)帅选出满足函数func的元素,并返回一个新的数据集
map(func)将每个元素传入到函数func中,并将结构返回一个新的数据集
flatMap(func)与上类似,但每个元素都可以映射到0个或多个输出结果
groupByKey()应用于(K,V)键值对数据集时,返回一个(K,Iterable)形式的数据集
reduceByKey(func)应用于(K,V)键值对数据集时,返回一个新的(K,V)形式的数据集,V是每个Key传递到函数func中的聚合后的结果。

RDD行动操作API

操作含义
count()返回数据集中的元素个数
collect()以数组的形式返回数据集中的所有元素
first()返回数据集中的第一个元素
take(n)以数据的形式返回数据集中的前n个元素
reduce(func)通过函数func聚合数据集中的元素
foreache(func)将数据集中的每个元素传递到函数func中运行
http://www.lryc.cn/news/13630.html

相关文章:

  • 利用OpenCV的函数equalizeHist()对图像作直方图均衡化处理
  • 星河智联Android开发
  • 【C++】关联式容器——map和set的使用
  • Promise的实现原理
  • 【MFC】数据库操作——ODBC(20)
  • 旺店通与金蝶云星空对接集成采购入库单接口
  • Linux基础-学会使用命令帮助
  • MyBatis 之四(动态SQL之 if、trim、where、set、foreach 标签)
  • PAT (Advanced Level) Practice 1006 Sign In and Sign Out
  • Android入门第64天-MVVM下瀑布流界面的完美实现-使用RecyclerView
  • Windows PowerShell中成功进入conda虚拟环境
  • 【C++】类与对象理解和学习(中)
  • 每日英语学习(11)大英复习单词和翻译
  • x79主板M.2无法识别固态硬盘
  • 配置Tomcat性能优化
  • Hive3 安装方式详解,datagrid自定义驱动连接hive
  • 约束优化:约束优化的三种序列无约束优化方法(罚函数法)
  • 你真的会做APP UI自动化测试吗?我敢打赌百分之九十的人都不知道这个思路
  • GIT:【基础三】Git工作核心原理
  • 【1.12 golang中的指针】
  • 十五.程序环境和预处理
  • 高并发系统设计之负载均衡
  • 嵌入式Linux从入门到精通之第十四节:Linux IO控制技术
  • /etc/fstab文件
  • 深度学习神经网络基础知识(一) 模型选择、欠拟合和过拟合
  • 同样做软件测试,为什么有人月入3k-5k,有人能拿到17-20k?
  • 如何运行YOLOv5的代码,实现目标识别
  • 【正点原子FPGA连载】第十四章SD卡读写TXT文本实验 摘自【正点原子】DFZU2EG_4EV MPSoC之嵌入式Vitis开发指南
  • 【人工智能AI :Open AI】我想写一本书,书名是《中国文学史》,帮我列一下目录,细化到三级目录,不少于2000字。
  • 「文档数据库之争」MongoDB和CouchDB的比较