日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > python >内容正文

python

电影推荐系统 python简书_【记录|Spark】简单的电影推荐系统

發(fā)布時間:2024/3/12 python 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 电影推荐系统 python简书_【记录|Spark】简单的电影推荐系统 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

為了學(xué)習(xí)spark,在實驗樓上找到的一個spark入門課程,在此記錄一下學(xué)習(xí)過程。

我使用的Spark版本為Spark 2.2.0, 實驗樓教程使用的是Spark 1.6.1

流程和算法介紹

這個簡單的電影推薦系統(tǒng)是根據(jù)已有用戶對電影的評價系統(tǒng),針對特定用戶輸出其可能會感興趣的電影,構(gòu)成一個簡單的電影推薦系統(tǒng)。

主要步驟

加載數(shù)據(jù)集,解析成特定格式

劃分數(shù)據(jù)集,分為訓(xùn)練集和測試集

利用交替最小二乘法(ALS)算法,訓(xùn)練用戶與電影之間的矩陣模型

基于訓(xùn)練集進行預(yù)測,利用測試集來驗證預(yù)測結(jié)果是否有效。

實際上,上述步驟的第三四步是使用了協(xié)同過濾算法來推薦電影。

引用知乎上的回答解釋協(xié)同過濾

舉個簡單的小例子

我們已知道用戶u1喜歡的電影是A,B,C

用戶u2喜歡的電影是A, C, E, F

用戶u3喜歡的電影是B,D

我們需要解決的問題是:決定對u1是不是應(yīng)該推薦F這部電影。

基于內(nèi)容的做法:要分析F的特征和u1所喜歡的A、B、C的特征,需要知道的信息是A(戰(zhàn)爭片),B(戰(zhàn)爭片),C(劇情片),如果F(戰(zhàn)爭片),那么F很大程度上可以推薦給u1,這是基于內(nèi)容的做法,你需要對item進行特征建立和建模。

協(xié)同過濾的辦法:那么你完全可以忽略item的建模,因為這種辦法的決策是依賴user和item之間的關(guān)系,也就是這里的用戶和電影之間的關(guān)系。我們不再需要知道ABCF哪些是戰(zhàn)爭片,哪些是劇情片,我們只需要知道用戶u1和u2按照item向量表示,他們的相似度比較高,那么我們可以把u2所喜歡的F這部影片推薦給u1。

在Spark MLlib中,協(xié)同過濾算法是通過交替最小二乘法(ALS)實現(xiàn)的,具體算法實現(xiàn)在此并不關(guān)注。

數(shù)據(jù)集

數(shù)據(jù)集來自GroupLens,是一個名為MovieLens的數(shù)據(jù)集的數(shù)據(jù),在此處選擇數(shù)據(jù)量為一百萬條的數(shù)據(jù)集,下載地址

具體代碼和分析

1.導(dǎo)入包

我們需要導(dǎo)入以下包

import org.apache.spark.rdd._

import org.apache.spark.sql._

import org.apache.spark.mllib.recommendation.Rating

import org.apache.spark.mllib.recommendation.ALS

import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

mllib包是Spark中的機器學(xué)習(xí)包,我們這次導(dǎo)入的有ALS,MatrixFactorizationModel,Rating。ALS即為上文提到的交替最小二乘算法,在Spark中ALS算法的返回結(jié)果為MatrixFactorizationModel類,最后的Rating是Spark定義的評價Model,對應(yīng)于我們數(shù)據(jù)中的Rating.dat中的內(nèi)容,不用用戶再自行定義

然后,我們還需要導(dǎo)入implicits包,這個是Spark中的隱式轉(zhuǎn)換包,可以自動地對一些數(shù)據(jù)類型進行轉(zhuǎn)換,但是這個包需要在代碼中動態(tài)導(dǎo)入

val spark = SparkSession.builder.master("local").appName("Predict").getOrCreate()

import spark.implicits._

其中spark為SparkSession類,在Spark 2.2.0中用來代替SparkContext,作為整個程序的入口點

2.數(shù)據(jù)處理

定義電影、用戶數(shù)據(jù)實體類,用來映射對應(yīng)的數(shù)據(jù)

case class Movie(movieId: Int, title: String)

case class User(userId: Int, gender: String, age: Int, occupation: Int, zipCode: String)

定義解析函數(shù),將數(shù)據(jù)從文件中解析出來

def parseMovieData(data: String): Movie = {

val dataField = data.split("::")

assert(dataField.size == 3)

Movie(dataField(0).toInt, dataField(1))

}

def parseUserData(data: String): User = {

val dataField = data.split("::")

assert(dataField.size == 5)

User(dataField(0).toInt, dataField(1).toString, dataField(2).toInt, dataField(3).toInt, dataField(4).toString)

}

def parseRatingData(data: String): Rating = {

val dataField = data.split("::")

Rating(dataField(0).toInt, dataField(1).toInt, dataField(2).toDouble)

}

導(dǎo)入數(shù)據(jù)

var moviesData = spark.read.textFile("File:///home/hadoop/ml-1m/movies.dat").map(parseMovieData).cache()

var usersData = spark.read.textFile("File:///home/hadoop/ml-1m/users.dat").map(parseUserData).cache()

var ratingsData = spark.read.textFile("File:///home/hadoop/ml-1m/ratings.dat").map(parseRatingData).cache()

3. 訓(xùn)練模型

// convert to DataFrame

val moviesDF = moviesData.toDF()

val usersDF = usersData.toDF()

val ratingsDF = ratingsData.toDF()

// split to data set and test set

val tempPartitions = ratingsData.randomSplit(Array(0.7, 0.3), 1024L)

val trainingSetOfRatingsData = tempPartitions(0).cache().rdd

val testSetOfRatingData = tempPartitions(1).cache().rdd

// training model

val recomModel = new ALS().setRank(20).setIterations(10).run(trainingSetOfRatingsData)

按7:3的比例將數(shù)據(jù)集分為訓(xùn)練集和驗證集,由于劃分出來的數(shù)據(jù)集為DataSet類型,而ALS算法的run函數(shù)接收的參數(shù)為RDD類型,所以需要將DataSet轉(zhuǎn)換為RDD,方法很簡單,就加上”.rdd"就可以了,如果不轉(zhuǎn)換會報錯

spark_error_5.PNG

訓(xùn)練完之后可以調(diào)用模型進行推薦,比如要給用戶ID為1000的用戶推薦適合TA看的10部電影,就可以執(zhí)行

val recomResult = recomModel.recommendProducts(1000, 10)

結(jié)果如下

運行結(jié)果

返回的結(jié)果包括用戶ID,電影ID,和對應(yīng)的相關(guān)性

如果我們要顯示電影名,可以執(zhí)行以下代碼

val movieTitles = moviesDF.as[(Int, String)].rdd.collectAsMap()

val recommendMoviesWithTitle = recomResult.map(rating =>(movieTitles(rating.product), rating.rating))

println(recommendMoviesWithTitle.mkString("\n"))

在Spark老版本中,可以直接使用

val movieTitles = moviesDF.map(array => (array(0), array(1))).collectAsMap()

將moviesDF轉(zhuǎn)換為key為電影ID,value為電影名的map,但是在2.2.0中,如果這樣寫會提示DataSet沒有collectAsMap()方法,錯誤截圖如下

錯誤截圖

經(jīng)過一番搜索后,在StackOverflow上有人提到RDD有collectAsMap()方法,于是就要將moviesDF轉(zhuǎn)換為RDD類型,即上文用到的方法

打印出來的結(jié)果如圖

轉(zhuǎn)換結(jié)果

4.驗證模型

如何知道模型是否正確呢?可以用之前從數(shù)據(jù)集里面劃分出來的驗證集,通過調(diào)用模型得出預(yù)測結(jié)果,與驗證集中的原數(shù)據(jù)進行對比,可以判斷模型的效果如何

val predictResultOfTestSet = recomModel.predict(testSetOfRatingData.map{

case Rating(user, product, rating) => (user, product)

})

val formatResultOfTestSet = testSetOfRatingData.map{

case Rating(user, product, rating) => ((user, product), rating)

}

val formatResultOfPredictionResult = predictResultOfTestSet.map {

case Rating(user, product, rating) => ((user, product), rating)

}

val finalResultForComparison = formatResultOfPredictionResult.join(formatResultOfTestSet)

val MAE = finalResultForComparison.map {

case ((user, product), (ratingOfTest, ratingOfPrediction)) =>

val error = (ratingOfTest - ratingOfPrediction)

Math.abs(error)

}.mean()

在得到測試集的預(yù)測評分結(jié)果之后,我們用 map 操作和 join 操作將它與測試集的原始數(shù)據(jù)組合成為 ((用戶ID, 電影ID), (測試集原有評分, 預(yù)測評分))的格式。這個格式是 Key-Value 形式的,Key 為 (user, product)。我們是要把這里的測試集原有評分與預(yù)測時得到的評分相比較,二者的聯(lián)系就是 user 和 product 相同。

上述代碼中首先調(diào)用模型進行預(yù)測,然后將在測試集上的預(yù)測結(jié)果和測試集本身的數(shù)據(jù)都轉(zhuǎn)換為 ((user,product), rating) 的格式,之后將兩個數(shù)據(jù)組合在一起,計算兩者之間的評價的差值的絕對值,然后求平均值,這種方法叫做計算平均絕對誤差

平均絕對誤差( Mean Absolute Error )是所有單個觀測值與算術(shù)平均值偏差的絕對值的平均。

與平均誤差相比,平均絕對誤差由于離差被絕對值化,不會出現(xiàn)正負相抵消的情況,所以平均絕對誤差能更好地反映預(yù)測值誤差的實際情況。

最終算出的結(jié)果為

image.png

效果還算可以,如果想繼續(xù)優(yōu)化可以通過增加ALS的迭代次數(shù)和特征矩陣的秩來提高準確率

完整代碼

import org.apache.spark.rdd._

import org.apache.spark.sql._

import org.apache.spark.mllib.recommendation.Rating

import org.apache.spark.mllib.recommendation.ALS

import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

object PredictMovie {

case class Movie(movieId: Int, title: String)

case class User(userId: Int, gender: String, age: Int, occupation: Int, zipCode: String)

def parseMovieData(data: String): Movie = {

val dataField = data.split("::")

assert(dataField.size == 3)

Movie(dataField(0).toInt, dataField(1))

}

def parseUserData(data: String): User = {

val dataField = data.split("::")

assert(dataField.size == 5)

User(dataField(0).toInt, dataField(1).toString, dataField(2).toInt, dataField(3).toInt, dataField(4).toString)

}

def parseRatingData(data: String): Rating = {

val dataField = data.split("::")

Rating(dataField(0).toInt, dataField(1).toInt, dataField(2).toDouble)

}

def main(args: Array[String]){

val spark = SparkSession.builder.master("local").appName("Predict").getOrCreate()

import spark.implicits._

var moviesData = spark.read.textFile("File:///home/hadoop/ml-1m/movies.dat").map(parseMovieData _).cache()

var usersData = spark.read.textFile("File:///home/hadoop/ml-1m/users.dat").map(parseUserData _).cache()

var ratingsData = spark.read.textFile("File:///home/hadoop/ml-1m/ratings.dat").map(parseRatingData _).cache()

// convert to DataFrame

val moviesDF = moviesData.toDF()

val usersDF = usersData.toDF()

val ratingsDF = ratingsData.toDF()

// split to data set and test set

val tempPartitions = ratingsData.randomSplit(Array(0.7, 0.3), 1024L)

val trainingSetOfRatingsData = tempPartitions(0).cache().rdd

val testSetOfRatingData = tempPartitions(1).cache().rdd

// training model

val recomModel = new ALS().setRank(20).setIterations(10).run(trainingSetOfRatingsData)

val recomResult = recomModel.recommendProducts(1000, 10)

println(s"Recommend Movie to User ID 1000")

println(recomResult.mkString("\n"))

val movieTitles = moviesDF.as[(Int, String)].rdd.collectAsMap()

val recommendMoviesWithTitle = recomResult.map(rating =>(movieTitles(rating.product), rating.rating))

println(recommendMoviesWithTitle.mkString("\n"))

val predictResultOfTestSet = recomModel.predict(testSetOfRatingData.map{

case Rating(user, product, rating) => (user, product)

})

val formatResultOfTestSet = testSetOfRatingData.map{

case Rating(user, product, rating) => ((user, product), rating)

}

val formatResultOfPredictionResult = predictResultOfTestSet.map {

case Rating(user, product, rating) => ((user, product), rating)

}

val finalResultForComparison = formatResultOfPredictionResult.join(formatResultOfTestSet)

val MAE = finalResultForComparison.map {

case ((user, product), (ratingOfTest, ratingOfPrediction)) =>

val error = (ratingOfTest - ratingOfPrediction)

Math.abs(error)

}.mean()

println(s"mean error: $MAE")

spark.stop()

}

}

總結(jié)

以上是生活随笔為你收集整理的电影推荐系统 python简书_【记录|Spark】简单的电影推荐系统的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。