尚硅谷大数据技术之电商推荐系统

标签:
itjava培训linux大数据尚硅谷 |
分类: 大数据学科 |

核心代码如下:
//计算商品相似度矩阵
//获取商品的特征矩阵,数据格式 RDD[(scala.Int, scala.Array[scala.Double])]
val productFeatures = model.productFeatures.map{case (productId,features) =>
(productId, new DoubleMatrix(features))
}
// 计算笛卡尔积并过滤合并
val productRecs = productFeatures.cartesian(productFeatures)
.filter{case (a,b) => a._1 != b._1}
.map{case (a,b) =>
val simScore = this.consinSim(a._2,b._2) // 求余弦相似度
(a._1,(b._1,simScore))
}.filter(_._2._2 > 0.6)
.groupByKey()
.map{case (productId,items) =>
ProductRecs(productId,items.toList.map(x => Recommendation(x._1,x._2)))
}.toDF()
productRecs
.write
.option("uri", mongoConfig.uri)
.option("collection",PRODUCT_RECS)
.mode("overwrite")
.format("com.mongodb.spark.sql")
.save()
其中,consinSim是求两个向量余弦相似度的函数,代码实现如下:
//计算两个商品之间的余弦相似度
def consinSim(product1: DoubleMatrix,
product2:DoubleMatrix) : Double ={
}
4.3.3 模型评估和参数选取
在上述模型训练的过程中,我们直接给定了隐语义模型的rank,iterations,lambda三个参数。对于我们的模型,这并不一定是最优的参数选取,所以我们需要对模型进行评估。通常的做法是计算均方根误差(RMSE),考察预测评分与实际评分之间的误差。
有了RMSE,我们可以就可以通过多次调整参数值,来选取RMSE最小的一组作为我们模型的优化选择。
在scala/com.atguigu.offline/下新建单例对象ALSTrainer,代码主体架构如下:
def main(args: Array[String]): Unit = {
val config = Map(
"spark.cores" -> "local[*]",
"mongo.uri" -> "mongodb://localhost:27017/recommender",
"mongo.db" -> "recommender"
)
//创建SparkConf
val sparkConf = new SparkConf().setAppName("ALSTrainer").setMaster(config("spark.cores"))
//创建SparkSession
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
val mongoConfig = MongoConfig(config("mongo.uri"),config("mongo.db"))
import spark.implicits._
//加载评分数据
val ratingRDD = spark
.read
.option("uri",mongoConfig.uri)
.option("collection",OfflineRecommender.MONGODB_RATING_COLLECTION)
.format("com.mongodb.spark.sql")
.load()
.as[ProductRating]
.rdd
.map(rating => Rating(rating.userId,rating.productId,rating.score)).cache()
// 将一个RDD随机切分成两个RDD,用以划分训练集和测试集
val splits = ratingRDD.randomSplit(Array(0.8, 0.2))
val trainingRDD = splits(0)
val testingRDD = splits(1)
//输出最优参数
adjustALSParams(trainingRDD, testingRDD)
//关闭Spark
spark.close()
}
其中adjustALSParams方法是模型评估的核心,输入一组训练数据和测试数据,输出计算得到最小RMSE的那组参数。代码实现如下:
// 输出最终的最优参数
def adjustALSParams(trainData:RDD[Rating], testData:RDD[Rating]): Unit ={
// 这里指定迭代次数为5,rank和lambda在几个值中选取调整
val result = for(rank <- Array(100,200,250); lambda <- Array(1, 0.1, 0.01, 0.001))
yield {
val model = ALS.train(trainData,rank,5,lambda)
val rmse = getRMSE(model, testData)
(rank,lambda,rmse)
}
// 按照rmse排序
println(result.sortBy(_._3).head)
}
计算RMSE的函数getRMSE代码实现如下:
def getRMSE(model:MatrixFactorizationModel, data:RDD[Rating]):Double={
val userProducts = data.map(item => (item.user,item.product))
val predictRating = model.predict(userProducts)
val real = data.map(item => ((item.user,item.product),item.rating))
val predict = predictRating.map(item => ((item.user,item.product),item.rating))
// 计算RMSE
sqrt(
real.join(predict).map{case ((userId,productId),(real,pre))=>
// 真实值和预测值之间的差
val err = real - pre
err * err
}.mean()
)
}
运行代码,我们就可以得到目前数据的最优模型参数。
第5章 实时推荐服务建设
5.1 实时推荐服务
实时计算与离线计算应用于推荐系统上最大的不同在于实时计算推荐结果应该反映最近一段时间用户近期的偏好,而离线计算推荐结果则是根据用户从第一次评分起的所有评分记录来计算用户总体的偏好。
用户对物品的偏好随着时间的推移总是会改变的。比如一个用户u 在某时刻对商品p 给予了极高的评分,那么在近期一段时候,u 极有可能很喜欢与商品p 类似的其他商品;而如果用户u 在某时刻对商品q 给予了极低的评分,那么在近期一段时候,u 极有可能不喜欢与商品q 类似的其他商品。所以对于实时推荐,当用户对一个商品进行了评价后,用户会希望推荐结果基于最近这几次评分进行一定的更新,使得推荐结果匹配用户近期的偏好,满足用户近期的口味。
如果实时推荐继续采用离线推荐中的ALS 算法,由于算法运行时间巨大,不具有实时得到新的推荐结果的能力;并且由于算法本身的使用的是评分表,用户本次评分后只更新了总评分表中的一项,使得算法运行后的推荐结果与用户本次评分之前的推荐结果基本没有多少差别,从而给用户一种推荐结果一直没变化的感觉,很影响用户体验。
另外,在实时推荐中由于时间性能上要满足实时或者准实时的要求,所以算法的计算量不能太大,避免复杂、过多的计算造成用户体验的下降。鉴于此,推荐精度往往不会很高。实时推荐系统更关心推荐结果的动态变化能力,只要更新推荐结果的理由合理即可,至于推荐的精度要求则可以适当放宽。
所以对于实时推荐算法,主要有两点需求:
(1)用户本次评分后、或最近几个评分后系统可以明显的更新推荐结果;
(2)计算量不大,满足响应时间上的实时或者准实时要求;
5.2 实时推荐模型和代码框架
5.2.1 实时推荐模型算法设计
当用户u 对商品p 进行了评分,将触发一次对u 的推荐结果的更新。由于用户u 对商品p 评分,对于用户u 来说,他与p 最相似的商品们之间的推荐强度将发生变化,所以选取与商品p 最相似的K 个商品作为候选商品。
每个候选商品按照“推荐优先级”这一权重作为衡量这个商品被推荐给用户u 的优先级。
这些商品将根据用户u 最近的若干评分计算出各自对用户u 的推荐优先级,然后与上次对用户u 的实时推荐结果的进行基于推荐优先级的合并、替换得到更新后的推荐结果。
具体来说:
首先,获取用户u 按时间顺序最近的K 个评分,记为RK;获取商品p 的最相似的K 个商品集合,记为S;
然后,对于每个商品q
S ,计算其推荐优先级
,计算公式如下: