spark之CF协同过滤
一)、協(xié)同過濾
1.1 概念
協(xié)同過濾是一種借助"集體計(jì)算"的途徑。它利用大量已有的用戶偏好來估計(jì)用戶對其未接觸過的物品的喜好程度。其內(nèi)在思想是相似度的定義
1.2 分類
1.在基于用戶的方法的中,如果兩個(gè)用戶表現(xiàn)出相似的偏好(即對相同物品的偏好大體相同),那就認(rèn)為他們的興趣類似。要對他們中的一個(gè)用戶推薦一個(gè)未知物品,
?? 便可選取若干與其類似的用戶并根據(jù)他們的喜好計(jì)算出對各個(gè)物品的綜合得分,再以得分來推薦物品。其整體的邏輯是,如果其他用戶也偏好某些物品,那這些物品很可能值得推薦。
? 2. 同樣也可以借助基于物品的方法來做推薦。這種方法通常根據(jù)現(xiàn)有用戶對物品的偏好或是評級情況,來計(jì)算物品之間的某種相似度。 這時(shí),相似用戶評級相同的那些物品會(huì)被認(rèn)為更相近。一旦有了物品之間的相似度,便可用用戶接觸過的物品來表示這個(gè)用戶,然后找出和這些已知物品相似的那些物品,并將這些物品推薦給用戶。同樣,與已有物品相似的物品被用來生成一個(gè)綜合得分,而該得分用于評估未知物品的相似度。
二)、矩陣分解
Spark推薦模型庫當(dāng)前只包含基于矩陣分解(matrix factorization)的實(shí)現(xiàn),由此我們也將重點(diǎn)關(guān)注這類模型。它們有吸引人的地方。首先,這些模型在協(xié)同過濾中的表現(xiàn)十分出色。而在Netflix Prize等知名比賽中的表現(xiàn)也很拔尖
1,顯式矩陣分解
?? 要找到和“用戶 物品”矩陣近似的k維(低階)矩陣,最終要求出如下兩個(gè)矩陣:一個(gè)用于表示用戶的U × k維矩陣,以及一個(gè)表示物品的I × k維矩陣。這兩個(gè)矩陣也稱作因子矩陣。它們的乘積便是原始評級矩陣的一個(gè)近似。值得注意的是,原始評級矩陣通常很稀疏,但因子矩陣卻是稠密的。
特點(diǎn):因子分解類模型的好處在于,一旦建立了模型,對推薦的求解便相對容易。但也有弊端,即當(dāng)用戶和物品的數(shù)量很多時(shí),其對應(yīng)的物品或是用戶的因子向量可能達(dá)到數(shù)以百萬計(jì)。這將在存儲(chǔ)和計(jì)算能力上帶來挑戰(zhàn)。另一個(gè)好處是,這類模型的表現(xiàn)通常都很出色。
2,隱式矩陣分解(關(guān)聯(lián)因子分確定,可能隨時(shí)會(huì)變化)
隱式模型仍然會(huì)創(chuàng)建一個(gè)用戶因子矩陣和一個(gè)物品因子矩陣。但是,模型所求解的是偏好矩陣而非評級矩陣的近似。類似地,此時(shí)用戶因子向量和物品因子向量的點(diǎn)積所得到的分?jǐn)?shù),也不再是一個(gè)對評級的估值,而是對某個(gè)用戶對某一物品偏好的估值(該值的取值雖并不嚴(yán)格地處于0到1之間,但十分趨近于這個(gè)區(qū)間)
3,最小二乘法(Alternating Least Squares ? ?ALS):解決矩陣分解的最優(yōu)化方法
? ALS的實(shí)現(xiàn)原理是迭代式求解一系列最小二乘回歸問題。在每一次迭代時(shí),固定用戶因子矩陣或是物品因子矩陣中的一個(gè),然后用固定的這個(gè)矩陣以及評級數(shù)據(jù)來更新另一個(gè)矩陣。之后,被更新的矩陣被固定住,再更新另外一個(gè)矩陣。如此迭代,直到模型收斂(或是迭代了預(yù)設(shè)好的次數(shù))。
三)、Spark下ALS算法的應(yīng)用
1,數(shù)據(jù)來源電影集ml-100k
2,代碼實(shí)現(xiàn)?
??????spark中ALS是居于矩陣分解方法,既不是user_base也不是items_base,下面中的代碼注釋是有問題的,這里是轉(zhuǎn)載的? https://www.cnblogs.com/ksWorld/p/6808092.html?
?
package com.spark.milb.study
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.evaluation.{RankingMetrics, RegressionMetrics}
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.{SparkConf, SparkContext}
import org.jblas.DoubleMatrix
/**
* Created by hadoop on 17-5-3.
* 協(xié)同過濾(處理對象movie,使用算法ALS:最小二乘法(實(shí)現(xiàn)用戶推薦)
* 余弦相似度實(shí)現(xiàn)商品相似度推薦
*/
object cfTest {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
val conf=new SparkConf().setMaster("local").setAppName("AlsTest")
val sc=new SparkContext(conf)
CF(sc,"ml-100k/u.data")
}
def CF(sc:SparkContext,fileName:String): Unit ={
val movieFile=sc.textFile(fileName)
val RatingDatas=movieFile.map(_.split("\t").take(3))
//轉(zhuǎn)為Ratings數(shù)據(jù)
val ratings=RatingDatas.map(x =>Rating(x(0).toInt,x(1).toInt,x(2).toDouble))
//獲取用戶評價(jià)模型,設(shè)置k因子,和迭代次數(shù),隱藏因子lambda,獲取模型
/*
* ? rank :對應(yīng)ALS模型中的因子個(gè)數(shù),也就是在低階近似矩陣中的隱含特征個(gè)數(shù)。因子個(gè)
數(shù)一般越多越好。但它也會(huì)直接影響模型訓(xùn)練和保存時(shí)所需的內(nèi)存開銷,尤其是在用戶
和物品很多的時(shí)候。因此實(shí)踐中該參數(shù)常作為訓(xùn)練效果與系統(tǒng)開銷之間的調(diào)節(jié)參數(shù)。通
常,其合理取值為10到200。
iterations :對應(yīng)運(yùn)行時(shí)的迭代次數(shù)。ALS能確保每次迭代都能降低評級矩陣的重建誤
差,但一般經(jīng)少數(shù)次迭代后ALS模型便已能收斂為一個(gè)比較合理的好模型。這樣,大部分
情況下都沒必要迭代太多次(10次左右一般就挺好)。
lambda :該參數(shù)控制模型的正則化過程,從而控制模型的過擬合情況。其值越高,正則
化越嚴(yán)厲。該參數(shù)的賦值與實(shí)際數(shù)據(jù)的大小、特征和稀疏程度有關(guān)。和其他的機(jī)器學(xué)習(xí)
模型一樣,正則參數(shù)應(yīng)該通過用非樣本的測試數(shù)據(jù)進(jìn)行交叉驗(yàn)證來調(diào)整。
* */
val model=ALS.train(ratings,50,10,0.01)
//基于用戶相似度推薦
println("userNumber:"+model.userFeatures.count()+"\t"+"productNum:"+model.productFeatures.count())
//指定用戶及商品,輸出預(yù)測值
println(model.predict(789,123))
//為指定用戶推薦的前N商品
model.recommendProducts(789,11).foreach(println(_))
//為每個(gè)人推薦前十個(gè)商品
model.recommendProductsForUsers(10).take(1).foreach{
case(x,rating) =>println(rating(0))
}
//基于商品相似度(使用余弦相似度)進(jìn)行推薦,獲取某個(gè)商品的特征值
val itemFactory=model.productFeatures.lookup(567).head
val itemVector=new DoubleMatrix(itemFactory)
//求余弦相似度
val sim=model.productFeatures.map{
case(id,factory)=>
val factorVector=new DoubleMatrix(factory)
val sim=cosineSimilarity(factorVector,itemVector)
(id,sim)
}
val sortedsim=sim.top(11)(Ordering.by[(Int,Double),Double]{
case(id,sim)=>sim
})
println(sortedsim.take(10).mkString("\n"))
//模型評估,通過均誤差
//實(shí)際用戶評估值
val actualRatings=ratings.map{
case Rating(user,item,rats) => ((user,item),rats)
}
val userItems=ratings.map{
case(Rating(user,item,rats)) => (user,item)
}
//模型的用戶對商品的預(yù)測值
val predictRatings=model.predict(userItems).map{
case(Rating(user,item,rats)) =>((user,item),rats)
}
//聯(lián)合獲取rate值
val rates=actualRatings.join(predictRatings).map{
case x =>(x._2._1,x._2._2)
}
//求均方差
val regressionMetrics=new RegressionMetrics(rates)
//越接近0越佳
println(regressionMetrics.meanSquaredError)
//全局平均準(zhǔn)確率(MAP)
val itemFactors = model.productFeatures.map { case (id, factor)
=> factor }.collect()
val itemMatrix = new DoubleMatrix(itemFactors)
//分布式廣播商品的特征矩陣
val imBroadcast = sc.broadcast(itemMatrix)
//計(jì)算每一個(gè)用戶的推薦,在這個(gè)操作里,會(huì)對用戶因子矩陣和電影因子矩陣做乘積,其結(jié)果為一個(gè)表示各個(gè)電影預(yù)計(jì)評級的向量(長度為
//1682,即電影的總數(shù)目)
val allRecs = model.userFeatures.map{ case (userId, array) =>
val userVector = new DoubleMatrix(array)
val scores = imBroadcast.value.mmul(userVector)
val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1)
val recommendedIds = sortedWithId.map(_._2 + 1).toSeq //+1,矩陣從0開始
(userId, recommendedIds)
}
//實(shí)際評分
val userMovies = ratings.map{ case Rating(user, product, rating) =>
(user, product)}.groupBy(_._1)
val predictedAndTrueForRanking = allRecs.join(userMovies).map{ case
(userId, (predicted, actualWithIds)) =>
val actual = actualWithIds.map(_._2)
(predicted.toArray, actual.toArray)
}
//求MAP,越大越好吧
val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking)
println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision)
}
//余弦相似度計(jì)算
def cosineSimilarity(vec1:DoubleMatrix,vec2:DoubleMatrix):Double={
vec1.dot(vec2)/(vec1.norm2()*vec2.norm2())
}
}
?
以下轉(zhuǎn)載:http://blog.51cto.com/snglw/1662153
總結(jié)
??????這樣,一個(gè)簡單的基于模型的電影推薦應(yīng)用就算OK了。
??實(shí)時(shí)推薦架構(gòu)分析
????????上面,實(shí)現(xiàn)了簡單的推薦系統(tǒng)應(yīng)用,但是,僅僅實(shí)現(xiàn)用戶的定向推薦,在實(shí)際應(yīng)用中價(jià)值不是非常大,如果體現(xiàn)價(jià)值,最好能夠?qū)崿F(xiàn)實(shí)時(shí)或者準(zhǔn)實(shí)時(shí)推薦。
????????下面,簡單介紹下實(shí)時(shí)推薦的一個(gè)架構(gòu):
????????
????????
????????該架構(gòu)圖取自淘寶Spark On Yarn的實(shí)時(shí)架構(gòu),這里,給出一些個(gè)人的觀點(diǎn):
? ? ? ? 架構(gòu)圖分為三層:離線、近線和在線。
????????????離線部分:主要實(shí)現(xiàn)模型的建立。原始數(shù)據(jù)通過ETL加工清洗,得到目標(biāo)數(shù)據(jù),目標(biāo)業(yè)務(wù)數(shù)據(jù)結(jié)合合適的算法,學(xué)習(xí)訓(xùn)練模型,得到最佳的模型。
????????????近線部分:主要使用HBase存儲(chǔ)用戶行為信息,模型混合系統(tǒng)綜合顯性反饋和隱性反饋的模型處理結(jié)果,將最終的結(jié)果推薦給用戶。
????????????在線部分:這里,主要有兩種反饋,顯性和隱性,個(gè)人理解,顯性反饋理解為用戶將商品加入購物車,用戶購買商品這些用戶行為;隱性反饋理解為用戶在某個(gè)商品上停留的時(shí)間,用戶點(diǎn)擊哪些商品這些用戶行為。這里,為了實(shí)現(xiàn)實(shí)時(shí)/準(zhǔn)實(shí)時(shí)操作,使用到了Spark Streaming對數(shù)據(jù)進(jìn)行實(shí)時(shí)處理。(有可能是Flume+Kafka+Spark Streaming架構(gòu))
總結(jié)
以上是生活随笔為你收集整理的spark之CF协同过滤的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何用java语言调用tensorflo
- 下一篇: eclipse+scala+java+m