日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark项目实践--基于 TMDB 数据集的电影数据分析

發布時間:2023/12/31 编程问答 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark项目实践--基于 TMDB 数据集的电影数据分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

基于 TMDB 數據集的電影數據分析

  • 一、環境搭建
  • 二、數據預處理
  • 三、使用 Spark 將數據轉為 DataFrame
  • 四、使用 Spark 進行數據分析并可視化
    • 1.單獨分析
    • 2.字段之間的關系分析
  • 五,結語

一、環境搭建

從假設裸機,環境搭建開始,具體環境搭建操作大體流程如下,具體詳細流程點擊查看另一篇博客:spark環境搭建
大體流程:
(1)安裝Linux操作系統:比如可以安裝Ubuntu 16.04
(2)安裝Hadoop:需要在Linux系統上安裝Hadoop
(3)安裝Spark:需要在Linux系統上安裝Spark
(4)最后為了方便編寫代碼,實現Linux與Windows下的pycharm對接。

本次實驗環境:
pycharm2019專業版
Ubuntu16.04
pip3=10.0.1
pyspark3.0.2

二、數據預處理

環境搭建完成之后,開始做項目。
首先數據集介紹:
本次項目使用的數據集來自數據網站 Kaggle 的 tmdb-movie-metadata 電影數據集,該數據集包含大約 5000 部電影的相關數據。數據包含以下字段:

上圖中可以看出數據中某些字段包含 json 數據,因此直接使用 DataFrame 進行讀取會出現分割錯誤,所以如果要創建 DataFrame,需要先直接讀取文件生成 RDD,再將 RDD 轉為 DataFrame。過程中,還需要使用 python3 中的 csv 模塊對數據進行解析和轉換。

為了更方便的對 csv 文件轉化的 RDD 進行處理,需要首先去除csv文件的標題行。完成后,將處理好的文件 tmdb_5000_movies.csv 存儲到 HDFS 上方便進一步的處理,使用下面命令將文件上傳至 HDFS:

hdfs dfs -put tmdb_5000_movies.csv

問題一
如果上傳過程中遇到如下情況:hdfs: command not found
原因是沒有在bin目錄之下設置路徑,解決:
1.sudo vi /etc/profile打開原文件
2.在文件最后新起一行添加路徑export PATH=/usr/local/hadoop/bin:$PATH
其中這里的/usr/local/hadoop/bin是你的hdfs所在的目錄,一定看清楚。
3.最后要使文件生效,這個一定不要忘了,不然文件也是白配置。命令:source /etc/profile

這時文件已經被上傳至HDFS中

三、使用 Spark 將數據轉為 DataFrame

打開Windows下的pycharm,連接到Linux,開始編程
因為讀入的 csv 文件是結構化的數據,因此可以將數據創建為 DataFrame 方便進行分析。
上面也說過,為了創建DataFrame ,要將HDFS上的數據加載為RDD,由RDD轉為DataFrame ,具體代碼以及解析如下:

''' csv --> HDFS--> RDD --> DataFrame 1.創建 SparkSession 和 SparkContext 對象。 2.為 RDD 轉為 DataFrame 制作表頭 (schema)。schema 是一個 StructType 對象, 該對象使用一個 StructField 數組創建。 每一個 StructField 都代表結構化數據中的一個字段,構造 StructField 需要 3 個參數 1. 字段名稱 2. 字段類型 3. 字段是否可以為空 ''' #這里運行之前的環境配置,有些不配置不會出錯,不過由于不是直接在Linux里寫的,而是對接的pycharm中,所以可能會出現環境錯誤,帶上這幾行代碼以防萬一 import os os.environ["JAVA_HOME"] = "/usr/lib/jvm/jdk1.8.0_162" os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.5" #調用包 from pyspark import SparkContext from pyspark.sql import SparkSession, Row from pyspark.sql.types import StringType, StructField, StructType import json # 用于后面的流程 import csv import matplotlib.pyplot as plt#創建兩個對象 sc = SparkContext('local','spark_project') sc.setLogLevel('WARN') spark = SparkSession.builder.getOrCreate()# 根據數據集表格制作表頭 schemaString = "budget,genres,homepage,id," \"keywords,original_language," \"original_title,overview,popularity," \"production_companies,production_countries," \"release_date,revenue,runtime,spoken_languages," \"status,tagline,title,vote_average,vote_count"fields = [StructField(field,StringType(),True)for field in schemaString.split(',')]schema = StructType(fields)# 3. 開始創建用于轉為 DataFrame 的 RDD。這個過程首先sc.textFile讀取 HDFS 上的數據文件,然后為了將 RDD 轉為 DataFrame,還需要使用lambda函數將數據的每一行轉為一個 Row 對象。 #lambda函數內部:對于每一行用逗號分隔的數據,(從里到外剖析) # 1.csv.reader:使用 csv 模塊進行解析并轉為 Row 對象,得到可以轉為 DataFrame 的 RDD #2.使用 next 函數將迭代器中的數據讀取到數組 #3.使用 * 將數組轉為 Row 對象的構造函數參數,創建 Row 對象 moviesRdd = sc.textFile('tmdb_5000_movies.csv').map(lambda line: Row(*next(csv.reader([line]))))#使用準備好的表頭 (schema) 和 RDD 創建 DataFrame createDataFrame 創建 DataFrame mdf = spark.createDataFrame(moviesRdd, schema) #mdf.show() #至此完成 DataFrame 的創建,show查看最終內容 #print(mdf.collect())

以上數據預處理結束。最后可以打印出來查看數據結構,進行下面的數據分析。

四、使用 Spark 進行數據分析并可視化

接下來使用上述處理完成后得到的 DataFrame mdf 進行數據分析,首先對數據中的主要字段單獨進行分析,然后再分析不同字段之間的關系。
為了方便使用matplotlib進行數據可視化,對于每個字段不同的分析,都將分析結果導出為json文件保存之后,對該文件進行數據可視化。
接下來的每個分析都需要進行保存,所以提前定義一個save函數,直接調用即可:

def save(path, data):with open(path, 'w') as f:f.write(data)

注意!!!下列代碼中所有的save有關的函數如果被注釋掉的請打開,否則存儲不了的話,可視化分析是不可以的。

1.單獨分析

(1)分析數據集中電影的體裁分布

  • 數據集中可以看出,電影的體裁字段是一個 json 格式的數據,所以,要統計不同體裁的電影的數量,首先需要解析這個字段的 json 數據,從中取出每個電影對應的體裁數組,然后使用詞頻統計的方法,根據相同key相加,統計不同體裁出現的頻率,即可得到電影的體裁分布。
#選擇體裁字段,filter篩選出數據中該字段為空的不要,使用map與lambda函數結合, madff = mdf.select("genres").filter(mdf["genres"] != '').rdd.flatMap(lambda g: [(v, 1) for v in map(lambda x: x['name'], json.loads(g["genres"]))])\.repartition(1).reduceByKey(lambda x, y: x + y) #print(mdf.collect()) #建議每打出一行代碼就運行一下看看結果 res =madff.collect() #print(list(map(lambda v: {"genre": v[0], "count": v[1]}, res))) countByGenres=list(map(lambda v: {"genre": v[0], "count": v[1]}, res)) print(countByGenres) #save('/home/hadoop/test/TMDB/genres.json', json.dumps(countByGenres)) #不可跨平臺保存,注意路徑,要是虛擬機的路徑 #下面畫圖之前先讀取保存的文件數據 with open('/home/hadoop/test/TMDB/genres.json', 'r') as f:data=json.load(f) print(data) #取出畫圖需要的x,y軸 x=list(map(lambda x:x['genre'],data)) #python自帶的map與RDD中map區分開,python中map自動遍歷值 y=list(map(lambda x:x['count'],data)) print(x) print(y) plt.bar(x,y,0.5) plt.show()

(2) 前 100 個常見關鍵詞

  • 該項分析電影關鍵詞中出現頻率最高的前一百個。字段也是 json 格式數據,因此也是進行頻率統計,統計結果進行降序排序并取前 100 項。
#進行頻率統計,同時對于統計結果進行降序排序并取前 100 項,套路如上,選擇字段--篩選出無用值--詞頻統計 keyword = mdf.select("keywords").filter(mdf["keywords"] != '').rdd.flatMap(lambda g: [(v, 1) for v in map(lambda x: x['name'], json.loads(g["keywords"]))])\.repartition(1).reduceByKey(lambda x, y: x + y) res1 = keyword.sortBy(lambda x: -x[1]).take(100) countByKeywords=list(map(lambda v: {"x": v[0], "value": v[1]}, res1)) print(countByKeywords) save('/home/hadoop/test/TMDB/keywords.json', json.dumps(countByKeywords))from wordcloud import WordCloud with open('/home/hadoop/test/TMDB/keywords.json', 'r') as f:data=json.load(f) print(data) x=list(map(lambda x:x['x'],data))#python自帶的map與RDD中map區分開,python中map自動遍歷值 save('/home/hadoop/test/TMDB/keywordswc.json',json.dump(x)) #將單詞存儲為一個文件 跟上面的分兩個批次執行,一個處理文本,一個將文本畫詞云 with open('/home/hadoop/test/TMDB/keywordswc.json', 'r') as f:datawc=json.load(f) print(datawc) datawc1=",".join(datawc) #list轉String,因為list類型不可用于詞云畫圖 wc = WordCloud(background_color="white",\width = 800,\height = 600,\).generate(datawc1) #expected string or bytes-like object wc.to_file('keywords.png') # 保存圖片 plt.imshow(wc) # 用plt顯示圖片 plt.axis('off') # 不顯示坐標軸 plt.show()

可視化結果:

(3) 數據集 中最常見的 10 種預算數

  • 這一項探究電影常見的預算數是多少
#統計電影常見預算,首先計算對預算字段進行過濾filter,去除預算為 0 的項目, #然后根據預算聚合groupBy并計數count(),接著根據計數進行排序orderBy,并將結果導出為 json 字符串toJSON(), #為了統一輸出,這里將 json 字符串轉為 python 對象,最后取前 10 項作為最終的結果countByBudget = mdf.filter(mdf["budget"] != 0).groupBy("budget").count().\orderBy('count', ascending=False).toJSON().map(lambda j: json.loads(j)).take(10) print(countByBudget) save('/home/hadoop/test/TMDB/budget.json', json.dumps(countByBudget)) #畫圖可視化結果,查看預算與統計數的關系 with open('/home/hadoop/test/TMDB/budget.json', 'r') as f:data=json.load(f) print(data) x=list(map(lambda x:x['budget'],data)) y=list(map(lambda x:x['count'],data)) print(x) print(y) plt.bar(x,y,0.5) plt.xticks(rotation = 45) plt.show()

結果:

最常見的預算從高到低排列,可以看出20000000的最多。

(4)數據集中最常見電影時長 (只展示電影數大于 100 的時長)

#統計最常見的電影時長,1.過濾掉0分鐘的電影,使用filter 篩選出來runtime不等于0的電影 #2.根據時長字段進行聚攏求和,按照key=runtime的聚攏 #3.對2進行計數統計,然后過濾掉時長<100的也就是篩選出時長大于100的,最終將json轉為python輸出,collect查看結果 Runtime = mdf.filter(mdf["runtime"] != 0).groupBy("runtime").count()\.filter('count>=100').toJSON().map(lambda j: json.loads(j)).collect() print(Runtime) save('/home/hadoop/test/TMDB/runtime.json', json.dumps(Runtime)) #畫圖,可視化結果 #讀取文件內容,取出x,y 使用plt繪圖 with open('/home/hadoop/test/TMDB/runtime.json','r') as f:runtimedata = json.load(f) print(runtimedata) x=list(map(lambda x:x["runtime"],runtimedata)) #pyhton中的map格式:map(fun,data) y=list(map(lambda x:x["count"],runtimedata)) plt.title('Four ') plt.xlabel('runtime') plt.ylabel('count') #將數據顯示在條形圖上方 ax=plt.bar(x,y) for rect in ax:#'BarContainer' object has no attribute 'text'w = rect.get_height()plt.text(rect.get_x() + rect.get_width() / 2, w, '%d' % int(w), ha='center', va='bottom') #ha有三個選擇:right,center,left #va有四個選擇:'top', 'bottom', 'center', 'baseline' plt.show()

結果:

從可視化結果來看最常見的電影時長是90min左右

(5) 生產電影最多的 10 大公司

  • 解析 json 格式字段從中提取出 name 并進行詞頻統計,這里取出"production_companies"字段名稱,進行排序,尋找前十個,選擇字段select(“production_companies”) ,在字段中篩選出空值 ,使用rdd中的map,最終聚合
#解析 json 格式字段從中提取出 name 并進行詞頻統計 #這里取出"production_companies"字段名稱,進行排序,尋找前十個 #1.選擇字段select("production_companies") 2.在字段中篩選出空值 3.使用rdd中的map,最終聚合 res = mdf.select("production_companies").filter(mdf["production_companies"] != '').rdd\.flatMap(lambda g: [(v, 1) for v in map(lambda x: x['name'], json.loads(g["production_companies"]))])\.repartition(1).reduceByKey(lambda x, y: x + y) res5 = res.sortBy(lambda x: -x[1]).take(10) countByCompanies = list(map(lambda v: {"company": v[0], "count": v[1]}, res5)) #規定輸出格式 print(countByCompanies) save('/home/hadoop/test/TMDB/company_count.json', json.dumps(countByCompanies)) with open('/home/hadoop/test/TMDB/company_count.json','r') as f:companydata = json.load(f) print(companydata) x=list(map(lambda x:x["company"],companydata)) y=list(map(lambda x:x["count"],companydata)) ax=plt.bar(x,y) for rect in ax:#'BarContainer' object has no attribute 'text'w = rect.get_height()plt.text(rect.get_x() + rect.get_width() / 2, w, '%d' % int(w), ha='center', va='bottom') plt.xticks(rotation = 10) plt.show()

可視化:

最后x軸字體過長,傾斜度不多,大家可以自己再次調節看看。

(6)TMDb 中的 10 大電影語言 餅圖

  • 該字段也是 JSON 數據,因此首先對每個項目進行詞頻統計,然后過濾掉語言為空的項目,最后排序取前十即可。
#該字段也是 JSON 數據,因此首先對每個項目進行詞頻統計,然后過濾掉語言為空的項目,最后排序取前十即可。 res = mdf.select("spoken_languages").filter(mdf["spoken_languages"] != '').rdd\.flatMap(lambda g: [(v, 1) for v in map(lambda x: x['name'], json.loads(g["spoken_languages"]))])\.repartition(1).reduceByKey(lambda x, y: x + y) res6 =res.sortBy(lambda x: -x[1]).take(10) #排序取前十個 countByLanguage = list(map(lambda v: {"language": v[0], "count": v[1]}, res6)) print(countByLanguage) save('/home/hadoop/test/TMDB/language.json', json.dumps(countByLanguage)) #開始畫圖 with open('/home/hadoop/test/TMDB/language.json','r') as f:landata = json.load(f) print(landata) x=list(map(lambda x:x['language'],landata)) y=list(map(lambda x:x['count'],landata)) plt.pie(x=y, #繪圖數據labels=x,#標簽autopct='%.2f%%', #設置百分比的格式,這里保留兩位小數pctdistance=0.8, #設置百分比標簽與圓心的距離labeldistance=1.1, #設置標簽與圓心的距離startangle=180, #設置餅圖的初始角度radius=1.2, #設置餅圖的半徑counterclock=False, #是否逆時針,這里設置為順時針方向wedgeprops={'linewidth':1.5, 'edgecolor':'green'}, #設置餅圖內外邊界的屬性值textprops={'fontsize':10, 'color':'black'}, #設置文本標簽的屬性值) plt.legend(loc='upper left', bbox_to_anchor=(-0.3, 1)) #圖標的位置 plt.grid() plt.show()

可視化:

其中有中文字體,由于沒有設置中文沒有顯示出來,這也是在最后整理的時候發現的問題。

2.字段之間的關系分析

(1)預算與評價的關系

  • 對于每個電影,需要導出如下的數據:[電影標題title,預算budget,評價vote_average],其中選擇后兩個字段進行關系繪圖
  • 需要注意的是,上述代碼都是放在一個代碼文件中運行的,到這里開始一個關系就是一個py文件,這里除了調用必要的包之外,還需要調用上述文件的mdf變量以及save函數。
  • import dataone
  • from dataone import mdf #調用第一個文件中定義的變量mdf
import os os.environ["JAVA_HOME"] = "/usr/lib/jvm/jdk1.8.0_162" os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.5" import json # 用于后面的流程 import csv import matplotlib.pyplot as pltimport dataone from dataone import mdf #調用dataone中的變量mdf#基于 DataFrame 對數據進行字段過濾即可 注意過濾掉預算為空的數據,同時,結果只保留了投票數大于 100 的數據確保公平 budgetVote = mdf.select("title", "budget", "vote_average")\.filter(mdf["budget"] != 0).filter(mdf["vote_count"] > 100).collect() print(budgetVote) dataone.save('budget_vote.json', json.dumps(budgetVote))#可視化 with open('budget_vote.json','r') as f:buddata = json.load(f)print(buddata)x=list(map(lambda x:x[1],buddata)) y=list(map(lambda x:x[2],buddata))X=[] Y=[] for i in x:X.append(int(i)/10000) for i in y:Y.append(float(i)) print(X) print(Y) #預算與評價 y z plt.scatter(X,Y) plt.xticks(rotation =90 ) plt.show()

保存的json文件:

可視化(只取一部分值畫圖):

(2)電影發行時間與評價的關系

  • 這部分考慮發行時間與評價之間的關系,因此對于每個電影,需要導出如下的數據:[電影標題,發行時間,評價]
import os os.environ["JAVA_HOME"] = "/usr/lib/jvm/jdk1.8.0_162" os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.5" import json # 用于后面的流程 import csv import matplotlib.pyplot as pltimport dataone from dataone import mdf #調用dataone中的變量mdf #這里還是要注意過濾掉發行時間為空的數據,保留投票數大于 100 的數據 dateVote = mdf.select(mdf["release_date"], "vote_average", "title")\.filter(mdf["release_date"] != "").filter(mdf["vote_count"] > 100).collect()dataone.save('date_vote.json', json.dumps(dateVote))#發行時間與評價 with open('date_vote.json','r') as f:datedata = json.load(f) print(datedata) x=list(map(lambda x:x[0],datedata)) y=list(map(lambda x:x[1],datedata)) X=[] Y=[] for i in x:X.append(i) for i in y:Y.append(float(i)) print(X) print(Y) plt.bar(X,Y) plt.xticks(rotation = 90) plt.show()

看一下保存文件的數據結構:

可視化(依舊是部分取值):

(3)流行度和評價的關系

  • 這部分考慮流行度與評價之間的關系,因此對于每個電影,需要導出如下的數據:[電影標題,流行度,評價]
#下面的代碼基本上都是一個套路了 import os os.environ["JAVA_HOME"] = "/usr/lib/jvm/jdk1.8.0_162" os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.5" import json # 用于后面的流程 import csv import matplotlib.pyplot as pltimport dataone from dataone import mdf #調用dataone中的變量mdf#選擇需要的字段,過濾掉流行度為0的,保留票數大于100的 popVote = mdf.select("title", "popularity", "vote_average")\.filter(mdf["popularity"] != 0).filter(mdf["vote_count"] > 100).collect() dataone.save('pop_vote.json', json.dumps(popVote))with open('pop_vote.json','r') as f:popdata = json.load(f) x=list(map(lambda x:x[1],popdata)) y=list(map(lambda x:x[2],popdata)) X=[] Y=[] for i in x:X.append(float(i)) for i in y:Y.append(float(i)) print(X) print(Y) plt.scatter(X,Y) plt.xticks(rotation = 90) plt.xlabel("popularity") plt.ylabel("vote_average") plt.show()


(4)公司生產的電影平均分和數量的關系

  • 計算每個公司生產的電影數量及這些電影的平均分分布。
    首先,需要對數據進行過濾,去掉生產公司字段為空和評價人數小于 100 的電影,然后對于每一條記錄,得到一條如下形式的記錄:
    [公司名,(評分,1)]
import os os.environ["JAVA_HOME"] = "/usr/lib/jvm/jdk1.8.0_162" os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.5" import json # 用于后面的流程 import csv import matplotlib.pyplot as pltimport dataone from dataone import mdf #調用dataone中的變量mdfbudgetRevenue = mdf.select("title", "budget", "revenue")\.filter(mdf["budget"] != 0).filter(mdf['revenue'] != 0).collect() print(budgetRevenue) dataone.save('budget_revenue.json', json.dumps(budgetRevenue))with open('budget_revenue.json','r') as f:buddata = json.load(f) x=list(map(lambda x:x[1],buddata)) y=list(map(lambda x:x[2],buddata)) X=[] Y=[] for i in x:X.append(float(i)) for i in y:Y.append(int(i)) print(X) print(Y) plt.scatter(X,Y) plt.xticks(rotation = 90) plt.xlabel("budget") plt.ylabel("revenue") plt.show()



(5)電影預算和營收的關系

  • 這部分考慮電影的營收情況,因此對于每個電影,需要導出如下的數據:
    [電影標題,預算,收入]
import os os.environ["JAVA_HOME"] = "/usr/lib/jvm/jdk1.8.0_162" os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.5" import json # 用于后面的流程 import csv import matplotlib.pyplot as pltimport dataone from dataone import mdf #調用dataone中的變量mdf #選擇需要的字段,在數據中篩選出預算為0的,收入為0的,聚合剩下的 budgetRevenue = mdf.select("title", "budget", "revenue")\.filter(mdf["budget"] != 0).filter(mdf['revenue'] != 0).collect() print(budgetRevenue) dataone.save('budget_revenue.json', json.dumps(budgetRevenue))with open('budget_revenue.json','r') as f:buddata = json.load(f) x=list(map(lambda x:x[1],buddata)) y=list(map(lambda x:x[2],buddata)) X=[] Y=[] for i in x:X.append(int(i)) for i in y:Y.append(int(i)) print(X) print(Y) plt.scatter(X,Y) plt.xticks(rotation = 90) plt.xlabel("budget") plt.ylabel("revenue") plt.show()


五,結語

到此,整個數據分析項目結束。
總結,在進行項目實驗時:
對于數據
1.考慮數據的結構
2.考慮數據的字段,可以做什么有價值的分析,以及用什么樣的可視化可以將分析結果最直觀的表達出來
對于編程
1.了解數據結構之后設計相應的數據框架(存儲數據的格式),如本項目中將csv文件傳輸到HDFS中,HDFS將文件加載至RDD,最后由RDD轉為DataFrame進行數據分析。
也要想到為什么最終會選擇使用DataFrame,因為

  • DataFrame是一種表格型數據結構,它含有一組有序的列,每列可以是不同的值。DataFrame既有行索引,也有列索引

DataFrame方便對數據進行提取分析處理。

2.使用的Spark知識,總的就是選擇select,過濾filter,聚合reduceByKey,統計,排序sortBy。還需熟練使用matplotlib畫圖方法。

至此結束,若有錯誤,請大佬們指正,感謝閱讀。

總結

以上是生活随笔為你收集整理的Spark项目实践--基于 TMDB 数据集的电影数据分析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。