SparkMLlib—协同过滤之交替最小二乘法ALS原理与实践

时间:2023-03-09 08:05:23
SparkMLlib—协同过滤之交替最小二乘法ALS原理与实践

相关内容原文地址:

****:leboop:Spark MLlib协同过滤之交替最小二乘法ALS原理与实践

****:Jantelope:Spark中MLlib中的ALS算法物品推荐代码实现;



一、Spark MLlib算法实现

数据准备:

1,101,5.0

1,102,3.0

1,103,2.5

2,101,2.0

2,102,2.5

2,103,5.0

2,104,2.0

3,101,2.5

3,104,4.0

3,105,4.5

3,107,5.0

4,101,5.0

4,103,3.0

4,104,4.5

4,106,4.0

5,101,4.0

5,102,3.0

5,103,2.0

5,104,4.0

5,105,3.5

5,106,4.0

第一列为用户id(userId),第二列为物品id(itemId),第三列为用户给物品的评分,转换成用户-物品评分矩阵后,如下:

SparkMLlib—协同过滤之交替最小二乘法ALS原理与实践

1.1 显示反馈

Spark MLlib提供了两种API,一种基于RDD的,在spark.mllib下,该API已经进入维护状态,预计在Spark 3.0中放弃维护,最新的是基于DataFrame,该API在spark.ml下。

1.1.1 基于RDD

import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SparkSession /**
* 基于RDD的ALS API推荐Demo
*/
object ALSCFDemo { // case class Rating(userId: Int, itermId: Int, rating: Float) /**
* 解析数据:将数据转换成Rating对象
* @param str
* @return
*/
def parseRating(str: String): Rating = {
val fields = str.split(",")
assert(fields.size == 3)
Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat)
} def main(args: Array[String]): Unit = {
//定义切入点
val spark = SparkSession.builder().master("local").appName("ASL-Demo").getOrCreate()
//读取数据,生成RDD并转换成Rating对象
val ratingsRDD = spark.sparkContext.textFile("data/ratingdata.csv").map(parseRating)
//隐藏因子数
val rank=50
//最大迭代次数
val maxIter=10
//正则化因子
val labmda=0.01
//训练模型
val model=ALS.train(ratingsRDD,rank,maxIter,labmda)
//推荐物品数
val proNum=2
//推荐
val r=model.recommendProductsForUsers(proNum)
//打印推荐结果
r.foreach(x=>{
println("用户 "+x._1)
x._2.foreach(x=>{
println(" 推荐物品 "+x.product+", 预测评分 "+x.rating)
println()
}
)
println("===============================")
}
) }
}

运行结果如下:

用户 4
推荐物品 101, 预测评分 4.987222374679642 推荐物品 104, 预测评分 4.498410352539908 ===============================
用户 1
推荐物品 101, 预测评分 4.9941397937874825 推荐物品 104, 预测评分 4.482759123081623 ===============================
用户 3
推荐物品 107, 预测评分 4.9917963612098415 推荐物品 105, 预测评分 4.50190214892064 ===============================
用户 5
推荐物品 101, 预测评分 4.023403087402049 推荐物品 104, 预测评分 3.9938240731866506 ===============================
用户 2
推荐物品 103, 预测评分 4.985059400785903 推荐物品 102, 预测评分 2.4974442131394214 ===============================

均方根误差RESM:

SparkMLlib—协同过滤之交替最小二乘法ALS原理与实践

T为真实值,SparkMLlib—协同过滤之交替最小二乘法ALS原理与实践为预测值。

import com.leboop.mllib.ALSCFDemo.rems
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession /**
* 基于RDD的ALS API推荐Demo
*/
object ALSCFDemo {
/**
* 解析数据:将数据转换成Rating对象
*
* @param str
* @return
*/
def parseRating(str: String): Rating = {
val fields = str.split(",")
assert(fields.size == 3)
Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat)
} /**
* @param model 训练好的模型
* @param data 真实数据
* @param n 数据个数
* @return 误差
*/
def rems(model: MatrixFactorizationModel, data: RDD[Rating], n: Long): Double = {
//预测值 Rating(userId,itermId,rating)
val preRDD: RDD[Rating] = model.predict(data.map(d => (d.user, d.product)))
//关联:组成(预测评分,真实评分)
val doubleRating = preRDD.map(
x => ((x.user, x.product), x.rating)
).join(
data.map { x => ((x.user, x.product), x.rating) }
).values
//计算RMES
math.sqrt(doubleRating.map(x => math.pow(x._1 - x._2, 2)).reduce(_ + _) / n)
} def main(args: Array[String]): Unit = {
//定义切入点
val spark = SparkSession.builder().master("local").appName("ASL-Demo").getOrCreate()
//读取数据,生成RDD并转换成Rating对象
val ratingsRDD = spark.sparkContext.textFile("data/ratingdata.csv").map(parseRating)
//将数据随机分成训练数据和测试数据(权重分别为0.8和0.2)
val Array(training, test) = ratingsRDD.randomSplit(Array(1, 0))
//隐藏因子数
val rank = 50
//最大迭代次数
val maxIter = 10
//正则化因子
val labmda = 0.01
//训练模型
val model = ALS.train(training, rank, maxIter, labmda)
//计算误差
val remsValue = rems(model, ratingsRDD, ratingsRDD.count)
println("误差: " + remsValue)
}
}

结果如下:

误差:  0.011343969370562474

1.1.2 基于DataFrame

import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.SparkSession /**
* ASL基于DataFrame的Demo
*/
object ALSDFDemo {
case class Rating(userId: Int, itemId: Int, rating: Float)
/**
* 解析数据:将数据转换成Rating对象
* @param str
* @return
*/
def parseRating(str: String): Rating = {
val fields = str.split(",")
assert(fields.size == 3)
Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat)
} def main(args: Array[String]): Unit = {
//定义切入点
val spark = SparkSession.builder().master("local").appName("ASL-DF-Demo").getOrCreate()
//读取数据,生成RDD并转换成Rating对象
import spark.implicits._
val ratingsDF = spark.sparkContext.textFile("data/ratingdata.csv").map(parseRating).toDF()
//将数据随机分成训练数据和测试数据(权重分别为0.8和0.2)
val Array(training, test) = ratingsDF.randomSplit(Array(0.8, 0.2))
//定义ALS,参数初始化
val als = new ALS().setRank(50)
.setMaxIter(10)
.setRegParam(0.01)
.setUserCol("userId")
.setItemCol("itemId")
.setRatingCol("rating")
//训练模型
val model = als.fit(training) //推荐:每个用户推荐2个物品
val r = model.recommendForAllUsers(2) //关闭冷启动(防止计算误差不产生NaN)
model.setColdStartStrategy("drop")
//预测测试数据
val predictions = model.transform(test) //定义rmse误差计算器
val evaluator = new RegressionEvaluator()
.setMetricName("rmse")
.setLabelCol("rating")
.setPredictionCol("prediction")
//计算误差
val rmse = evaluator.evaluate(predictions) //打印训练数据
training.foreach(x=>println("训练数据: "+x))
//打印测试数据
test.foreach(x=>println("测试数据: "+x))
//打印推荐结果
r.foreach(x=>print("用户 "+x(0)+" ,推荐物品 "+x(1)))
//打印预测结果
predictions.foreach(x=>print("预测结果: "+x))
//输出误差
println(s"Root-mean-square error = $rmse")
}
}

运行结果如下:

训练数据: [1,101,5.0]
训练数据: [1,102,3.0]
训练数据: [1,103,2.5]
训练数据: [2,101,2.0]
训练数据: [2,102,2.5]
训练数据: [2,104,2.0]
训练数据: [3,101,2.5]
训练数据: [3,105,4.5]
训练数据: [3,107,5.0]
训练数据: [4,101,5.0]
训练数据: [4,103,3.0]
训练数据: [4,104,4.5]
训练数据: [4,106,4.0]
训练数据: [5,102,3.0]
训练数据: [5,103,2.0]
训练数据: [5,104,4.0]
训练数据: [5,105,3.5] 测试数据: [2,103,5.0]
测试数据: [3,104,4.0]
测试数据: [5,101,4.0]
测试数据: [5,106,4.0] 用户 1 ,推荐物品 WrappedArray([101,4.98618], [105,3.477826])
用户 3 ,推荐物品 WrappedArray([107,4.9931526], [105,4.499714])
用户 5 ,推荐物品 WrappedArray([104,3.9853115], [105,3.4996033])
用户 4 ,推荐物品 WrappedArray([101,5.000056], [104,4.5001974])
用户 2 ,推荐物品 WrappedArray([105,3.0707152], [102,2.4903712]) 预测结果: [5,101,4.0,3.1271331]
预测结果: [2,103,5.0,1.0486442]
预测结果: [5,106,4.0,1.8420099]
预测结果: [3,104,4.0,1.4847627] Root-mean-square error = 2.615265256309832

1.2 隐式反馈

与显式反馈基本相同,这里需要使用方法setImplicitPrefs()打开隐式反馈,代码如下:

val als = new ALS()
.setMaxIter(5)
.setRegParam(0.01)
.setImplicitPrefs(true)
.setUserCol("userId")
.setItemCol("movieId")
.setRatingCol("rating")

二、Spark中MLlib中的ALS算法物品推荐代码实现;

ALS 是交替最小二乘 (alternating least squares)的简称。

简单说明下原理:

从协同过滤的分类来说,ALS算法属于User-Item CF,也叫做混合CF。它同时考虑了User和Item两个方面。

用户和商品的关系,可以抽象为如下的三元组:<User,Item,Rating>。其中,Rating是用户对商品的评分,表征用户对该商品的喜好程度。

假设我们有一批用户数据,其中包含m个User和n个Item,则我们定义Rating矩阵,其中的元素表示第u个User对第i个Item的评分。

在实际使用中,由于n和m的数量都十分巨大,因此R矩阵的规模很容易就会突破1亿项。这时候,传统的矩阵分解方法对于这么大的数据量已经是很难处理了。

另一方面,一个用户也不可能给所有商品评分,因此,R矩阵注定是个稀疏矩阵。矩阵中所缺失的评分,又叫做missing item。

SparkMLlib—协同过滤之交替最小二乘法ALS原理与实践

import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import scala.io.Source
import org.apache.spark.rdd.RDD object BasedItermDemo {
def main(args: Array[String]): Unit = {
//屏蔽日志信息
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
//构建sc对象
val conf = new SparkConf().setAppName("ALS iterm").setMaster("local")
val sc = new SparkContext(conf)
val seq = getSeqRDD("D:\\tmp\\ratingdata.txt")
val ratings = sc.parallelize(seq, 1)
val model = ALS.train(ratings, 15, 5, 0.01)
/*
* rating 评分矩阵
* 经验值:
* rank 是模型中隐语义因子的个数 推荐10-200 数据越大越准确,计算也就越复杂
* iteration 模型迭代计算次数10-20 数据越大越准确,计算也就越复杂
* lambda 惩罚函数的因数,是ALS的正则化参数,推荐值:0.01
*/
//获取用户ID 对每个用户 推荐2个商品
val n = ratings.count()
val userID = ratings.map(f=>f.user).distinct().collect().toArray
for(s<-userID){
val products = model.recommendProducts(s, 2)
println("用户ID是:"+s)
for(s<-products){
println("推荐的商品是: "+s.product+"推荐的理由是: "+s.rating)
}
println("**********************")
}
//需要计算当前模型的误差RMSE
val rmse = computeRMSE(model,ratings,n)
println("当前模型误差值是:"+rmse)
} //评分矩阵 ratings需要的是一个rdd 现在需要构建一个rdd,也就是一个序列
def getSeqRDD(path:String):Seq[Rating]={
val data = Source.fromFile(path).getLines().map(f=>f.split(",")match{
case Array(user,product,rat)=>Rating(user.toInt,product.toInt,rat.toDouble)
}).filter(f=>f.rating>0.0) if(data.isEmpty){
println("数据错误")
return null
}else{
data.toSeq
}
}
//定义方法采用当前model 获取数据,误差是多少
//3个参数,一个当前model值 以及 实际评分矩阵值,总共多少条评分数据
def computeRMSE(model:MatrixFactorizationModel,data:RDD[Rating],n:Long):Double={
/*获取等值条件
* select 计算值,实际值
* from model,data
* where model.(user,product) = data.(user,product)
*/
//等价条件 计算值
val equal = model.predict((data.map(f=>(f.user,f.product))))
//获取 计算值得矩阵
val predictRating = equal.map(f=>((f.user,f.product),f.rating))
//获取实际值得矩阵
val realRating = data.map(f=>((f.user,f.product),f.rating))
//将两个评分矩阵进行合并计算均方根误差
val predictAndReal = predictRating.join(realRating).values
math.sqrt(predictAndReal.map(f=>(f._1-f._2)*(f._1-f._2)).reduce(_+_)/n) } }

结果展示:

用户ID是:4
推荐的商品是: 101推荐的理由是: 4.998812482106746
推荐的商品是: 104推荐的理由是: 4.499347431643257
**********************
用户ID是:1
推荐的商品是: 101推荐的理由是: 4.986825763711995
推荐的商品是: 104推荐的理由是: 3.500596045600693
**********************
用户ID是:3
推荐的商品是: 107推荐的理由是: 4.992594312342743
推荐的商品是: 105推荐的理由是: 4.498912380875409
**********************
用户ID是:5
推荐的商品是: 104推荐的理由是: 4.019718120441245
推荐的商品是: 101推荐的理由是: 4.012689154167877
**********************
用户ID是:2
推荐的商品是: 103推荐的理由是: 4.981664427426299
推荐的商品是: 102推荐的理由是: 2.5151597024240413
**********************
当前模型误差值是:0.01120138424326239

当前代码采用的是经验值,对于大多数模型来书,经验值可以满足条件,模型中的值也可以自己测试

代码如下:

import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import scala.io.Source
import org.apache.spark.rdd.RDD object BaedItermCFDemo01 {
def main(args: Array[String]): Unit = {
//屏蔽日志信息
Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
//构建sc对象
val conf = new SparkConf().setAppName("ALS iterm").setMaster("local")
val sc = new SparkContext(conf)
val seq = getSeqRDD("D:\\tmp\\ratingdata.txt")
val ratings = sc.parallelize(seq, 1)
//计算商品个数,用户个数,评分个数
val pNum = ratings.map(f=>f.product).distinct().count()
val uNum = ratings.map(f=>f.user).distinct().count()
val rNum = ratings.map(f=>f.rating).count()
//val model1 = ALS.train(ratings, 15, 5, 0.01)
/*
* rating 评分矩阵
* 经验值:
* rank 数据循环训练的此时 设置10-200 数据越大越准确,计算也就越复杂
* iteration 模型迭代计算次数10-20 数据越大越准确,计算也就越复杂
* lambda 惩罚函数的因数,是ALS的正则化参数,推荐值:0.01
*/
//推测最佳模型
val ranks = List(5,15)
val iterations = List(2,5)
val lambdas = List(0.1,1)
// 寻找 最佳模型
var bestModel:Option[MatrixFactorizationModel] = None
var bestRank = 0
var bestIteration = -1
var bestLambda = -1.0
//初始误差,寻找最小误差
var bestRMSE:Double= Double.MaxValue
for(rank<-ranks;iteration<-iterations;lambda<-lambdas){
//寻找最佳模型
val model = ALS.train(ratings, rank, iteration, lambda)
//计算每个数值的误差
val rmse = computeRMSE(model,ratings,rNum)
//判断得出最佳模型
if(rmse<bestRMSE){
bestModel = Some(model)
bestRMSE = rmse
bestRank = rank
bestIteration = iteration
bestLambda = lambda
}
}
println("最佳模型:" + bestModel)
println("最佳的RMSE:"+bestRMSE)
println("最佳的Lambda:"+bestLambda)
println("最佳的迭代次数Iteration:"+bestIteration) } //评分矩阵 ratings需要的是一个rdd 现在需要构建一个rdd,也就是一个序列
def getSeqRDD(path:String):Seq[Rating]={
val data = Source.fromFile(path).getLines().map(f=>f.split(",")match{
case Array(user,product,rat)=>Rating(user.toInt,product.toInt,rat.toDouble)
}).filter(f=>f.rating>0.0) if(data.isEmpty){
println("数据错误")
return null
}else{
data.toSeq
}
}
//定义方法采用当前model 获取数据,误差是多少
//3个参数,一个当前model值 以及 实际评分矩阵值,总共多少条评分数据
def computeRMSE(model:MatrixFactorizationModel,data:RDD[Rating],n:Long):Double={
/*获取等值条件
* select 计算值,实际值
* from model,data
* where model.(user,product) = data.(user,product)
*/
//等价条件 计算值
val equal = model.predict((data.map(f=>(f.user,f.product))))
//获取 计算值得矩阵
val predictRating = equal.map(f=>((f.user,f.product),f.rating))
//获取实际值得矩阵
val realRating = data.map(f=>((f.user,f.product),f.rating))
//将两个评分矩阵进行合并计算均方根误差
val predictAndReal = predictRating.join(realRating).values
math.sqrt(predictAndReal.map(f=>(f._1-f._2)*(f._1-f._2)).reduce(_+_)/n) } }

结果展示:

最佳模型:Some(org.apache.spark.mllib.recommendation.MatrixFactorizationModel@1e57b783)
最佳的RMSE:0.11470480237681456
最佳的Lambda:0.1
最佳的迭代次数Iteration:2