当前位置: 首页 > article >正文

Spark SQL 初探: 使用大数据分析2000万数据

去年网上曾放出个2000W的开房记录的数据库, 不知真假。 最近在学习Spark, 所以特意从网上找来数据测试一下, 这是一个绝佳的大数据素材。
如果数据涉及到个人隐私,请尽快删除, 本站不提供此类数据。你可以写个随机程序生成2000W的测试数据, 以CSV格式。

Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map
reduce算法实现的分布式计算,拥有Hadoop
MapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更
好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法。

Spark是一个高效的分布式计算系统,相比Hadoop,它在性能上比Hadoop要高100倍。Spark提供比Hadoop更上层的API,
同样的算法在Spark中实现往往只有Hadoop的1/10或者1/100的长度。Shark类似“SQL on
Spark”,是一个在Spark上数据仓库的实现,在兼容Hive的情况下,性能最高可以达到Hive的一百倍。

Apache Spark 是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架。与 Hadoop 不同,Spark 和 Scala
能够紧密集成,其中的 Scala 可以像操作本地集合对象一样轻松地操作分布式数据集。

2014年处, Apache 基金会宣布旗下的 Apache Spark 项目成为基金会的顶级项目,拥有顶级域名
http://spark.apache.org/。 Spark
的用户包括:阿里巴巴、Cloudera、Databricks、IBM、英特尔和雅虎等知名厂商。

Spark SQL是支持在Spark中使用Sql、HiveSql、Scaca中的关系型查询表达式。它的核心组件是一个新增的RDD类型SchemaRDD,它把
行对象用一个Schema来描述行里面的所有列的数据类型,它就像是关系型数据库里面的一张表。它可以从原有的RDD创建,也可以是Parquet文件,
最重要的是它可以支持用HiveQL从hive里面读取数据。

在2014年7月1日的Spark Summit上,Databricks宣布终止对Shark的开发,将重点放到Spark
SQL上。在会议上,Databricks表示,Shark更多是对Hive的改造,替换了Hive的物理执行引擎,因此会有一个很快的速度。然而,不容
忽视的是,Shark继承了大量的Hive代码,因此给优化和维护带来了大量的麻烦。随着性能优化和先进分析整合的进一步加深,基于MapReduce设
计的部分无疑成为了整个项目的瓶颈。 详细内容请参看 [ Shark, Spark SQL, Hive on Spark, and the future of
SQL on Spark ](http://databricks.com/blog/2014/07/01/shark-spark-sql-hive-on-
spark-and-the-future-of-sql-on-spark.html)

当前Spark SQL还处于alpha阶段,一些API在将将来的版本中可能会有所改变。

我也翻译几篇重要的Spark文档,你可以在我的网站找到。 Spark翻译文档

本文主要介绍了下面几个知识点:

  • Spark读取文件夹的文件
  • Spark filter和map使用
  • Spark sql语句调用
  • 自定义Spark sql的函数

提前讲一下,我也是最近才学习Spark及其相关的技术如Scala,下面的例子纯粹为了验证性的试验, 相信例子代码很很多优化的地方。

安装和配置Spark

当前最新的Spark版本为1.1.1, 因为我们以Standalone方式运行Spark,所以直接随便挑一个版本, 比如spark-1.1.1-bin-
hadoop2.4.tgz, 解压到你的机器上。
我使用的CentOS 6.4。 具体来讲,它是我笔记本的一个虚拟机, 4个核, 4G内存。

在/opt解压它, 命令行中进入解压后的目录/opt/spark-1.1.1-bin-hadoop2.4。

运行 ./bin/spark-shell 就可以启动一个交互式的spark shell控制台, 在其中可以执行scala代码。

回到顶部

Spark初试

因为我们以本地单机的形式测试Spark, 你需要配置以下你的spark, 否则在分析大数据时很容易出现内存不够的问题。
在我的机器上, conf文件夹下复制一份spark-defaults.conf,将使用的内存增大一些:

?

1

2

|

spark.executor.memory 2g

spark.driver.memory 2g

—|---

启动shark-shell的时候设置使用4个核。

?

1

|

[root @colobu conf]# ./bin/spark-shell --master local[ 4 ]

—|---

根据 Spark 快速入门
中的介绍运行个例子测试一下:

?

1

2

3

4

5

|

scala> val textFile = sc.textFile( "README.md" )

14 / 12 / 11 13 : 52 : 00 INFO MemoryStore: ensureFreeSpace( 163705 ) called with curMem= 0 , maxMem= 1111794647

14 / 12 / 11 13 : 52 : 00 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 159.9 KB, free 1060.1 MB)

textFile: org.apache.spark.rdd.RDD[String] = README.md MappedRDD[ 1 ] at textFile at <console>: 12

scala> textFile.count()

—|---

这个例子从Spark解压目录下的README.md文件创建一个RDD,并统计此文件有多少行。

再看一个抛针法计算PI值的例子。

?

1

2

3

4

5

6

7

|

val NUM_SAMPLES= 1000000

val count = sc.parallelize( 1 to NUM_SAMPLES).map{i =>

val x = Math.random()

val y = Math.random()

if (x*x + y*y < 1 ) 1 else 0

}.reduce(_ + _)

println( "Pi 值大约为 " \+ 4.0 * count / NUM_SAMPLES)

—|---

结果为:

?

1

|

Pi 值大约为 3.141408

—|---

到目前为止,我们搭建好了一个Spark环境, 并简单进行了测试。 下一步我们使用Spark SQL分析前面所说的数据。

回到顶部

使用Spark SQL分析数据

这一步,我们使用Spark SQL按照星座对2000W数据进行分组统计, 看看哪个星座的人最喜欢开房。
当然, 使用纯Spark也可以完成我们的分析, 因为实际Spark SQL最终是利用Spark来完成的。
实际测试中发现这些数据并不是完全遵守一个schema, 有些数据的格式是不对的, 有些数据的数据项也是错误的。 在代码中我们要剔除那么干扰数据。
反正我们用这个数据测试者玩, 并没有严格的要求去整理哪些错误数据。

先看代码:

?

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

|

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

import sqlContext.createSchemaRDD

case class Customer(name: String, gender: String, ctfId: String, birthday: String, address: String)

val customer = sc.textFile( "/mnt/share/2000W/*.csv" ).map(_.split( "," )).filter(line => line.length > 7 ).map(p => Customer(p(
0 ), p( 5 ), p( 4 ), p( 6 ), p( 7 ))).distinct()

customer.registerTempTable( "customer" )

def toInt(s: String):Int = {

try {

s.toInt

} catch {

case e:Exception => 9999

}

}

def myfun(birthday: String) : String = {

var rt = "未知"

if (birthday.length == 8 ) {

val md = toInt(birthday.substring( 4 ))

if (md >= 120 & md <= 219 )

rt = "水瓶座"

else if (md >= 220 & md <= 320 )

rt = "双鱼座"

else if (md >= 321 & md <= 420 )

rt = "白羊座"

else if (md >= 421 & md <= 521 )

rt = "金牛座"

else if (md >= 522 & md <= 621 )

rt = "双子座"

else if (md >= 622 & md <= 722 )

rt = "巨蟹座"

else if (md >= 723 & md <= 823 )

rt = "狮子座"

else if (md >= 824 & md <= 923 )

rt = "处女座"

else if (md >= 924 & md <= 1023 )

rt = "天秤座"

else if (md >= 1024 & md <= 1122 )

rt = "天蝎座"

else if (md >= 1123 & md <= 1222 )

rt = "射手座"

else if ((md >= 1223 & md <= 1231 ) | (md >= 101 & md <= 119 ))

rt = "摩蝎座"

else

rt = "未知"

}

rt

}

sqlContext.registerFunction( "constellation" , (x:String) => myfun(x))

var result = sqlContext.sql( "SELECT constellation(birthday), count(constellation(birthday)) FROM customer group by constellation(birthday)" )

result.collect().foreach(println)

—|---

为了使用spark sql,你需要引入 sqlContext.createSchemaRDD . Spark sql一个核心对象就是 SchemaRDD 。 上面的 import 可以隐式的将一个RDD转换成SchemaRDD。
接着定义了 Customer 类,用来映射每一行的数据, 我们只使用每一行很少的信息, 像地址,email等都没用到。
接下来从2000W文件夹中读取所有的csv文件, 创建一个RDD并注册表customer。
因为没有一个内建的函数可以将出生一起映射为星座, 所以我们需要定义一个映射函数 myfun , 并把它注册到SparkContext中。
这样我们就可以在sql语句中使用这个函数。 类似地,字符串的length函数当前也不支持, 你可以增加一个这样的函数。
因为有的日期不正确,所有特别增加了一个”未知”的星座。 错误数据可能有两种, 一是日期出错, 而是此行格式不对,将其它字段映射成了出生日期。
我们在分析的时候忽略它们好了。

然后执行一个分组的sql语句。这个sql语句查询结果类型为SchemaRDD, 也继承了RDD所有的操作。
最后将结果打印出来。

?

1

2

3

4

5

6

7

8

9

10

11

12

|

[双子座, 1406018 ]

[双鱼座, 1509839 ]

[摩蝎座, 2404812 ]

[金牛座, 1406225 ]

[水瓶座, 1635358 ]

[巨蟹座, 1498077 ]

[处女座, 1666009 ]

[天秤座, 1896544 ]

[白羊座, 1409838 ]

[射手座, 1614915 ]

[未知, 160483 ]

[狮子座, 1613529 ]

—|---

看起来魔蝎座的人最喜欢开房了, 明显比其它星座的人要多。

我们也可以分析一下开房的男女比例:

?

1

2

3

|

......

result = sqlContext.sql( "SELECT gender, count(gender) FROM customer where gender = 'F' or gender = 'M' group by gender" )

result.collect().foreach(println)

—|---

结果显示男女开房的人数大约是2:1

?

1

2

|

[F, 6475461 ]

[M, 12763926 ]

—|---

在这里插入图片描述

http://www.lryc.cn/news/2413699.html

相关文章:

  • 安装程序不能验证 Update.inf 文件的完整性 之解决办法
  • 10分钟带你搭建属于自己的博客
  • 如何在谷歌地图(google maps)中获取经纬度
  • 基于DDD的微服务设计和开发实战
  • UILite——C++类库(XLib非界面功能库+UI和DirectUI库)简介
  • Linux操作系统介绍
  • 介绍个京阿尼的旧作
  • matlab水力学工具箱,水工设计工具箱免费版
  • 跨页传送-PreviousPage
  • 安卓应用《撕掉她的衣服》简化版
  • 新手唱歌从入门到精通,经典唱歌教程系列合集
  • 木马病毒表现手机中病毒/木马病毒对手机有用吗-手把手教白帽子自学
  • 解决笔记本电脑开机密码遗忘的几种方案,帮你轻松解除
  • 探索未来编程的新纪元:Eve语言
  • Windows Server 2003 Service Pack 2 安装和部署指南
  • sem_wait
  • 多进程和多线程
  • 全面解析msvcr100.dll丢失的5种解决方法,快速解决dll丢失问题
  • Linux嵌入式定义四个缓存区,Linux文件缓冲区详解
  • Linux服务器本地POST接口测试
  • 【正点原子FPGA连载】第二十章IP核之FIFO实验 -摘自【正点原子】新起点之FPGA开发指南_V2.1
  • _beginthreadex 和 _beginthread 的区别
  • 在java中调用weka聚类方法并显示相应的类标签+weka中创建arff数据
  • 企业安全意识必修课——修改远程桌面登录端口避免恶意扫描入侵
  • JSP | 简易购物车的实现
  • HTC (Desire V)T328W ROOT 教程 及关闭充电动画
  • 传奇单机版就是自己在家里架设一个
  • 基于SpringBoot+Vue酒店订房系统的设计与实现
  • 关于代理服务器的原理及用法
  • svn找不到节点_章泽天公园甩鞭抽陀螺,现场图曝光:生活不易,全靠自己找乐趣...