Spark机器学习(Chapter 04)推荐系统

时间:2023-01-09 22:00:41

代码:

#coding:utf-8
from pyspark import SparkContext

sc = SparkContext("local[4]", "MovieLens Spark App")
rawData=sc.textFile("../02/ml-100k/u.data")
print rawData.first()
rawRating=rawData.map(lambda x:x.split('\t'))
print rawRating.take(5)

#导入Rating、ALS模块,用于后面建模
from pyspark.mllib.recommendation import Rating,ALS
ratings=rawRating.map(lambda x:Rating(int(x[0]),int(x[1]),int(x[2])))
print ratings.take(5)

#模型训练
model=ALS.train(ratings,50,10,0.01)
userFeatures=model.userFeatures()
print userFeatures.take(2)

print model.userFeatures().count()
print model.productFeatures().count()

#进行预测
print len(userFeatures.first()[1])
predictRating = model.predict(789,123)
print predictRating

#要为某个用户生成前K个推荐物品,可借助MatrixFactorizationModel所提供的recommendProducts函数来实现。该函数需两个输入参数:user和num。其中user
#是用户ID,而num是要推荐的物品个数。
topRecs=model.recommendProducts(789,10) #给789用户推荐10部电影
print '给用户userId推荐其喜欢的item:'
for rec in topRecs:
print rec

#检验推荐内容要直观地检验推荐的效果,可以简单比对下用户所评级过的电影和被推荐的那些电影
moviesForUser=ratings.groupBy(lambda x:x.user).mapValues(list).lookup(789)
print '用户对%d部电影进行了评级'%len(moviesForUser[0])
print '源数据中用户(userId=789)喜欢的电影(item):'
for i in sorted(moviesForUser[0],key=lambda x: x.rating,reverse=True):
print i.product

movies=sc.textFile("../02/ml-100k/u.item")
titles=movies.map(lambda line: (int(line.split('|')[0]),line.split('|')[1])).collectAsMap()
print titles[1]

for i,rec in enumerate(topRecs):
print 'rank:'+str(i)+' '+str(titles[rec.product])+':'+str(rec.rating)

#物品的余弦相似度
import numpy as np
def cosineSimilarity(x,y):
return (x.dot(y)/np.linalg.norm(x)*np.linalg.norm(y)) #dot计算矩阵乘积,linalg.norm=sqrt(x*x)
testx=np.array([1.0,2.0,3.0])
print cosineSimilarity(testx,testx)
#下面以itemId = 567为例,计算所有与物品itemId = 567的的相似度,并取其相似度最高的前十个
itemId=567
itemFactor=model.productFeatures().lookup(itemId)[0]
print itemFactor

#计算电影567与其他电影的相似度
sims=model.productFeatures().map(lambda (id,factor):(id,cosineSimilarity(np.array(factor),np.array(itemFactor))))
sims.sortBy(lambda (x,y):y,ascending=False).take(10)

# 均方差(Mean Squared Error,MSE)直接衡量“用户物品”评级矩阵的重建误差。它也是一些模型里所采用的最小化目标函数,特别是许多矩阵分解类方法,比如ALS。因此,它常用于显式评级的情形。
# 它的定义为各平方误差的和与总数目的商。其中平方误差是指预测到的评级与真实评级的差值的平方。下面以用户789为例做讲解。现在从之前计算的moviesForUser这个Ratings集合里找出该用户的第一个评级:

actual=moviesForUser[0][0]
actualRating=actual.rating
print'用户789对电影1012的实际评级',actualRating
predictRating=model.predict(789,actual.product)
print '用户789对电影1012的预测评级',predictRating
squaredError=np.power(actualRating-predictRating,2)
print '实际评级与预测评级的MSE',squaredError

#计算整个数据集上的MSE,需要对每一条(user, movie, actual rating, predictedrating)记录都计算该平均误差,然后求和,再除以总的评级次数
userProducts=ratings.map(lambda rating:(rating.user,rating.product))

print '实际的评分:',userProducts.take(5)
#预测所有用户对电影的相应评分
print model.predictAll(userProducts).collect()[0]
predictions=model.predictAll(userProducts).map(lambda rating:((rating.user,rating.product),rating.rating))
print '预测的评分:',predictions.take(5)
'''
join(otherDataset,[numTasks]):当有两个KV的dataset(K,V)和(K,W),返回的是(K,(V,W))的dataset,numTasks为并发的任务数。
'''
ratingsAndPredictions=ratings.map(lambda rating:((rating.user,rating.product),rating.rating)).join(predictions)
print '组合预测的评分和实际的评分:',ratingsAndPredictions.take(5)

MSE=ratingsAndPredictions.map(lambda ((x,y),(m,n)):np.power(m-n,2)).reduce(lambda x,y:x+y)/ratingsAndPredictions.count()
print '模型的均方误差:',MSE
print '模型的均方根误差:',np.sqrt(MSE)

'''
5.2 K值平均准确率

K值平均准确率(MAP)的意思是整个数据集上的K值平均准确率(Average Precision at Kmetric,MAP)的均值。APK是信息检索中常用的一个指标。它用于衡量针对某个查询所返回的“前K个”文档的平均相关性。对于每次查询,我们会将结果中的前K个与实际相关的文档进行比较。
用MAP指标计算时,结果中文档的排名十分重要。如果结果中文档的实际相关性越高且排名也更靠前,那APK分值也就越高。由此,它也很适合评估推荐的好坏。因为推荐系统也会计算“前K个”推荐物,然后呈现给用户。如果在预测结果中得分更高(在推荐列表中排名也更靠前)的物品实际上也与用户更相关,那自然这个模型就更好。MAP和其他基于排名的指标同样也更适合评估隐式数据集上的推荐。这里用MSE相对就不那么合适。

'''
def avgPrecisionK(actual,predicted,k):
if len(predicted)>k:
predK=predicted[:k];
else:
predK=predicted
score=0.0
numHits=0.0
for i,p in enumerate(predK):
if p in actual:
numHits=numHits+1
score=score+numHits/(i+1)
if not actual:
return 1.0
else:
return score/min(len(actual),k)
actualMovies = [rating.product for rating in moviesForUser[0]]
predictMovies = [rating.product for rating in topRecs]
print '实际的电影:',actualMovies
print '预测的电影:',predictMovies

MAP10 = avgPrecisionK(actualMovies,predictMovies,10)
print MAP10

#全局MAPK的求解要计算对每一个用户的APK得分,再求其平均。这就要为每一个用户都生成相应的推荐列表。

itemFactors=model.productFeatures().map(lambda (id,factor):factor).collect()
itemMatrix=np.array(itemFactors)
print itemMatrix
print itemMatrix.shape

#便于后面进行map计算的时候使所有map都有效,这里在实际当中的意思是把数据广播到任何一个节点
imBroadcast=sc.broadcast(itemMatrix)
'''
现在可以计算每一个用户的推荐。这会对每一个用户因子进行一次map操作。在这个操作里,会对用户因子矩阵和电影因子矩阵做乘积,其结果为一个表示各个电影预计评级的向量(长度为1682,即电影的总数目)。之后,用预计评级对它们排序:
'''
'''
使用MLlib内置的评估函数
RMSE和MSE
前面我们从零开始对模型进行了MSE、RMSE和MAP三方面的评估。这是一段很有用的练习。同样, MLlib下的RegressionMetrics和RankingMetrics类也提供了相应的函数以方便模型评估。
'''
userVector=model.userFeatures().map(lambda (userId,array):(userId,np.array(array)))
usreVector=userVector.map(lambda (userId,x):(userId,imBroadcast.value,dot((np.array(x).transpose))))
userVectorId = userVector.map(lambda (userId,x) : (userId,[(xx,i) for i,xx in enumerate(x.tolist())]))
#print userVectorId.collect()[0]
sortUserVectorId = userVectorId.map(lambda (userId,x):(userId,sorted(x,key=lambda x:x[0],reverse=True)))
sortUserVectorRecId = sortUserVectorId.map(lambda (userId,x): (userId,[xx[1] for xx in x]))
#print sortUserVectorRecId.take(2)
sortUserVectorRecId.count()

userMovies = ratings.map(lambda rating: (rating.user,rating.product)).groupBy(lambda (x,y):x)
print userMovies.take(3)
userMovies = userMovies.map(lambda (userId,x):(userId, [xx[1] for xx in x] ))
#print userMovies.take(1)
#allAPK=sortUserVectorRecId.join(userMovies).map(lambda (userId,(predicted, actual)):(userId,avgPrecisionK(actual,predicted,10)))
#print allAPK.map(lambda (x,y):y).reduce(lambda x,y:x+y)/sortUserVectorRecId.count()
allAPK=sortUserVectorRecId.join(userMovies).map(lambda (userId,(predicted, actual)):avgPrecisionK(actual,predicted,2000))
# print allAPK.take(10)
print allAPK.reduce(lambda x,y:x+y)/allAPK.count()


from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.mllib.evaluation import RankingMetrics

predictedAndTrue=ratingsAndPredictions.map(lambda ((userId,product),(predicted,actual)):(predicted,actual))

print predictedAndTrue.take(5)
regressionMetrics=RegressionMetrics(predictedAndTrue)
# print "均方误差 = %f"% regressionMetrics.meanSquaredError
# print "均方根误差 = %f"% regressionMetrics.rootMeanSquaredError

sortedLabels = sortUserVectorRecId.join(userMovies).map(lambda (userId,(predicted, actual)):(predicted,actual))
# print sortedLabels.take(1)
rankMetrics = RankingMetrics(sortedLabels)
print "Mean Average Precision = %f" % rankMetrics.meanAveragePrecision
print "Mean Average Precision(at K=10) = %f" % rankMetrics.precisionAt(10)