基于 Spark MLlib 的推荐系统实现
基于 Spark MLlib 的推荐系统实现
本文档将从框架原理、算法深度、代码实践三个维度,全面解析如何使用 Spark MLlib 构建现代推荐系统。
第一部分:Spark MLlib 框架深度解析
1. Spark MLlib 核心概念与演进
Apache Spark MLlib 是 Spark 生态中专为大规模、分布式数据设计的机器学习库。它并非一个单一的组件,而是经历了两个主要发展阶段:
spark.mllib
(基于RDD的API): 这是 MLlib 的初代 API,它直接操作 Spark 的核心数据结构 RDD。它的优点是底层、灵活,但需要开发者手动处理大量数据转换和特征工程细节。本文提供的部分经典算法实现(如User-CF、Item-CF)仍需利用其底层矩阵运算能力。spark.ml
(基于DataFrame的API): 这是新一代的、官方主推的 API。它基于 Spark SQL 的 DataFrame,引入了 Pipeline(管道) 的概念。一个 Pipeline 将机器学习流程中的多个环节(如特征提取、转换、模型训练、评估)串联起来,形成一个标准化的工作流。这使得模型构建、调优和部署过程更加规范、简洁和高效。ALS 算法在spark.ml
包中有更完善的实现。
核心特性总结:
特性 | 详细解析 |
---|---|
分布式计算 | 底层计算引擎为 Spark,能将计算任务拆分到上百甚至上千个节点并行执行,轻松处理传统单机无法承受的TB级数据。 |
高性能 | 充分利用内存计算,通过 RDD 的血统(Lineage)和惰性计算(Lazy Evaluation)机制,避免了不必要的磁盘I/O,比Hadoop MapReduce快10-100倍。 |
易用性 | 提供统一的 Scala, Java, Python, R 语言API。特别是spark.ml 的 Pipeline 机制,极大降低了机器学习工作流的开发复杂度。 |
生态集成 | 与 Spark SQL(数据处理)、GraphX(图计算)、Structured Streaming(流处理)无缝集成,可以构建一站式的“数据清洗->特征工程->模型训练->实时推荐”系统。 |
2. 2025年 MLlib 最新特性深度解读
面向2025年,Spark MLlib 的发展更侧重于自动化、性能极致化、生态融合和可解释性。
- 深度学习集成 (Deep Learning Integration): MLlib 正通过 Project Hydrogen 和 Pandas UDF 等项目,深化与 TensorFlow、PyTorch 的集成。这意味着开发者可以在 Spark 数据预处理管道的末端,无缝地将分布式处理好的数据喂给一个深度学习框架进行模型训练,特别适合构建复杂的深度学习推荐模型(如DeepFM, Wide & Deep)。
- AutoML 支持 (Automated ML): 自动化机器学习是当前热点。MLlib 正在增强其
CrossValidator
和TrainValidationSplit
等工具,并探索更高级的自动化能力,如自动特征工程、模型选择和超参数优化(Hyperparameter Tuning),旨在让非专家用户也能构建出高性能模型。 - 实时推荐 (Real-time Recommendation): 结合 Structured Streaming,MLlib 可以实现端到端的实时推荐。数据流(如用户实时点击行为)进入 Spark,经过流式处理和特征提取后,可以实时调用已训练好的模型(如ALS、GNN)进行预测,并将推荐结果毫秒级地推送给用户。
- GPU 加速 (GPU Acceleration): 通过集成 NVIDIA RAPIDS 加速器,Spark 3.0+ 已经可以在ETL和模型训练中利用GPU进行计算,对于矩阵运算密集型的推荐算法(如ALS、矩阵分解)和深度学习模型,可以带来数量级的性能提升。
- 模型可解释性 (Model Interpretability): “黑盒”模型越来越难以满足业务和合规需求。MLlib 社区正积极集成 SHAP、LIME 等模型解释工具,帮助开发者理解“为什么模型会做出这样的推荐”,增加模型透明度和可信度。
3. MLlib 在推荐系统中的核心优势
优势 | 在推荐场景中的体现 |
---|---|
极致的可扩展性 | 电商、短视频等平台的推荐系统需要处理海量用户行为日志(浏览、点击、购买),MLlib 的分布式特性使其能轻松应对。 |
强大的容错性 | 基于 RDD 的血缘关系,任何计算节点宕机,Spark 都能根据血缘自动恢复丢失的数据分区,保证了长耗时训练任务的稳定性。 |
丰富的算法支持 | 内置了经典的协同过滤(ALS)、逻辑回归(用于CTR预估)、梯度提升树等,同时其开放性也便于开发者实现更前沿的算法(如GNN)。 |
内存计算优化 | 推荐算法涉及大量迭代计算,MLlib 的内存缓存机制 (cache() , persist() ) 可以将中间结果(如用户/物品特征向量)缓存在内存中,极大加速后续迭代。 |
第二部分:协同过滤算法深度解析
1. 协同过滤算法核心理念
协同过滤(Collaborative Filtering, CF)是推荐系统领域的基石。其核心假设是,用户的偏好存在“群体智慧”,可以通过分析群体的行为模式来预测个体的兴趣。
- 用户相似性假设 (User-based): “如果你和我喜欢的东西大致相同,那么我喜欢但你没见过的东西,你可能也会喜欢。”
- 物品相似性假设 (Item-based): “如果大部分喜欢物品A的人也喜欢物品B,那么对于某个喜欢A的用户,我们可以向他推荐B。”
2. 2025年协同过滤算法发展趋势
传统的协同过滤正与前沿技术深度融合,以克服其数据稀疏性、冷启动和表达能力不足等问题。
- 神经网络协同过滤 (Neural Collaborative Filtering, NCF): 放弃了传统的内积(Dot Product)方式来计算用户和物品的交互,转而使用多层神经网络来学习一个更复杂、非线性的交互函数,能够捕捉更深层次的用户-物品关系。
- 图神经网络 (Graph Neural Networks, GNN): 将整个用户-物品交互历史看作一个庞大的异构图(User-Item Bipartite Graph)。通过在图上传播和聚合节点信息,GNN 能够学习到高阶的、携带了图结构信息的特征表示(Embedding),对于挖掘潜在关联和冷启动场景效果显著。
- 多模态融合 (Multimodal Fusion): 推荐不再仅仅依赖评分和点击数据。算法会融合物品的文本描述、封面图片、视频内容等多种模态的信息,通过多模态Embedding技术,极大地丰富了物品的特征表示,提升了推荐的准确度和多样性。
- 因果推荐 (Causal Recommendation): 传统推荐系统容易受到流行度偏见、曝光偏见等影响,做出“相关但不具因果”的推荐。因果推断技术的引入,旨在剥离这些混淆因素,挖掘用户行为背后真正的“因果关系”,从而做出更精准、更有说服力的推荐。
- 联邦学习 (Federated Learning): 在数据隐私法规日益严格的今天,联邦学习允许在不上传用户原始数据的情况下,在用户的设备上进行分布式模型训练。这对于保护用户隐私,同时利用海量数据的协同过滤场景,具有巨大的应用前景。
第三部分:基于 Spark MLlib 的协同过滤实现(Java版)
(一) 基于用户的协同过滤算法 (User-based CF)
1. 算法原理
找到与目标用户兴趣最相似的 “邻居” 用户群体,然后将这些邻居喜欢、但目标用户尚未接触过的物品推荐给他。
2. 算法步骤
- 构建用户-物品评分矩阵 (User-Item Matrix): 一个行为数据的抽象,通常是稀疏的。
- 计算用户相似度 (User Similarity): 使用余弦相似度、皮尔逊相关系数等方法计算用户向量间的相似度。在Spark中,这可以通过
RowMatrix
的columnSimilarities()
高效完成。 - 选择相似用户 (Find Neighbors): 对每个用户,筛选出相似度最高的 K 个用户。
- 生成推荐 (Generate Recommendations): 综合邻居们的评分,预测目标用户对未接触物品的可能评分,公式通常为:
Pred(u, i) = avg_rating(u) + Σ(sim(u, v) * (rating(v, i) - avg_rating(v))) / Σ|sim(u, v)|
。
3. 优缺点分析
- 优点: 简单直观,可解释性强(“因为和你相似的XXX喜欢这个”),能发现和推荐长尾物品。
- 缺点: 在用户量巨大时,用户相似度矩阵的计算成本极高(O(N_users² M_items));数据稀疏性问题严重;对新用户存在冷启动问题。
4. Java 代码实现 (2025 优化版)
以下是原始 User-CF Java 代码的重构和优化版本。主要优化点在于generateRecommendations
方法,避免了在循环中多次调用.collect()
这一严重的性能反模式,转而使用join
等分布式操作,将计算保留在Spark集群中,极大提升了效率和可扩展性。
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
import org.apache.spark.mllib.linalg.distributed.MatrixEntry;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
import scala.Tuple2;import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;/*** 基于用户的协同过滤推荐算法 (User-based CF)* 2025年优化版本:* 1. 使用更高效的分布式操作(join)生成推荐,避免在Driver端进行高代价循环。* 2. 增强了代码结构和注释。* 3. 优化了Spark配置。*/
public class UserBasedCF_Optimized {private static final Logger logger = Logger.getLogger(UserBasedCF_Optimized.class);public static void main(String[] args) {// 1. 初始化Spark环境Logger.getLogger("org.apache.spark").setLevel(Level.ERROR);SparkConf conf = new SparkConf().setAppName("UserBasedCF_Optimized").setMaster("local[*]").set("spark.sql.adaptive.enabled", "true").set("spark.sql.adaptive.coalescePartitions.enabled", "true");try (JavaSparkContext sc = new JavaSparkContext(conf)) {// 2. 读取并解析数据: (UserID, ItemID, Rating)JavaRDD<String> data = sc.textFile("hdfs://localhost:9000/data/ratingdata.txt");JavaRDD<MatrixEntry> parsedData = data.map(line -> {String[] parts = line.split(",");if (parts.length != 3) {throw new IllegalArgumentException("Invalid data format: " + line);}// MatrixEntry(rowIndex, colIndex, value) -> (UserID, ItemID, Rating)return new MatrixEntry(Long.parseLong(parts[0]), Long.parseLong(parts[1]), Double.parseDouble(parts[2]));}).filter(entry -> entry.value() > 0.0); // 过滤无效评分// 使用cache()将中间结果持久化到内存,加速后续计算parsedData.cache();// 3. 构建坐标矩阵 (CoordinateMatrix)CoordinateMatrix ratingsMatrix = new CoordinateMatrix(parsedData.rdd());// 4. 计算用户相似度// MLLib计算列相似度,所以需要先将 User-Item 矩阵转置为 Item-User 矩阵// 然后计算这个新矩阵的列相似度,即原矩阵的行相似度(用户相似度)RowMatrix userMatrix = ratingsMatrix.transpose().toRowMatrix();CoordinateMatrix userSimilarities = userMatrix.columnSimilarities();// 打印部分用户相似度System.out.println("用户相似度矩阵 (部分):");userSimilarities.entries().toJavaRDD().take(10).forEach(entry ->System.out.printf("用户 %d -> 用户 %d, 相似度: %.4f%n", entry.i(), entry.j(), entry.value()));// 5. 为指定用户生成推荐long targetUserId = 1L;int topN = 5; // 推荐物品数量int topK = 20; // 相似用户数量List<Tuple2<Long, Double>> recommendations = generateRecommendations(sc, parsedData, userSimilarities, targetUserId, topN, topK);System.out.printf("%n为用户 %d 推荐的 Top %d 物品:%n", targetUserId, topN);recommendations.forEach(rec ->System.out.printf("物品ID: %d, 预测评分: %.4f%n", rec._1(), rec._2()));} catch (Exception e) {logger.error("推荐系统运行出错", e);}}/*** 为指定用户生成推荐列表 (分布式优化版)* @param sc JavaSparkContext* @param ratingsRDD 原始评分RDD (userID, itemID, rating)* @param similarities 用户相似度矩阵* @param targetUserId 目标用户ID* @param topN 推荐数量* @param topK 相似用户数量* @return 推荐物品ID和预测评分的列表*/private static List<Tuple2<Long, Double>> generateRecommendations(JavaSparkContext sc,JavaRDD<MatrixEntry> ratingsRDD,CoordinateMatrix similarities,long targetUserId,int topN,int topK) {// 步骤1: 找到与目标用户最相似的K个用户及其相似度// (similarUserID, similarity)Map<Long, Double> topKSimilarUsers = similarities.entries().toJavaRDD().filter(entry -> entry.i() == targetUserId || entry.j() == targetUserId).mapToPair(entry -> {long otherUser = entry.i() == targetUserId ? entry.j() : entry.i();return new Tuple2<>(otherUser, entry.value());}).distinct().top(topK, new TupleComparator()).stream().collect(Collectors.toMap(Tuple2::_1, Tuple2::_2));// 如果没有相似用户,返回空列表if (topKSimilarUsers.isEmpty()) {return Collections.emptyList();}// 将相似用户Map广播到所有Executorvar broadcastSimilarUsers = sc.broadcast(topKSimilarUsers);// 步骤2: 获取目标用户已评分的物品集合Set<Long> targetUserRatedItems = ratingsRDD.filter(entry -> entry.i() == targetUserId).map(MatrixEntry::j).collect().stream().collect(Collectors.toSet());var broadcastRatedItems = sc.broadcast(targetUserRatedItems);// 步骤3: 获取K个相似用户的所有评分记录// 转换为 (itemID, (similarity * rating, similarity))JavaPairRDD<Long, Tuple2<Double, Double>> weightedRatings = ratingsRDD.filter(entry -> broadcastSimilarUsers.value().containsKey(entry.i())) // 过滤出相似用户的评分.filter(entry -> !broadcastRatedItems.value().contains(entry.j())) // 过滤掉目标用户已评分的物品.mapToPair(entry -> {long similarUserID = entry.i();long itemID = entry.j();double rating = entry.value();double similarity = broadcastSimilarUsers.value().get(similarUserID);// (itemID, (加权评分, 相似度))return new Tuple2<>(itemID, new Tuple2<>(rating * similarity, similarity));});// 步骤4: 聚合计算每个物品的最终预测分// (itemID, Σ(sim*rating) / Σ|sim|)JavaPairRDD<Long, Double> predictedScores = weightedRatings.reduceByKey((t1, t2) -> new Tuple2<>(t1._1() + t2._1(), t1._2() + t2._2())).mapValues(sumTuple -> {double totalWeightedScore = sumTuple._1();double totalSimilarity = sumTuple._2();return totalSimilarity == 0 ? 0 : totalWeightedScore / totalSimilarity;});// 步骤5: 按预测分降序排序,取TopNreturn predictedScores.takeOrdered(topN, new TupleComparator().reversed());}// 自定义比较器用于排序Tuple2static class TupleComparator implements java.io.Serializable, java.util.Comparator<Tuple2<Long, Double>> {@Overridepublic int compare(Tuple2<Long, Double> o1, Tuple2<Long, Double> o2) {return o1._2().compareTo(o2._2());}}
}
(二) 基于物品的协同过滤算法 (Item-based CF)
1. 算法原理
计算物品之间的相似度,然后根据用户过去喜欢的物品,向他推荐与之相似的其他物品。Item-CF 更适合于物品数量相对稳定,但用户数量庞大的场景。
2. 算法步骤
- 构建用户-物品评分矩阵。
- 计算物品相似度 (Item Similarity): 此时,我们将矩阵中的列向量(代表物品)进行比较。在Spark中,直接对User-Item矩阵调用
RowMatrix
的columnSimilarities()
即可。 - 生成推荐: 对于目标用户,遍历他所有评分过的物品,找到每个物品最相似的N个其他物品,然后进行加权求和,预测用户对这些新物品的评分。
3. 优缺点分析
- 优点: 物品相似度相对稳定,可以离线预计算,推荐的实时性好;可解释性强(“因为你喜欢A,所以为你推荐相似的B”);在物品数量远小于用户数时计算效率高。
- 缺点: 无法推荐全新的、与其他物品无关联的物品;对于热门物品,其相似物品也容易是热门的,可能导致推荐多样性降低。
4. Java 代码实现 (Scala 转译)
Java
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.SparseVector;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
import org.apache.spark.mllib.linalg.distributed.IndexedRow;
import org.apache.spark.mllib.linalg.distributed.MatrixEntry;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;
import scala.Tuple2;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;/*** 基于物品的协同过滤推荐算法 (Item-based CF)* Scala 代码的Java版本实现*/
public class ItemBasedCF {public static void main(String[] args) {Logger.getLogger("org.apache.spark").setLevel(Level.ERROR);SparkConf conf = new SparkConf().setAppName("ItemBasedCF").setMaster("local[*]");try (JavaSparkContext sc = new JavaSparkContext(conf)) {// 1. 读取并解析数据JavaRDD<String> data = sc.textFile("hdfs://localhost:9000/data/ratingdata.txt");JavaRDD<MatrixEntry> parsedData = data.map(line -> {String[] parts = line.split(",");return new MatrixEntry(Long.parseLong(parts[0]), Long.parseLong(parts[1]), Double.parseDouble(parts[2]));});// 2. 构建坐标矩阵并转换为行矩阵CoordinateMatrix ratingsMatrix = new CoordinateMatrix(parsedData.rdd());RowMatrix rowMatrix = ratingsMatrix.toRowMatrix();// 3. 计算物品相似度 (列相似度)CoordinateMatrix similarities = rowMatrix.columnSimilarities();// 转换为 (itemID, (similarItemID, similarityScore)) 的格式,便于后续joinJavaPairRDD<Long, Tuple2<Long, Double>> itemSimilarities = similarities.entries().toJavaRDD().flatMapToPair(entry -> {// 为了双向都能join,生成 (i, (j, sim)) 和 (j, (i, sim))return java.util.Arrays.asList(new Tuple2<>(entry.i(), new Tuple2<>(entry.j(), entry.value())),new Tuple2<>(entry.j(), new Tuple2<>(entry.i(), entry.value()))).iterator();});// 4. 为指定用户生成推荐long targetUserId = 1L;int topN = 2;// 4.1 获取目标用户的评分记录 RDD: (itemID, rating)JavaPairRDD<Long, Double> targetUserRatings = parsedData.filter(entry -> entry.i() == targetUserId).mapToPair(entry -> new Tuple2<>(entry.j(), entry.value()));// 4.2 获取目标用户已评分的物品ID集合Set<Long> ratedItemIds = targetUserRatings.keys().collect().stream().collect(Collectors.toSet());// 4.3 将用户评分与物品相似度进行join// (itemID, (rating, (similarItemID, similarity)))JavaPairRDD<Long, Tuple2<Double, Tuple2<Long, Double>>> joinedRDD = targetUserRatings.join(itemSimilarities);// 4.4 计算每个候选物品的预测分数// (similarItemID, rating * similarity)JavaPairRDD<Long, Double> candidateScores = joinedRDD.mapToPair(tuple -> {double rating = tuple._2._1;long similarItemID = tuple._2._2._1;double similarity = tuple._2._2._2;return new Tuple2<>(similarItemID, rating * similarity);}).filter(pair -> !ratedItemIds.contains(pair._1)); // 过滤掉已评分物品// 4.5 聚合分数并排序List<Tuple2<Long, Double>> recommendations = candidateScores.reduceByKey(Double::sum).sortBy(Tuple2::_2, false, 1).take(topN);System.out.println("********* 推荐的结果是 ***********");recommendations.forEach(r -> System.out.printf("物品ID: %d, 预测分数: %.4f%n", r._1, r._2));}}
}
(三) 基于模型的协同过滤:交替最小二乘法 (ALS)
1. 算法简介
ALS (Alternating Least Squares) 是矩阵分解的一种高效并行化实现,属于模型基协同过滤。它不直接计算用户或物品间的相似度,而是试图学习到能代表用户和物品的隐式特征向量 (Latent Factors)。
核心思想: 庞大稀疏的用户-物品评分矩阵 R (m x n)
,可以被近似分解为两个低秩(rank=k)的小矩阵:用户特征矩阵 U (m x k)
和物品特征矩阵 V (n x k)
的乘积,即 R ≈ U * Vᵀ
。
k
是隐式特征的数量(超参数),远小于m
和n
。U
的每一行代表一个用户的特征向量(如:用户对科幻、爱情、动作等题材的偏好程度)。V
的每一行代表一个物品的特征向量(如:电影在科幻、爱情、动作等题材上的成分)。
用户的评分 R(u, i)
就约等于用户 u
的向量和物品 i
的向量的点积。ALS 算法通过交替固定U
解算V
,再固定V
解算U
的方式,迭代地使 U * Vᵀ
的结果不断逼近真实的 R
,从而最小化预测误差。
2. Java 代码实现 (Scala 转译)
Java
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.recommendation.ALS;
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import scala.Tuple2;import java.util.Arrays;
import java.util.List;/*** 基于ALS (Alternating Least Squares) 的协同过滤推荐* Scala 代码的Java版本实现*/
public class ALSDemo {public static void main(String[] args) {Logger.getLogger("org.apache.spark").setLevel(Level.ERROR);SparkConf conf = new SparkConf().setAppName("ALSDemo").setMaster("local[*]");try (JavaSparkContext sc = new JavaSparkContext(conf)) {// 1. 读取并解析数据为 RDD<Rating>JavaRDD<String> data = sc.textFile("hdfs://localhost:9000/data/ratingdata.txt");JavaRDD<Rating> ratingsRDD = data.map(line -> {String[] parts = line.split(",");return new Rating(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]), Double.parseDouble(parts[2]));}).filter(rating -> rating.rating() > 0.0);// 将数据缓存ratingsRDD.cache();long numRatings = ratingsRDD.count();// 2. 训练ALS模型// ALS.train(ratings, rank, iterations, lambda)// rank: 特征向量的维度。经验值10-200。// iterations: 迭代次数。经验值10左右。// lambda: 正则化参数,防止过拟合。经验值0.01。int rank = 10;int iterations = 10;double lambda = 0.01;MatrixFactorizationModel model = ALS.train(ratingsRDD.rdd(), rank, iterations, lambda);// 3. 评估模型 (计算RMSE - 均方根误差)double rmse = computeRMSE(model, ratingsRDD);System.out.printf("训练完成,模型的均方根误差 (RMSE) = %.4f%n%n", rmse);// 4. 为指定用户推荐商品int targetUserId = 1;int topN = 2;System.out.printf("为用户 %d 推荐 Top %d 商品:%n", targetUserId, topN);Rating[] recommendations = model.recommendProducts(targetUserId, topN);Arrays.stream(recommendations).forEach(r ->System.out.printf("用户: %d -> 物品: %d, 预测评分: %.4f%n", r.user(), r.product(), r.rating()));// 5. 预测特定用户对特定物品的评分double prediction = model.predict(1, 105);System.out.printf("%n预测用户1对物品105的评分为: %.4f%n", prediction);} catch (Exception e) {e.printStackTrace();}}/*** 计算模型的均方根误差 (RMSE)* @param model 训练好的模型* @param data 原始评分数据* @return RMSE值*/public static double computeRMSE(MatrixFactorizationModel model, JavaRDD<Rating> data) {// 使用模型对数据中的 (user, product) 对进行预测JavaRDD<Tuple2<Object, Object>> userProducts = data.map(r -> new Tuple2<>(r.user(), r.product()));JavaPairRDD<Tuple2<Integer, Integer>, Double> predictions = JavaPairRDD.fromJavaRDD(model.predict(userProducts.rdd()).toJavaRDD().map(r ->new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating())));// 将真实评分与预测评分进行Join// RDD is ((user, product), (predicted_rating, actual_rating))JavaPairRDD<Tuple2<Integer, Integer>, Tuple2<Double, Double>> predictionsAndRatings =predictions.join(data.mapToPair(r -> new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating())));// 计算均方误差double mse = predictionsAndRatings.values().mapToDouble(pair -> {double err = pair._1() - pair._2();return err * err;}).mean();return Math.sqrt(mse);}
}