【机器学习】在大数据上使用PySpark进行K-Means
作者 | Angel Das
編譯 | VK
來源 | Towards Data Science
如果你不熟悉K Means聚類,我建議你閱讀下面的文章。本文主要研究數據并行和聚類,大數據上的K-Means聚類。
https://towardsdatascience.com/unsupervised-learning-techniques-using-python-k-means-and-silhouette-score-for-clustering-d6dd1f30b660
關于聚類
聚類是一種無監督的學習技術,簡而言之,你處理的是數據,沒有任何關于目標屬性或因變量的信息。
聚類的一般思想是在數據中發現一些內在的結構,通常被稱為相似對象的簇。該算法研究數據以識別這些簇,使得簇中的每個成員更接近簇中的另一個成員(較低的簇內距離),而遠離不同簇中的另一個成員(較高的簇間距離)。
聚類適合哪里?
你們大多數人都熟悉現實生活中的這些例子:
客戶細分-廣泛用于目標營銷
圖像分割-識別景觀
推薦引擎
背景
K-Means聚類,使用歐氏距離形式的相似性度量,通常被稱為分裂聚類或分區聚類。
K均值的基本思想是從每個數據點都屬于一個簇,然后根據用戶輸入K(或聚類數)將它們分成更小的簇。每個簇都有一個稱為質心的中心。質心總數總是等于K。該算法迭代地尋找數據點并將它們分配給最近的簇。
一旦所有數據點被分配到各自的質心(這里代表每個簇),質心值將被重新計算,過程將重復,直到簇達到收斂標準。
質心只不過是每個簇的新平均值(例如,由客戶A、B、C組成的簇,平均支出為100、200、300,籃子大小為10、15和20,質心分別為200和15)。收斂準則是衡量簇的穩定性的一個指標,即任意兩次迭代之間的簇內距離在給定的閾值范圍內不變。
Pypark有什么不同嗎
在我們討論為什么PySpark不是基于Sklearn的算法之前,讓我們先討論一下PySpark中的過程有什么不同。
在使用PySpark構建任何聚類算法時,都需要執行一些數據轉換。讓我們先理解數據,用于分析的數據可以在這里找到。
https://www.kaggle.com/arjunbhasin2013/ccdata
數據
該數據集由超過6個月的9K名活躍信用卡持卡人及其交易和賬戶屬性組成。其想法是制定一個客戶細分的營銷策略。
使用Pypark
from?pyspark.sql?import?SparkSessionspark?=?SparkSession.builder.appName(‘Clustering?using?K-Means’).getOrCreate()data_customer=spark.read.csv('CC?General.csv',?header=True,?inferSchema=True)data_customer.printSchema()屬性可以分為三大類。客戶信息(主鍵為CUST_ID)、帳戶信息(余額、余額頻率、購買、信用額度、使用期限等)和交易(購買頻率、付款、預付現金等)。
data_customer=data_customer.na.drop()所考慮的所有屬性都是數字或離散數字,因此我們需要使用向量匯編器(Vector Assembler)將它們轉換為特征。向量匯編器是一種轉換器,它將一組特征轉換為單個向量列,通常稱為特征數組,這里的特征是列。
customer id不會用于聚類。我們首先使用.columns提取所需的列,將其作為輸入傳遞給Vector Assembler,然后使用transform將輸入列轉換為一個稱為feature的向量列。
from?pyspark.ml.feature?import?VectorAssembler data_customer.columnsassemble=VectorAssembler(inputCols=['BALANCE','BALANCE_FREQUENCY','PURCHASES','ONEOFF_PURCHASES','INSTALLMENTS_PURCHASES','CASH_ADVANCE','PURCHASES_FREQUENCY','ONEOFF_PURCHASES_FREQUENCY','PURCHASES_INSTALLMENTS_FREQUENCY','CASH_ADVANCE_FREQUENCY','CASH_ADVANCE_TRX','PURCHASES_TRX','CREDIT_LIMIT','PAYMENTS','MINIMUM_PAYMENTS','PRC_FULL_PAYMENT','TENURE'],?outputCol='features')assembled_data=assemble.transform(data_customer)assembled_data.show(2)既然所有的列都被轉換成一個單一的特征向量,我們就需要對數據進行標準化,使它們具有可比的規模。例如,BALANCE可以是10-1000,而BALANCE_FREQUENCY可以是0-1。
歐幾里德距離總是在大尺度上受到更大的影響,因此對變量進行標準化是非常重要的。
from?pyspark.ml.feature?import?StandardScalerscale=StandardScaler(inputCol='features',outputCol='standardized')data_scale=scale.fit(assembled_data) data_scale_output=data_scale.transform(assembled_data)data_scale_output.show(2)既然我們的數據已經標準化了,我們就可以開發K均值算法了。
K-means是最常用的聚類算法之一,用于將數據分簇到預定義數量的聚類中。
spark.mllib包括k-means++方法的一個并行化變體,稱為kmeans||。KMeans函數來自pyspark.ml.clustering,包括以下參數:
k是用戶指定的簇數
maxIterations是聚類算法停止之前的最大迭代次數。請注意,如果簇內距離的變化不超過上面提到的epsilon值,迭代將停止,而不考慮最大迭代次數
initializationMode指定質心的隨機初始化或通過k-means||初始化(類似于k-means++)
epsilon決定k-均值收斂的距離閾值
initialModel是一簇可選的群集質心,用戶可以將其作為輸入提供。如果使用此參數,算法只運行一次,將點分配到最近的質心
train(k=4,?maxIterations=20,?minDivisibleClusterSize=1.0,?seed=-1888008604)是默認值。
from?pyspark.ml.clustering?import?KMeans from?pyspark.ml.evaluation?import?ClusteringEvaluatorsilhouette_score=[]evaluator?=?ClusteringEvaluator(predictionCol='prediction',?featuresCol='standardized',?\metricName='silhouette',?distanceMeasure='squaredEuclidean')for?i?in?range(2,10):KMeans_algo=KMeans(featuresCol='standardized',?k=i)KMeans_fit=KMeans_algo.fit(data_scale_output)output=KMeans_fit.transform(data_scale_output)score=evaluator.evaluate(output)silhouette_score.append(score)print("Silhouette?Score:",score)可視化分數。注意,以前版本的K Means有computeScore,它計算聚類內距離的總和,但在spark3.0.0中被棄用。
輪廓分數使用ClusteringEvaluator,它測量一個簇中的每個點與相鄰簇中的點的接近程度,從而幫助判斷簇是否緊湊且間隔良好
#?可視化輪廓分數 import?matplotlib.pyplot?as?plt fig,?ax?=?plt.subplots(1,1,?figsize?=(8,6)) ax.plot(range(2,10),silhouette_score) ax.set_xlabel(‘k’) ax.set_ylabel(‘cost’)我更喜歡用K=7,在那里可以觀察到輪廓分數的局部最大值。什么值的K是好的沒有正確的答案。
我們可以使用描述性統計和其他圖表來檢查,這點在SkLearn和PCA上實現更方便。我們中的大多數人更喜歡研究肘部圖,而不是輪廓分數,但PySpark有它的優點。
為什么是Pypark?
PySpark在執行K均值聚類時使用數據并行或結果并行的概念。
假設你需要為墨爾本節禮日活動開展有針對性的營銷活動,并且你希望接觸到具有不同購買屬性的20萬客戶。想象一下在本地系統上運行K Means的多次迭代。對于K=5,需要計算的距離度量數為5 x 200K=1百萬。100萬個這樣的度量需要計算30次才能滿足收斂標準,即3000萬個距離(歐幾里德距離)。處理這樣的場景需要大量的計算能力和時間。
數據并行性
數據并行所做的是,通過將數據集劃分為更小的分區,從一開始就創建并行性。另一方面,結果并行是基于目標聚類的。例如:
D=記錄數{X1,X2,…,Xn}
k=簇數
P=處理器數{P1,P2,…,Pm}
C=初始質心{C1,C2,…,Ck}
數據D被P個處理器分割。每個處理器處理一簇記錄(由spark配置決定)。初始質心值C在每個處理器之間共享
現在每個處理器都有質心信息。處理器計算它們的記錄到這些質心的距離,并通過將數據點分配到最近的質心來形成局部聚類
完成步驟2后,主進程將存儲P個處理器上每個聚類的記錄總數和計數,以供將來參考
一旦一次迭代完成,來自處理器的信息被交換,主進程計算更新的質心并再次在P個處理器之間共享它們,即,主進程更新質心,并與處理器重新共享信息
這個過程不斷迭代直到收斂。一旦滿足收斂條件,主進程就收集本地簇并將它們組合成一個全局聚類
想象一下,將20萬條記錄分成3個處理器,每個處理器有約70萬條記錄。這就是分布式處理的用武之地,以減少數據量,同時確保完整的結果。
結果并行性
例如:
D=記錄數{X1,X2,…,Xn}
k=簇數
P=處理器數{P1,P2,…,Pm}
C=初始質心{C1,C2,…,Ck}
數據D被P個處理器分割,然后在每個處理器內排序。每個處理器處理一組記錄(由spark配置決定)
初始質心值C被初始化,并在這些處理器中的每一個處理器之間進行分割/共享(即,與所有質心值在所有處理器之間共享的數據并行性不同,這里,我們將一個質心值傳遞給一個處理器)
現在每個處理器都有一個中心。計算這些點到這些質心的距離。對于處理器中的數據點:如果它們更接近處理器的質心,則將它們分配給該簇,否則如果它們更接近屬于其他處理器的質心,則將數據點移動到新處理器
重復,直到收斂。
有用的鏈接
https://spark.apache.org/docs/latest/mllib-clustering.html
https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.clustering.KMeansModel
https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.evaluation.ClusteringEvaluator
https://spark.apache.org/docs/latest/api/python/_modules/pyspark/ml/evaluation.html
感謝閱讀。
往期精彩回顧適合初學者入門人工智能的路線及資料下載機器學習及深度學習筆記等資料打印機器學習在線手冊深度學習筆記專輯《統計學習方法》的代碼復現專輯 AI基礎下載機器學習的數學基礎專輯溫州大學《機器學習課程》視頻 本站qq群851320808,加入微信群請掃碼:總結
以上是生活随笔為你收集整理的【机器学习】在大数据上使用PySpark进行K-Means的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 搜狗浏览器打不开网页怎么办
- 下一篇: Win11如何将游戏隐藏 Win11游戏