当前位置: 首页 > news >正文

推荐系统-ALS协同过滤算法实现

从协同过滤的分类来说,ALS(Alternating Least Squares,交替最小二乘)算法属于User-Item CF,也叫做混合CF,它同时考虑了User和Item两个方面,通过数量相对少的未被观察到的隐藏因子,来解释大量用户和物品之间潜在联系。ALS基于矩阵分解通过降维的方法来补全用户-物品矩阵,对矩阵中没有出现的值进行估计。

用户和物品的关系,可以抽象为如下的三元组:<User,Item,Rating>。其中,Rating是用户对商品的评分,表征用户对该商品的喜好程度。

ALS基本假设:任何一个评分矩阵均可近似分解成两个低维的用户特征矩阵和物品特征矩阵。矩阵分解过程可理解成将用户和物品均抽象的映射到相同的低维潜在特征空间中。因此其基本思想是对稀疏矩阵进行模型分解,评估出缺失项的值,以此来得到一个基本的训练模型,然后依照此模型可以针对新的用户和物品数据进行评估。ALS是采用交替的最小二乘法来算出缺失项,交替最小二乘法是在最小二乘法的基础上发展而来的。

1、spark代码实现

1.1 数据入口

case class ProductRating(userId:Int, productId:Int, score:Double)

/** 训练最好模型输出

  • @param bestModel 模型
  • @param bestRanks 隐含因子
  • @param bestIters 迭代次数
  • @param bestLambdas 惩罚值
  • @param bestRmse 最佳方差值**/
    case class BestModel(bestModel:Option[MatrixFactorizationModel], bestRanks:Int, bestIters:Int, bestLambdas:Double, bestRmse:Double)

def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("ALSTrainer")//创建sparkSessionval spark = SparkSession.builder().config(sparkConf).getOrCreate()//加载数据,作为rating, rdd需要应用aslval ratingRDD = getDFFromCass(spark, "cdp", "t_user_item_rating").as[ProductRating].rdd.map(rating => Rating(rating.userId, rating.productId, rating.score))//数据切分为训练集合测试集val splits = ratingRDD.randomSplit(Array(0.8, 0.2))val trainingRDD = splits(0)val testingRDD = splits(1)//核心实现,输出最优参数val bestModel = RmseUtil.predictBestRmse(trainingRDD, testingRDD)println("bestModel" + bestModel.bestRmse)val itemRecs = recommender(spark, ratingRDD, 10)//output result to cassandrasaveToCass(itemRecs.toDF(), "cdp", "t_user_recs")spark.stop()}

1.2 数据加载

我们使用cassandra大数据库,实现数据的输入与存储;

 def saveToCass(saveDF: DataFrame, keyspace: String, tableName: String): Unit = {saveDF.write.format("org.apache.spark.sql.cassandra").options(Map("keyspace" -> keyspace, "table" -> tableName)).mode(SaveMode.Append).option("spark.cassandra.output.consistency.level", "ONE").save()}def getDFFromCass(spark: SparkSession, keyspace: String, tableName: String): DataFrame = {val userItemDF = spark.read.format("org.apache.spark.sql.cassandra").options(Map("keyspace" -> keyspace, "table" -> tableName)).load().toDF("userId", "itemId", "rating")userItemDF}

1.3 基于spark mllib物品推荐

建立ALS算法模型,设置模型参数(通过模型参数评估获得最优解),调用recommendProductsForUsers方法为用户推荐指定数量的物品。

  def recommender(spark: SparkSession, ratingRDD: RDD[Rating],  recommendNum: Int): DataFrame={val splits = ratingRDD.randomSplit(Array(0.8, 0.2))val trainRDD= splits(0)val testRDD = splits(1)//建立ALS推荐模型val model = new ALS().setRank(5).setIterations(20).setLambda(0.01).setImplicitPrefs(false).setUserBlocks(-1).setProductBlocks(-1)//设置ratingRDD为所有用户推荐.run(trainRDD)val testUsersProductRDD = testRDD.map { case Rating(user, product, rate) => (user, product) }//得到预测评分的数据集val predictionRDD = model.predict(testUsersProductRDD).map {case Rating(user, product, rate) => ((user, product), rate)}//真实评分数据集与预测评分数据集进行合并val ratesAndPreds = testRDD.map { case Rating(user, product, rate) => ((user, product), rate) }.join(predictionRDD)//计算RMSE,这里的r1就是真实结果,r2就是预测结果val MSE = ratesAndPreds.map {case ((user, product), (r1, r2)) =>val err = (r1 - r2)err * err}.mean()println("Mean Squared Error = " + MSE)//用户推荐recommendNum个商品val userSubsetRecs = model.recommendProductsForUsers(recommendNum)//推荐商品列表val itemRecDF = userSubsetRecs.toDF("userId", "recommends")itemRecDF.show(5)itemRecDF}

1.4 模型参数评估

预测模型评估,预测出最好的模型参数BestModel

object RmseUtil {/*** 训练集合* @param trainingData 训练集合* @param testingData 测试集合* @return*/def predictBestRmse(trainingData:RDD[Rating], testingData:RDD[Rating]): BestModel = {var bestModel: Option[MatrixFactorizationModel] = Nonevar bestRanks = -1var bestIters = 0var bestLambdas = -1.0var bestRmse = Double.MaxValue//多重迭代法求最佳参数模型//迭代次数val numIters = List(5, 10, 20)//隐含因子val numRanks = List(8, 10, 12)//惩罚值(正则化值)val numLambdas = List(0.01, 0.1, 1)//共3*3*3种组合,每种组合迭代次数又不一样,在此会消耗大量时间for (rank <- numRanks; iter <- numIters; lambdas <- numLambdas) {//als参数为 训练集合 隐含因子 迭代次数 惩罚因子val model = ALS.train(trainingData, rank, iter, lambdas)val validationRmse = rmseComputer(model, testingData)//逐步迭代if (validationRmse < bestRmse) {bestModel = Some(model)bestRmse = validationRmsebestIters = iterbestLambdas = lambdasbestRanks = rank}}BestModel(bestModel, bestRanks, bestIters, bestLambdas, bestRmse)}
}/**** @param model       训练模型* @param dataOfTest  用于测试数据集合(一般是笛卡尔积)* @return*/def rmseComputer(model: MatrixFactorizationModel, dataOfTest: RDD[Rating]):Double= {//预测评分矩阵:预测返回结果<user product rating>val predictResult = model.predict(dataOfTest.map(item => (item.user, item.product)))//将预测值和测试值组成一个map然后比较预测的评分值和实际值val predict = predictResult.map(item => ((item.user, item.product), item.rating))val actual = dataOfTest.map(item => ((item.user, item.product), item.rating))val predJoinPrevActual = predict.join(actual).values//直接调用回归库函数需要传入一个(prediction,actualValue)val evaluator = new RegressionMetrics(predJoinPrevActual)evaluator.meanAbsoluteError}
http://www.lryc.cn/news/103724.html

相关文章:

  • QT第三讲
  • Linux内核的I2C驱动框架详解------这应该是我目前600多篇博客中耗时最长的一篇博客
  • 【点云处理教程】05-Python 中的点云分割
  • 代码随想录算法训练营之JAVA|第十七天| 654. 最大二叉树
  • C++重写函数、隐藏函数、重载函数的区别对比
  • 15.python设计模式【函数工厂模式】
  • Redis主从复制、哨兵、cluster集群原理+实验
  • 微信小程序如何实现页面传参?
  • OPC DA 客户端与服务器的那点事
  • Java 错误异常介绍(Exceptions)
  • 每日一题——旋转数组的最小数字
  • SpringBoot Jackson 日期格式化统一配置
  • 剑指 Offer 38. 字符串的排列 / LeetCode 47. 全排列 II(回溯法)
  • 【前端知识】React 基础巩固(四十三)——Effect Hook
  • 一百三十八、ClickHouse——使用clickhouse-backup备份ClickHouse库表
  • 【无标题】使用Debate Dynamics在知识图谱上进行推理(2020)7.31
  • windows下若依vue项目部署
  • 【目标检测】基于yolov5的水下垃圾检测(附代码和数据集,7684张图片)
  • P1734 最大约数和
  • Excel将单元格中的json本文格式化
  • Baumer工业相机堡盟工业相机如何通过BGAPI SDK获取相机当前实时帧率(C#)
  • XGBoost的基础思想与实现
  • 【Docker】Docker的服务更新与发现
  • 【Docker 学习笔记】Docker架构及三要素
  • matlab编程实践14、15
  • C++ ——STL容器【list】模拟实现
  • ubuntu 16.04 安装mujoco mujoco_py gym stable_baselines版本问题
  • 自然语言处理(NLP)技术
  • 如何将ubuntu LTS升级为Pro
  • 如何学习ARM嵌入式开发?