当前位置: 首页 > news >正文

spark读取数据性能提升

1. 背景

spark默认的jdbc只会用单task读取数据,读取大数据量时,效率低。

2. 解决方案

根据分区字段,如日期进行划分,增加task数量提升效率。

  /*** 返回每个task按时间段划分的过滤语句* @param startDate* @param endDate* @param threadCount* @return*/def getPredicateDates(startDate: String, endDate: String, threadCount: Int): Array[String] = {getPredicates(startDate, endDate, threadCount).map(x=>s"recordDate>='${x._1}' and recordDate <='${x._2}'")}/*** 将startDate到endDate间的日期,根据给定的threadCount参数,做时间段划分,例如:* getPredicates("2017-01-01", "2017-01-31", 10)* 返回:* 2017-01-01 -> 2017-01-04* 2017-01-05 -> 2017-01-08* 2017-01-09 -> 2017-01-12* 2017-01-13 -> 2017-01-16* 2017-01-17 -> 2017-01-20* 2017-01-21 -> 2017-01-24* 2017-01-25 -> 2017-01-28* 2017-01-29 -> 2017-01-31** @param startDate   开始日期* @param endDate     结束日期* @param threadCount 线程数* @return 包含各个连续时段的数组*/def getPredicates(startDate: String, endDate: String, threadCount: Int): Array[(String, String)] = {val dayDiff = DateTimeUtils.rangeDay(startDate, endDate)val buff = new ArrayBuffer[(String, String)]()if (dayDiff <= threadCount) {//天数差小于期望的线程数,则按照每天一个线程处理var tempDate = startDatewhile (tempDate <= endDate) {buff += (tempDate -> tempDate)tempDate = DateTimeUtils.dateAddOne(tempDate)}} else {//天数差大于期望的线程数,则按照线程数对时间段切分val offset = (dayDiff / threadCount).toIntvar tempDate = startDatewhile (DateTimeUtils.dateAddN(tempDate, offset) <= endDate) {buff += (tempDate -> DateTimeUtils.dateAddN(tempDate, offset))tempDate = DateTimeUtils.dateAddOne(DateTimeUtils.dateAddN(tempDate, offset))}if (tempDate != endDate) {buff += (tempDate -> endDate)}}buff.toArray}
DateTimeUtils工具类
import java.text.SimpleDateFormat
import java.util.{Calendar, Date, Locale}object DateTimeUtils {def rangeDay(startDateStr: String, endDateStr: String): Long = {val dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")val startDate: Date = dateFormat.parse(startDateStr)val endDate: Date = dateFormat.parse(endDateStr)(endDate.getTime() - startDate.getTime()) / 1000 / 60 / 60 / 24}def dateAddOne(dateStr: String): String = {var dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")var dateInfo: Date = dateFormat.parse(dateStr)var cal: Calendar = Calendar.getInstance()cal.setTime(dateInfo)cal.add(Calendar.DATE, 1)dateFormat.format(cal.getTime)}def dateAddN(dateStr: String, value: Int): String = {var dateFormat: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")var dateInfo: Date = dateFormat.parse(dateStr)var cal: Calendar = Calendar.getInstance()cal.setTime(dateInfo)cal.add(Calendar.DATE, value)dateFormat.format(cal.getTime)}
}

举例

    val startDate = DateTimeUtils.dateAddN(calcDate,-365) //获取计算日期一年前的日期作为开始时间val predicates= getPredicateDates(startDate,calcDate,12) //分12个task读取,提高性能val url = PropUtils.getProxyJdbc() //jdbc连接的代理(需按自己的项目实现)val res = spark.read.jdbc(url, tableName, predicates,PropUtils.getProperties()) 

3. 实验及结论

使用1个节点 8核16G的Clickhouse数据库,spark从clickhouse读取近4亿行数据。

单Task运行时间:14min

按日期划分成12个Task,运行时间:1.6min

结论:性能提升88.6%

http://www.lryc.cn/news/443946.html

相关文章:

  • 一次使用threading.Thread来实现Pytorch多个模型并发运行的失败案例
  • HashMap源码
  • 探索 Web Speech API:实现浏览器语音识别与合成
  • python基础题练习
  • 工业交换机如何保证数据的访问安全
  • jmeter得到的文档数据处理
  • 12- 【JavaWeb】校园快递管理系统-数据库建设
  • Windows本地连接远程服务器并创建新用户详细记录
  • 【kaggle竞赛】毒蘑菇的二元预测题目相关信息和思路求解代码
  • Pytest-allure如何在测试完成后自动生成完整报告?
  • 数据结构-树(基础,分类,遍历)
  • CodeGeeX4:程序员的高效助手,多语言代码生成神器!
  • 小程序组件间通信
  • Homebrew安装与切换下载源
  • C#回调函数
  • Matplotlib绘制热力图
  • 手写SpringMVC
  • mysql学习教程,从入门到精通,SQL 删除数据(DELETE 语句)(18)
  • 周边游小程序开发
  • 初级前端面试
  • 微软AI核电计划
  • 图片马赛克处理(Java)
  • python+selenium实现自动联网认证,并实现断网重连
  • 基于机器学习的注意力缺陷/多动障碍 (ADHD)(python论文+代码)HYPERAKTIV
  • Spring Boot 集成 Redisson 实现消息队列
  • go语言Map详解
  • C++——已知数组a[6]={1,3,5,7,9};输入一个数值,要求按照现有排序规律将它放入数组当中。
  • 云计算第四阶段---CLOUD Day7---Day8
  • 深入解析ThingsBoard与ThingsKit物联网平台的差异
  • 五、CAN总线