当前位置: 首页 > news >正文

Spark【RDD编程(四)综合案例】

案例1-TOP N个数据的值

输入数据:
 

1,1768,50,155
2,1218,600,211
3,2239,788,242
4,3101,28,599
5,4899,290,129
6,3110,54,1201
7,4436,259,877
8,2369,7890,27

处理代码:

def main(args: Array[String]): Unit = {//创建SparkContext对象val conf:SparkConf = new SparkConf()conf.setAppName("test1").setMaster("local")val sc: SparkContext = new SparkContext(conf)var index: Int = 0//通过加载本地文件系统的数据创建RDD对象val rdd: RDD[String] = sc.textFile("data/file1.txt")rdd.filter(line=>line.split(",").length == 4).map(line=>line.split(",")(2)).map(word=>(word.toInt,1)).sortByKey(false).map(kv=>kv._1).take(5).foreach(key=>{index += 1println(index + s"\t$key")})//关闭SparkContext对象sc.stop()}

代码解析:

  • sc.textFile("data/file1.txt"):通过加载本地文件来创建RDD对象
    
  • rdd.filter(line=>line.split(",").length == 4):确保数据的完整性
  • map(line=>line.split(",")(2)):通过逗号将一行字符串分隔开来组成一个Array数组并取出数组中第3个严肃
  • map(word=>(word.toInt,1)):因为我们的sortByKey方法是针对键值对进行操作的,所以必须把我们上面取出来的值转为(值,x)形式的键值对。
  • sortByKey(false):设置参数为false表示降序排列。
  • map(kv=>kv._1).take(5):取出top五。

 运行结果:

1	7890
2	788
3	600
4	290
5	259

案例2-文件排序

要求:输入三个文件(每行一个数字),要求输出一个文件,文件内文本格式为(序号 数值)。

rdd.map(num => (num.toInt,1)).partitionBy(new HashPartitioner(1)).sortByKey().map(t=>{index += 1(index,t._1)}).foreach(println) //只有调用 行动操作语句 才会触发真正的从头到尾的计算

        我们会发现,如果我们不调用 foreach 这个行动操作而是直接在转换操作中进行输出的话,这样是输出不来结果的,所以我们必须要调用行动操作。

        而且,我们必须对分区进行归并,因为在分布式环境下,只有把多个分区合并成一个分区,才能使得结果整体有序。(这里尽管我们是本地测试,数据源是一个目录下的文件,但是我们也要考虑到假如是在分布式环境下的情况)

运行结果:

(1,1)
(2,4)
(3,5)
(4,12)
(5,16)
(6,25)
(7,33)
(8,37)
(9,39)
(10,40)
(11,45)

案例3-二次排序

要求:对格式为(数值 数值)类型的数据进行排序,假如第一个数值相同,则比较第二个数值。

import com.study.spark.core.rdd
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}class SecondarySortKey(val first:Int,val second:Int) extends Ordered[SecondarySortKey] with Serializable {override def compare(other: SecondarySortKey): Int = {if (this.first - other.first != 0) {this.first - other.first}else{this.second-other.second}}
}
object SecondarySortKey{def main(args: Array[String]): Unit = {val conf:SparkConf = new SparkConf()conf.setAppName("test3").setMaster("local")val sc: SparkContext = new SparkContext(conf)val rdd: RDD[String] = sc.textFile("data/sort/test03.txt")val rdd2: RDD[(SecondarySortKey, String)] = rdd.map(line => (new SecondarySortKey(line.split(" ")(0).toInt, line.split(" ")(1).toInt), line))rdd2.sortByKey(false).map(t=>t._2).foreach(println)sc.stop()}
}

这里我们使用了自定义的类并继承了Ordered 和 Serializable 这两个特质,为了实现自定义的排序规则。 其中,Ordered 特质的混入需要重写它的 compare 方法来实现我们的自定义比较规则,而 Serializable 的混入作用是使得我们的对象可以序列化,以便在网络中可以传输。

运行结果:

8 3
5 6
5 3
4 9
4 7
3 2
1 6

案例4-平均成绩

给出三门成绩的三个文件,要求算出每位学生的平均成绩。

//读入数据val rdd: RDD[String] = sc.textFile("data/rdd/test3")rdd.map(line=>(line.split(" ")(0),line.split(" ")(1).toInt)).map(t=>(t._1,(t._2,1))).reduceByKey((t1,t2)=>(t1._1+t2._1,t1._2+t2._2)).mapValues(t=>t._1/t._2.toFloat).foreach(println)

运行结果:

(小新,88.333336)
(小丽,88.666664)
(小明,89.666664)
(小红,83.666664)

综合案例

输入数据格式:(姓名,课程名,成绩)

Aaron,OperatingSystem,100
Aaron,Python,50
Aaron,ComputerNetwork,30
Aaron,Software,94
Abbott,DataBase,18
Abbott,Python,82
Abbott,ComputerNetwork,76
Abel,Algorithm,30
Abel,DataStructure,38
Abel,OperatingSystem,38
...
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object RDDPractice {def main(args: Array[String]): Unit = {val conf:SparkConf = new SparkConf()conf.setAppName("test-last").setMaster("local")val sc: SparkContext = new SparkContext(conf)val rdd: RDD[String] = sc.textFile("data/chapter5-data1.txt")//(1)该系共有多少名学生val nums: Long = rdd.map(line => line.split(",")(0)).distinct().count()println("该系一共 "+nums+" 名学生")//(2)该系共开设多少门课程val course_nums: Long = rdd.map(line => line.split(",")(1)).distinct().count()println("该系一共 "+course_nums+" 门课程")//(3)学生 Tom 的总成绩和平均成绩分别是多少val score: Double = rdd.filter(line => line.contains("Tom")).map(line => line.split(",")(2).toInt).sum()val avg: Double = score/rdd.filter(line => line.contains("Tom")).map(line=>line.split(",")(1)).count()println("Tom 的总成绩为 "+score+",平均成绩为 "+avg)//(4)求每名同学的选修的课程门数rdd.map(line=>(line.split(",")(0),line.split(",")(1)))  //(学生名,课程名).mapValues(v => (v,1))  //(学生名,(课程名,1)).reduceByKey((k,v)=>("",k._2+v._2)) //(学生名,("",1+1+1)) 合并课程总数.mapValues(x => x._2) //(学生名,课程总数).foreach(println)//(5)该系DataBase课程共有多少人选修val l = rdd.filter(line => line.split(",")(1) == "DataBase").count()println("选修DataBase课程的人数为 "+l)//(6)各门课程的平均分是多少//(学生,课程名,成绩)=>课程总成绩/该课程的学生数val res: RDD[(String, Float)] = rdd.map(line => (line.split(",")(1), line.split(",")(2).toInt)) //(课程名,成绩).combineByKey(score => (score, 1),  //(成绩,1)(acc: (Int, Int), score) => (acc._1 + score, acc._2 + 1), //(成绩1+成绩2,1+1)(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)  //(成绩+成绩,1+1)).map({case (key, value) => (key, value._1 / value._2.toFloat) //(课程名,总课程成绩/课程人数)})res.saveAsTextFile("data/rdd/practice")sc.stop()}
}

运行结果:

该系一共 265 名学生
该系一共 8 门课程
Tom 的总成绩为 154.0,平均成绩为 30.8
(Ford,3)
(Lionel,4)
(Verne,3)
(Lennon,4)
(Joshua,4)
(Marvin,3)
(Marsh,4)
(Bartholomew,5)
(Conrad,2)
(Armand,3)
(Jonathan,4)
(Broderick,3)
(Brady,5)
(Derrick,6)
(Rod,4)
(Willie,4)
(Walter,4)
(Boyce,2)
(Duncann,5)
(Elvis,2)
(Elmer,4)
(Bennett,6)
(Elton,5)
(Jo,5)
(Jim,4)
(Adonis,5)
(Abel,4)
(Peter,4)
(Alvis,6)
(Joseph,3)
(Raymondt,6)
(Kerwin,3)
(Wright,4)
(Adam,3)
(Borg,4)
(Sandy,1)
(Ben,4)
(Miles,6)
(Clyde,7)
(Francis,4)
(Dempsey,4)
(Ellis,4)
(Edward,4)
(Mick,4)
(Cleveland,4)
(Luthers,5)
(Virgil,5)
(Ivan,4)
(Alvin,5)
(Dick,3)
(Bevis,4)
(Leo,5)
(Saxon,7)
(Armstrong,2)
(Hogan,4)
(Sid,3)
(Blair,4)
(Colbert,4)
(Lucien,5)
(Kerr,4)
(Montague,3)
(Giles,7)
(Kevin,4)
(Uriah,1)
(Jeffrey,4)
(Simon,2)
(Elijah,4)
(Greg,4)
(Colin,5)
(Arlen,4)
(Maxwell,4)
(Payne,6)
(Kennedy,4)
(Spencer,5)
(Kent,4)
(Griffith,4)
(Jeremy,6)
(Alan,5)
(Andrew,4)
(Jerry,3)
(Donahue,5)
(Gilbert,3)
(Bishop,2)
(Bernard,2)
(Egbert,4)
(George,4)
(Noah,4)
(Bruce,3)
(Mike,3)
(Frank,3)
(Boris,6)
(Tony,3)
(Christ,2)
(Ken,3)
(Milo,2)
(Victor,2)
(Clare,4)
(Nigel,3)
(Christopher,4)
(Robin,4)
(Chad,6)
(Alfred,2)
(Woodrow,3)
(Rory,4)
(Dennis,4)
(Ward,4)
(Chester,6)
(Emmanuel,3)
(Stan,3)
(Jerome,3)
(Corey,4)
(Harvey,7)
(Herbert,3)
(Maurice,2)
(Merle,3)
(Les,6)
(Bing,6)
(Charles,3)
(Clement,5)
(Leopold,7)
(Brian,6)
(Horace,5)
(Sebastian,6)
(Bernie,3)
(Basil,4)
(Michael,5)
(Ernest,5)
(Tom,5)
(Vic,3)
(Eli,5)
(Duke,4)
(Alva,5)
(Lester,4)
(Hayden,3)
(Bertram,3)
(Bart,5)
(Adair,3)
(Sidney,5)
(Bowen,5)
(Roderick,4)
(Colby,4)
(Jay,6)
(Meredith,4)
(Harold,4)
(Max,3)
(Scott,3)
(Barton,1)
(Elliot,3)
(Matthew,2)
(Alexander,4)
(Todd,3)
(Wordsworth,4)
(Geoffrey,4)
(Devin,4)
(Donald,4)
(Roy,6)
(Harry,4)
(Abbott,3)
(Baron,6)
(Mark,7)
(Lewis,4)
(Rock,6)
(Eugene,1)
(Aries,2)
(Samuel,4)
(Glenn,6)
(Will,3)
(Gerald,4)
(Henry,2)
(Jesse,7)
(Bradley,2)
(Merlin,5)
(Monroe,3)
(Hobart,4)
(Ron,6)
(Archer,5)
(Nick,5)
(Louis,6)
(Len,5)
(Randolph,3)
(Benson,4)
(John,6)
(Abraham,3)
(Benedict,6)
(Marico,6)
(Berg,4)
(Aldrich,3)
(Lou,2)
(Brook,4)
(Ronald,3)
(Pete,3)
(Nicholas,5)
(Bill,2)
(Harlan,6)
(Tracy,3)
(Gordon,4)
(Alston,4)
(Andy,3)
(Bruno,5)
(Beck,4)
(Phil,3)
(Barry,5)
(Nelson,5)
(Antony,5)
(Rodney,3)
(Truman,3)
(Marlon,4)
(Don,2)
(Philip,2)
(Sean,6)
(Webb,7)
(Solomon,5)
(Aaron,4)
(Blake,4)
(Amos,5)
(Chapman,4)
(Jonas,4)
(Valentine,8)
(Angelo,2)
(Boyd,3)
(Benjamin,4)
(Winston,4)
(Allen,4)
(Evan,3)
(Albert,3)
(Newman,2)
(Jason,4)
(Hilary,4)
(William,6)
(Dean,7)
(Claude,2)
(Booth,6)
(Channing,4)
(Jeff,4)
(Webster,2)
(Marshall,4)
(Cliff,5)
(Dominic,4)
(Upton,5)
(Herman,3)
(Levi,2)
(Clark,6)
(Hiram,6)
(Drew,5)
(Bert,3)
(Alger,5)
(Brandon,5)
(Antonio,3)
(Elroy,5)
(Leonard,2)
(Adolph,4)
(Blithe,3)
(Kenneth,3)
(Perry,5)
(Matt,4)
(Eric,4)
(Archibald,5)
(Martin,3)
(Kim,4)
(Clarence,7)
(Vincent,5)
(Winfred,3)
(Christian,2)
(Bob,3)
(Enoch,3)
选修DataBase课程的人数为 126

各门课程的平均分是多少,输出文件:

(CLanguage,50.609375)
(Software,50.909092)
(Python,57.82353)
(Algorithm,48.833332)
(DataStructure,47.572517)
(DataBase,50.539684)
(ComputerNetwork,51.90141)
(OperatingSystem,54.9403)

解析

(1)该系共有多少名学生

首先使用map 转换操作从数据中提取出来所有的学生姓名,然后使用转换操作 distinct 函数去重,最后使用行动操作 count 进行统计。

    //(1)该系共有多少名学生val nums: Long = rdd.map(line => line.split(",")(0)).distinct().count()

(2)该系共开设多少门课程

同(1),不同的是我们提取的是所有的课程名。

    //(2)该系共开设多少门课程val course_nums: Long = rdd.map(line => line.split(",")(1)).distinct().count()

(3)学生 Tom 的总成绩和平均成绩分别是多少

对于总成绩,使用过滤函数 filter 提取出含有"Tom"的数据行,然后将一行字符串转为多个字段并取出成绩字段的值并求和。

对于平均成绩,我们计算出科目的数量然后用总成绩除以它即可。

//(3)学生 Tom 的总成绩和平均成绩分别是多少val score: Double = rdd.filter(line => line.contains("Tom")).map(line => line.split(",")(2).toInt).sum()val avg: Double = score/rdd.filter(line => line.contains("Tom")).map(line=>line.split(",")(1)).count()println("Tom 的总成绩为 "+score+",平均成绩为 "+avg)

(4)求每名同学的选修的课程门数

先取出学生名和课程名,把学生名最为key,课程名通过mapValues函数转为(课程名,1)的形式,对于相同的学生,通过reduceByKey函数累加它的课程数,通过mapValues函数将键值对形式的value转为单个的值-课程总数。

//(4)求每名同学的选修的课程门数rdd.map(line=>(line.split(",")(0),line.split(",")(1)))  //(学生名,课程名).mapValues(v => (v,1))  //(学生名,(课程名,1)).reduceByKey((k,v)=>("",k._2+v._2)) //(学生名,("",1+1+1)) 合并课程总数.mapValues(x => x._2) //(学生名,课程总数).foreach(println)

(5)该系DataBase课程共有多少人选修

直接通过 count 函数对字段1为"DataBase"的数据行进行统计。

//(5)该系DataBase课程共有多少人选修val l = rdd.filter(line => line.split(",")(1) == "DataBase").count()

(6)各门课程的平均分是多少

通过combineByKey函数通过对每个key(课程)对应的value(成绩)转为(成绩,1)的形式,

然后对相同的key(课程)的值(成绩,1)进行合并,将成绩和次数进行累加,

对于不同分区的数据也是一样,对成绩和次数都进行累加,

最后按照要求的格式输出(课程名,总成绩/总次数=课程平均成绩)

//(6)各门课程的平均分是多少//(学生,课程名,成绩)=>课程总成绩/该课程的学生数val res: RDD[(String, Float)] = rdd.map(line => (line.split(",")(1), line.split(",")(2).toInt)) //(课程名,成绩).combineByKey(score => (score, 1),  //(成绩,1)(acc: (Int, Int), score) => (acc._1 + score, acc._2 + 1), //(成绩1+成绩2,1+1)(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)  //(成绩+成绩,1+1)).map({case (key, value) => (key, value._1 / value._2.toFloat) //(课程名,总课程成绩/课程人数)})

除此之外,也可以用reduceByKey来进行解决,二者的原理是一样的:

rdd.map(line => (line.split(",")(1), line.split(",")(2).toInt)).map(t => (t._1, (t._2, 1))).reduceByKey((t1, t2) => (t1._1 + t2._1, t1._2 + t2._2)).map(t => (t._1, t._2._1 / t._2._2.toFloat))    //这行代码可以用mapValues()替换,因为我们本来就是只对value进行操作,key不需要改变.foreach(println)

http://www.lryc.cn/news/162953.html

相关文章:

  • Golang报错mixture of field:value and value initializers
  • 【网络教程】记一次使用Docker手动搭建BT宝塔面板的全过程(包含问题解决如:宝塔面板无法开启防火墙,ssh,nginx等)
  • 【大虾送书第九期】速学Linux:系统应用从入门到精通
  • docker相关命令
  • 【Redis】4、rsync远程同步
  • 无服务架构--Serverless
  • 2023-09-07 LeetCode每日一题(修车的最少时间)
  • 数据挖掘实验-主成分分析与类特征化
  • 70. 爬楼梯 (进阶),322. 零钱兑换,279.完全平方数
  • Apache Doris 2.0 如何实现导入性能提升 2-8 倍
  • RabbitMQ: topic 结构
  • 信息系统项目管理教程(第4版):第二章 信息技术及其发展
  • 有哪些适合初学者的编程语言?
  • uni-app动态tabBar,根据不同用户展示不同的tabBar
  • 手写Spring:第6章-资源加载器解析文件注册对象
  • Redis 7 第八讲 集群模式(cluster)架构篇
  • 【PowerQuery】导入与加载XML
  • vue 预览视频
  • 4个维度讲透ChatGPT技术原理,揭开ChatGPT神秘技术黑盒!(文末送书)
  • 【无标题】@Scheduled 的cron
  • IP和MAC的作用区别
  • python趣味编程-数独游戏
  • MySQL/MariaDB 查询某个 / 多个字段重复数据
  • 【力扣每日一题】2023.9.10 课程表Ⅱ
  • VSCODE CMAKE C++ 工程调试, C++不以科学计数法输出并控制小数位数
  • Drools规则引擎入门学习记录
  • 肖sir__设计测试用例方法之判定表06_(黑盒测试)
  • <图像处理> 空间滤波基础
  • 如何在Django中使用django-crontab启动定时任务、关闭任务以及关闭指定任务
  • mysql配置项整理