日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

PySpark机器学习 ML

發布時間:2023/12/20 编程问答 42 豆豆
生活随笔 收集整理的這篇文章主要介紹了 PySpark机器学习 ML 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

5 Introducing the ML Package
在前面,我們使用了Spark中嚴格基于RDD的MLlib包。 在這里,我們將基于DataFrame使用MLlib包。 另外,根據Spark文檔,現在主要的Spark機器學習API是spark.ml包中基于DataFrame的一套模型。
5.1 ML包的介紹
從頂層上看,ML包主要包含三大抽象類:轉換器、預測器和工作流。

5.1.1 轉換器(Transformer)
從Transformer抽象類派生出來的每一個新的Transformer都需要實現一個.transform(…) 方法,該方法可以將一個DataFrame轉換成另一個DataFrame。
在spark.ml.feature中有許多Transformer:

Binarizer :給定一個閾值,該方法需要一個連續的變量將其轉換為二進制。
Bucketizer:分箱(分段處理):將連續數值轉換為離散類別比如特征是年齡,是一個連續數值,需要將其轉換為離散類別(未成年人、青年人、中年人、老年人),就要用到Bucketizer了。
ChiSqSelector:對于分類目標變量(考慮到分類模型),此方法允許你預定義數量的特征(通過numTopFeatures參數指定)。 選擇完成后,如方法的名稱所示,使用卡方檢驗。 需要兩步:首先,你需要.fit(…) 數據(為了這個方法可以計算卡方檢驗)。然后,調用.fit(…)方法(將你的DataFrame作為參數傳遞)返回一個可以用.transform(…)轉換的ChiSqSelectorModel對象。
CountVectorizer:將文本文檔轉換為單詞計數的向量。當不存在先驗字典時,Countvectorizer作為Estimator提取詞匯進行訓練,并生成一個CountVectorizerModel用于存儲相應的詞匯向量空間。該模型產生文檔關于詞語的稀疏表示,其表示可以傳遞給其他算法,
HashingTF : 生成詞頻率向量。它采用詞集合并將這些集合轉換成固定長度的特征向量。在文本處理中,“一組詞”可能是一袋詞。 HashingTF使用散列技巧。通過應用散列函數將原始要素映射到索引,然后基于映射的索引來計算項頻率。
IDF : 此方法計算逆文檔頻率。需要注意的是文本首先要用向量表示,可以用HashingTF 或者 CountVectorizer。
MinMaxScaler:最大-最小規范化,將所有特征向量線性變換到用戶指定最大-最小值之間。但注意在計算時還是一個一個特征向量分開計算的。通常將最大,最小值設置為1和0,這樣就歸一化到[0,1]。Spark中可以對min和max進行設置,默認就是[0,1]。(不移動中心點)
MaxAbsScaler:同樣對某一個特征操作,各特征值除以最大絕對值,因此縮放到[-1,1]之間。且不移動中心點。不會將稀疏矩陣變得稠密。
NGram :返回 n-grams,兩個連續的序列詞,或三個或更多。例如 [‘good’,’morning’, ‘Robin’, ‘Williams’] ,返回兩個即 [‘good morning’, ‘morning Robin’, ‘Robin Williams’] 。
Normalizer : 將某個特征向量(由所有樣本某一個特征組成的向量)計算其p-范數,然后對該每個元素除以p-范數。將原始特征Normalizer以后可以使得機器學習算法有更好的表現。(默認是L2)。
PCA : 數據降維。
RegexTokenizer:使用正則表達式。
StandardScaler :將某個特征向量(由所有樣本某一個特征組成的向量)進行標準化,使數據均值為0,方差為1。Spark中可以選擇是帶或者不帶均值和方差。
StopWordsRemover :移除停用詞。
Tokenizer:這是默認的標記器,它將字符串轉換為小寫,然后在空間上分割。
VectorAssembler:這是一個非常有用的轉換器,將多個數值(包含向量)列整合到一個用向量表示的列中。
VectorSlicer:適用于特征向量,無論是稠密還是稀疏:給定一個索引列表,它從特征向量中提取值。
Word2Vec :該方法將一個句子(字符串)作為輸入,并將其轉換為{string,vector}格式的映射,這種格式在自然語言處理中非常有用。
String<->Index 相互轉換:

VectorIndexer:提高決策樹或隨機森林等ML方法的分類效果。VectorIndexer是對數據集特征向量中的類別(離散值)特征(index categorical features categorical features )進行編號。它能夠自動判斷哪些特征是離散值型的特征,并對他們進行編號,具體做法是通過設置一個maxCategories,特征向量中某一個特征不重復取值個數小于maxCategories,則被重新編號為0~K(K<=maxCategories-1)。某一個特征不重復取值個數大于maxCategories,則該特征視為連續值,不會重新編號(不會發生任何改變)。
StringIndexer:按label出現的頻次,轉換成0~num numOfLabels-1(分類個數),頻次最高的轉換為0,
IndexToString:有StringIndexer,就應該有IndexToString。在應用StringIndexer對labels進行重新編號后,帶著這些編號后的label對數據進行了訓練,并接著對其他數據進行了預測,得到預測結果,預測結果的label也是重新編號過的,因此需要轉換回來。
5.1.2 預測器(Estimators)

預測器可以被認為是需要評估的統計模型,來進行預測或對觀測結果進行分類。
如果派生自抽象的Estimator類,則新模型必須實現.fit(…)方法,該方法給DataFrame中的數據以及一些默認或用戶指定的參數泛化模型。

一、分類
ML包提供了七種分類模型,這里介紹四種常用的模型。

LogisticRegression:邏輯回歸是分類的基本模型。邏輯回歸使用logit函數來計算觀測到屬于特定類別的概率。
DecisionTreeClassifier :構建一棵決策樹以預測觀察類別的分類器。maxDepth指定參數限制樹的生長深度,minInstancePerNode確定進一步拆分所需的樹節點中觀察值的最小數目,maxBins參數指定連續變量將被分割的最大數量的區間, impurity 指定測量和計算來自分割的信息增益的度量。
RandomForestClassifier:這個模型產生多個決策樹(因此稱為森林),并使用這些決策樹的模式輸出分類結果。 RandomForestClassifier支持二元和多元標簽。
NaiveBayes:基于貝葉斯定理,這個模型使用條件概率來分類觀測。 PySpark ML中的NaiveBayes模型支持二元和多元標簽。
二、回歸

PySpark ML包中有七種模型可用于回歸任務。這里只介紹兩種模型,如后續需要用可查閱官方手冊。

LinearRegression:最簡單的回歸模型,它假定了特征和連續標簽之間的線性關系,以及誤差項的正態性。
DecisionTreeRegressor:與分類模型類似,標簽是連續的而不是二元或多元的。
三、聚類

聚類是一種無監督的模型。PySpark ML包提供了四種模型。

BisectingKMeans :k-means 聚類和層次聚類的組合。該算法以單個簇中的所有觀測值開始,并將數據迭代地分成k個簇。
KMeans : 將數據分成k個簇,隨機生成k個初始點作為質心,將數據集中的數據按照距離質心的遠近分到各個簇中,將各個簇中的數據求平均值,作為新的質心,重復上一步,直到所有的簇不再改變。
GaussianMixture:這個方法使用k個未知的高斯分布參數來剖析數據集。使用期望最大化算法,通過最大化對數似然函數來找到高斯參數。
LDA:此模型用于自然語言處理應用程序中的主題建模。
5.1.3 管道/工作流(Pipeline)

一個管道串起多個轉換器和預測器,明確一個機器學習工作流。
預測上一篇中的嬰兒存活率。

載入數據:
import pyspark.sql.types as typ
labels = [
(‘INFANT_ALIVE_AT_REPORT’, typ.IntegerType()),
(‘BIRTH_PLACE’, typ.StringType()),
(‘MOTHER_AGE_YEARS’, typ.IntegerType()),
(‘FATHER_COMBINED_AGE’, typ.IntegerType()),
(‘CIG_BEFORE’, typ.IntegerType()),
(‘CIG_1_TRI’, typ.IntegerType()),
(‘CIG_2_TRI’, typ.IntegerType()),
(‘CIG_3_TRI’, typ.IntegerType()),
(‘MOTHER_HEIGHT_IN’, typ.IntegerType()),
(‘MOTHER_PRE_WEIGHT’, typ.IntegerType()),
(‘MOTHER_DELIVERY_WEIGHT’, typ.IntegerType()),
(‘MOTHER_WEIGHT_GAIN’, typ.IntegerType()),
(‘DIABETES_PRE’, typ.IntegerType()),
(‘DIABETES_GEST’, typ.IntegerType()),
(‘HYP_TENS_PRE’, typ.IntegerType()),
(‘HYP_TENS_GEST’, typ.IntegerType()),
(‘PREV_BIRTH_PRETERM’, typ.IntegerType())
]
# Specifying the schema of the DataFrame
schema = typ.StructType([
typ.StructField(e[0], e[1], False) for e in labels
])
births = spark.read.csv(‘births_transformed.csv.gz’,
header=True,
schema=schema)

創建轉換器
因為統計模型只能對數值數據進行操作,我們必須對birth_place變量進行編碼。

import pyspark.ml.feature as ft
# Casting the column to an IntegerType
births = births
.withColumn(‘BIRTH_PLACE_INT’, births[‘BIRTH_PLACE’]
.cast(typ.IntegerType()))
# Using the OneHotEncoder to encode
encoder = ft.OneHotEncoder(
inputCol=‘BIRTH_PLACE_INT’,
outputCol=‘BIRTH_PLACE_VEC’)
# Using the VectorAssembler to create a single column with all the features collated together.
featuresCreator = ft.VectorAssembler(
inputCols=[col[0] for col in labels[2:]] +
[encoder.getOutputCol()],
outputCol=‘features’
)

創建預測器
import pyspark.ml.classification as cl
logistic = cl.LogisticRegression(
maxIter=10,
regParam=0.01,
labelCol=‘INFANT_ALIVE_AT_REPORT’)

創建一個工作流

from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[
encoder,
featuresCreator,
logistic
])

訓練模型
births_train, births_test = births
.randomSplit([0.7, 0.3], seed=666)

model = pipeline.fit(births_train)
test_model = model.transform(births_test)

評估
import pyspark.ml.evaluation as ev
evaluator = ev.BinaryClassificationEvaluator(
rawPredictionCol=‘probability’,
labelCol=‘INFANT_ALIVE_AT_REPORT’)
print(evaluator.evaluate(test_model,
{evaluator.metricName: ‘areaUnderROC’}))
print(evaluator.evaluate(test_model,
{evaluator.metricName: ‘areaUnderPR’}))

保存模型
pipelinePath = ‘./infant_oneHotEncoder_Logistic_Pipeline’
pipeline.write().overwrite().save(pipelinePath)
# You can load it up later and use it straight away to .fit(…) and predict:
loadedPipeline = Pipeline.load(pipelinePath)
loadedPipeline
.fit(births_train)
.transform(births_test)

總結

以上是生活随笔為你收集整理的PySpark机器学习 ML的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。