当前位置: 首页 > news >正文

LouvainMethod分布式运行的升级之路

1、背景介绍        

        Louvain是大规模图谱的谱聚类算法,引入模块度的概念分二阶段进行聚类,直到收敛为止。分布式的代码可以在如下网址进行下载。

GitHub - Sotera/spark-distributed-louvain-modularity: Spark / graphX implementation of the distributed louvain modularity algorithm

  该代码依赖的spark-core和spark-graphx、scala-lang是2.10版本,采用的gradle的进行打包,也可以采用maven进行打包,解决相关的依赖问题之后,本地模式可以很快跑通。但是转向集群的时候,发现集群的spark的scala版本是2.12,我采用的是maven的scala编译的版本是2.10, 编译用到的scala和运行环境的scala版本不一致,结果无法进行spark集群模式的运行。

2、LouvainMethod的升级之路

        首先更改环境,即把louvain的代码依赖保持2.10,把spark的scala版本改成2.10,但是这样会影响其他任务的执行,其他任务可能依赖2.12的版本。因此,踏上了LouvainMethod的升级之路,即由2.10升级到2.12。

        将项目依赖的版本和编译的scala版本改为2.12之后,发现在新的高版本的spark-graphx_2.12版本里Graph对象没有了mapReduceTriplet方法,通过查找发现该方法在2.12版本的GraphXUtils类里,以一个私有方法存在,只能在包graphx下被访问,对外部不可见,因此首先想到的是通过反射机制对该私有方法进行访问,参照了如下的方法:

         在任意scala对象中调用私有方法 - 问答 - 腾讯云开发者社区-腾讯云

代码调试后,私有方法带有泛类型参数和普通参数,可以正常被反射出来,然而在调用的时候,始终报 wrong-number-of-arguments的问题。原因还没有查到。继而通过高版本的api是实现低版本的mapReduceTriplets方法。   参照该文档  GraphX - Spark 3.4.1 Documentation   的api接口含义,注意到新版2.12的Graph里aggregateMessage方法和低版本的mapReduceTriplets返回值一致,参数类型有diff,高版本的参数是EdgeContext,低版本的是EdgeTriplet,高版本通过sendToDst和sendToSrc对低版本进行了简化,使用功能更强大,因此尝试用aggregateMessage实现mapReduceTriplets。

val nodeWeightMapFunc = (e:EdgeTriplet[VD,Long]) => Iterator((e.srcId,e.attr), (e.dstId,e.attr))

val nodeWeightReduceFunc = (e1:Long,e2:Long) => e1+e2

转化为:

def nodeWeightMapFunc(e:EdgeContext[VD, Long, Long]) {
e.sendToDst(e.attr)
e.sendToSrc(e.attr)
}
Msg与reduceFunc的返回值保持一致。

通过如下方式进行调用:val nodeWeights = graph.aggregateMessages[Long](nodeWeightMapFunc,nodeWeightReduceFunc)

sendMsg的低版本如下:

private def sendMsg(et:EdgeTriplet[VertexState,Long]) = {
    val m1 = (et.dstId,Map((et.srcAttr.community,et.srcAttr.communitySigmaTot)->et.attr))
   val m2 = (et.srcId,Map((et.dstAttr.community,et.dstAttr.communitySigmaTot)->et.attr))
   Iterator(m1, m2)
}

升级为:

private def sendMsg(et: EdgeContext[VertexState, Long, Map[(Long,Long),Long]]) = {
et.sendToSrc(Map((et.dstAttr.community, et.dstAttr.communitySigmaTot) -> et.attr))
et.sendToDst(Map((et.srcAttr.community, et.srcAttr.communitySigmaTot) -> et.attr))
}

4、在集群运行相关jar的及运行脚本

 

http://www.lryc.cn/news/118301.html

相关文章:

  • 【Node.js】低代码平台源码
  • docker 部署 xxl-job-admin
  • c++(空间配置器)[32]
  • Linux系列之解压文件
  • 为什么重写equals方法时必须重写hashcode方法
  • java导入excel图片处理
  • 【Rust】Rust学习 第四章认识所有权
  • 学习C语言第三天 :关系操作符、逻辑操作符
  • Jenkins自动化打包脚本
  • 一百五十、Kettle——Kettle官网下载地址
  • 使用 Visual Studio Code 调试 CMake 脚本
  • 【云原生】Docker 详解(二):Docker 架构及工作原理
  • 微服务 云原生:基于 Gogs + Drone 实现 CI/CD 自动化
  • ADO.NET之SQL Server
  • Nginx负载均衡(重点)
  • 第一章 SpringBoot入门
  • JavaScript Es6_2笔记 (深入对象 + 内置构造函数 + 包装类型)+包含实例方法
  • 尼科彻斯定理
  • 主数据管理案例-中国外运
  • 改进DevSecOps框架的 5 大关键技术
  • uni-app之app上传pdf类型文件
  • bash: sudo: command not found的解决方法 | 安装sudo
  • 电脑合上盖子无线网络不会断开
  • 【从零开始学习JAVA | 第四十篇】了解线程池
  • axios如何取消请求,其原理是什么?
  • 消息中间件 Asio (C++)
  • 3.4 网络安全管理设备
  • 前端高级面试题-JS
  • AcWing 1564:哈希 ← 只具有正增量的二次探测法
  • 什么是媒体代发布?媒体代发布注意事项