日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

spark 2.x ML概念与应用

發布時間:2024/1/23 编程问答 20 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark 2.x ML概念与应用 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

# spark 2.x ML概念與應用

@(SPARK)[spark]

  • 一基礎
      • 1核心概念
      • 2Transformer
      • 3Estimator
      • 4Pileline
      • 5同一實例
      • 6保存模型
  • 二基本數據結構
    • 一核心概念
      • 1本地向量 LocalVecotr
      • 2向量標簽 LabelVector
      • 3本地矩陣
      • 4分布式矩陣
    • 二libsvm數據格式
      • 3fittransform方法的參數DF包含哪些列
  • 三樸素貝葉斯與邏輯回歸示例
    • 一準備學習數據
      • 1數據格式
      • 2數據排序
    • 二樸素貝葉斯算法
      • 1訓練模型
      • 2模型評估方法1使用spark API -ROC曲線
      • 3模型評估方法2-自己計算
      • 4模型調優
    • 三邏輯回歸算法示例
      • 1訓練模型
      • 2評估模型方法1使用API
      • 3模型評估方法2使用spark API
      • 4模型評估方法3自己寫
      • 5模型調優
    • 四使用模型預測數據
      • 1基本使用
      • 2保存計算結果
      • 3保存模型與重新加載模型
      • 4關于Vector的維度數量的說明
      • 5 一個超級大坑索引從0開始還是從1開始
    • 五各個算法間的結果對比

一、基礎

1、核心概念

DataFrame:通用的API數據集,源自Spark SQL。

Transformer:根據原來的DF計算一些新的列,并附在原有DF上。如一個模型,或者是將原有的類作一些變換(如構造向量)后附在原有DF上。

Estimator:就是一個算法,它根據一個DF算出一個Transformer。

Pipeline:就是一系列的Transformer/Estimator組合。

2、Transformer

一般而言,transformer會實現transform()方法,它包括以下2種類型:
(1)特征變換:讀取一個DataFrame,然后將這個DF中的列轉化為一個向量,并將結果附在原來的DF中。
(2)學習模型:詐取一個DataFrame,使用DF中包括的列或者向量,為每個數據計算一個預測結果或者分類,并將結果附在原有的DF中

3、Estimator

Estimator實現了fit()方法,它使用DataFame來計算出來一個Transformer。比如說,LogisticRegression是一個Estimator,它使用fit()訓練出一個LogisticRegressionModel,后者是一個Transformer。

4、Pileline

pipeline就是一系列的Transformer/Estimator。比如一個文本處理包括以下過程

  • Split each document’s text into words.
  • Convert each document’s words into a numerical feature vector.
  • Learn a prediction model using the feature vectors and labels.

5、同一實例

同一個pipeline中不能重復出現2次同一個transformer/estimator實例,但可以是同一個類的2個實例。
Unique Pipeline stages: A Pipeline’s stages should be unique instances. E.g., the same instance myHashingTF should not be inserted into the Pipeline twice since Pipeline stages must have unique IDs. However, different instances myHashingTF1 and myHashingTF2 (both of type HashingTF) can be put into the same Pipeline since different instances will be created with different IDs.

6、保存模型

Often times it is worth it to save a model or a pipeline to disk for later use. In Spark 1.6, a model import/export functionality was added to the Pipeline API. Most basic transformers are supported as well as some of the more basic ML models. Please refer to the algorithm’s API documentation to see if saving and loading is supported.

二、基本數據結構

完整內容請參考官方文檔 https://spark.apache.org/docs/latest/mllib-data-types.html

(一)核心概念

1、本地向量 LocalVecotr

MLlib的本地向量主要分為兩種,DenseVector和SparseVector,顧名思義,前者是用來保存稠密向量,后者是用來保存稀疏向量,其創建方式主要有一下三種(三種方式均創建了向量(1.0, 0.0, 2.0):

注意,ml package中有同樣的類。

import org.apache.spark.ml.linalg.{Vector, Vectors} //創建一個稠密向量 val dv : Vector = Vectors.dense(1.0,0.0,3.0); //創建一個稀疏向量(第一種方式) val sv1: Vector = Vectors.sparse(3, Array(0,2), Array(1.0,3.0)); //創建一個稀疏向量(第二種方式) val sv2 : Vector = Vectors.sparse(3, Seq((0,1.0),(2,3.0)))

對于稠密向量:很直觀,你要創建什么,就加入什么,其函數聲明為Vectors.dense(values : Array[Double])
對于稀疏向量,當采用第一種方式時,3表示此向量的長度,第一個Array(0,2)表示的索引,第二個Array(1.0, 3.0)與前面的Array(0,2)是相互對應的,表示第0個位置的值為1.0,第2個位置的值為3
對于稀疏向量,當采用第二種方式時,3表示此向量的長度,后面的比較直觀,Seq里面每一對都是(索引,值)的形式。

tips:由于scala中會默認包含scal.collection.immutalbe.Vector,所以當使用MLlib中的Vector時,需要顯式的指明import路徑

2、向量標簽 LabelVector

向量標簽和向量是一起的,簡單來說,可以理解為一個向量對應的一個特殊值,這個值的具體內容可以由用戶指定,比如你開發了一個算法A,這個算法對每個向量處理之后會得出一個特殊的標記值p,你就可以把p作為向量標簽。同樣的,更為直觀的話,你可以把向量標簽作為行索引,從而用多個本地向量構成一個矩陣(當然,MLlib中已經實現了多種矩陣)
其使用代碼為:

import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.feature.LabeledPoint val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))

對于pos變量,第一個參數表示這個數據的分類,1.0的具體含義只有你自己知道咯,可以使行索引,可以使特殊值神馬的
從文件中直接讀入一個LabeledPoint

MLlib提供了一種快捷的方法,可以讓用戶直接從文件中讀取LabeledPoint格式的數據。規定其輸入文件的格式為:

label index1:value1 index2:value2.....

然后通過

val spark = SparkSession.builder.appName("NaiveBayesExample").getOrCreate() val data = spark.read.format("libsvm").load("/tmp/ljhn1829/aplus/training_data3")

直接讀入即可。
關于libsvm格式的詳細說明請見下面內容。

3、本地矩陣

既然是算數運算包,肯定少不了矩陣包,先上代碼:

import org.apache.spark.mllib.linalg.{Matrix, Matrices} val dm : Matrix = Matrices.dense(3,2, Array(1.0,3.0,5.0,2.0,4.0,6.0))

上面的代碼段創建了一個稠密矩陣:

1.0 2.0 3.0 4.0 5.0 6.0

很明顯,創建的時候是將原來的矩陣按照列變成一個一維矩陣之后再初始化的。

稀疏矩陣:

val eye = Matrices.sparse(3, 3, Array(0, 1, 2, 3), Array(0, 1, 2), Array(1, 1, 1))

4、分布式矩陣

(ml package未找到類似的類)
MLlib提供了三種分布式矩陣的實現,依據你數據的不同的特點,你可以選擇不同類型的數據:
(1)RowMatrix

RowMatrix矩陣只是將矩陣存儲起來,要注意的是,此種矩陣不能按照行號訪問。(我也不知道為什么這樣鳥。。)

import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.linalg.distributed.RowMatrix val rows: RDD[Vector] = ...// val mat: RowMatrix = new RowMatrix(rows) val m = mat.numRows() val n = mat.numCols()

RowMatrix要從RDD[Vector]構造,m是mat的行數,n是mat的列
Multivariate summary statistics

顧名思義,這個類里面包含了矩陣中的很多常見信息,怎么使用呢?

import org.apache.spark.mllib.linalg.Matrix import org.apache.spark.mllib.linalg.distributed.RowMatrix import org.apache.spark.mllib.stat.MultivariateStatisticalSummary val mat: RowMatrix = .. val summy : MultivariateStatisticalSummary = mat.computeColumnSummaryStatistics() println(summy.mean)//平均數

通過這個類,可以得到平均數,矩陣中非0個數,具體的數據看看幫助文檔
(2)IndexedRowMatrix

IndexedRowMatrix矩陣和RowMatrix矩陣的不同之處在于,你可以通過索引值來訪問每一行。其他的,沒啥區別。。
(3)CoordinateMatrix

當你的數據特別稀疏的時候怎么辦?采用這種矩陣吧。先上代碼:

import org.apache.spark.mllib.linalg.distributed.{CoordinatedMatrix, MatrixEntry} val entries : RDD[MatrixEntry] = .. val mat: CoordinateMatrix = new CoordinateMatrix(entries)

CoordinateMatrix矩陣中的存儲形式是(row,col,value),就是原始的最稀疏的方式,所以如果矩陣比較稠密,別用這種數據格式

(二)libsvm數據格式

首先介紹一下 libSVM的數據格式

Label 1:value 2:value …
Label:是類別的標識,比如上節train.model中提到的1 -1,你可以自己隨意定,比如-10,0,15。當然,如果是回歸,這是目標值,就要實事求是了。
Value:就是要訓練的數據,從分類的角度來說就是特征值,數據之間用空格隔開
比如:
-15 1:0.708 2:1056 3:-0.3333
需要注意的是,如果特征值為0,則這列數據可以不寫,因此特征冒號前面的(姑且稱做序號)可以不連續。如:
-15 1:0.708 3:-0.3333
表明第2個特征值為0,從編程的角度來說,這樣做可以減少內存的使用,并提高做矩陣內積時的運算速度。我們平時在matlab中產生的數據都是沒有序號的常規矩陣,所以為了方便最好編一個程序進行轉化。

spark提供了方便的工具類來加載這些數據

val spark = SparkSession.builder.appName("NaiveBayesExample").getOrCreate() val data = spark.read.format("libsvm").load("/tmp/ljhn1829/aplus/training_data3")

3、fit()/transform()方法的參數DF包含哪些列

  • 基于DataFrame,借助于抽象,將模型抽象為三個基本類,estimators(實現fit方法), transformers(實現transform方法), pipelines
  • 一個正常的模型應該同時實現 fit 和 transform 兩個方法
  • transform 將生成一個新的DataFrame,包含了預測的結果
  • fit 的DataFrame需要包含兩列 featuresCol 和 labelCol 默認名字為 label
    *
    transform 之前的DataFrame需要包含一列 featuresCol,默認名字為features,輸出三列(依賴于參數),三列有默認名字,都可以通過setter函數進行設置。

    • predictedCol 預測的標簽,默認名字為 prediction
    • rawPredictedCol 預測的裸數據?向量?邏輯回歸是wx貌似,默認名字為 rawPrediction
    • probabilityCol 預測的概率,默認名字為 probability

三、樸素貝葉斯與邏輯回歸示例

(一)準備學習數據

1、數據格式

我們使用上面介紹的libsvm數據格式作為訓練數據:

0 31607:17 0 111905:36 0 109:3 506:41 1509:1 2106:4 5309:1 7209:5 8406:1 27108:1 27709:1 30209:8 36109:20 41408:1 42309:1 46509:1 47709:5 57809:1 58009:1 58709:2 112109:4 123305:48 142509:1 0 407:14 2905:2 5209:2 6509:2 6909:2 14509:2 18507:10 0 604:3 3505:9 6401:3 6503:2 6505:3 7809:8 10509:3 12109:3 15207:19 31607:19 0 19109:7 29705:4 123305:32 0 15309:1 43005:1 108509:1 1 604:1 6401:1 6503:1 15207:4 31607:40 0 1807:19 0 301:14 501:1 1502:14 2507:12 123305:4 0 607:14 19109:460 123305:448 0 5406:14 7209:4 10509:3 19109:6 24706:10 26106:4 31409:1 123305:48 128209:1 1 1606:1 2306:3 3905:19 4408:3 4506:8 8707:3 19109:50 24809:1 26509:2 27709:2 56509:8 122705:62 123305:31 124005:2 0 15309:1 19109:1 43005:1 108509:1 124005:1 0 107:79 507:2 607:1 37907:1 121409:5 1 2905:12 19109:17 31607:16 0 701:6 1101:2 2301:3 4701:2 10601:1 20201:3 32707:20 0 1401:10 14202:3 14402:3 18902:2 19009:3 19109:3 38002:3 0 31607:15 0 15309:4 19109:99 108509:4 0 708:7 1908:4 2008:4 19109:4 123305:4 123905:23

2、數據排序

正如上面如言,libsvm需要將屬性排序,提供一個方法作轉換,如果本身已經是排序的,則不需要

//將ip標簽按標簽id進行排序 import java.util import scala.collection.JavaConversions._ val SEPARATOR = " "; def sortLabel(line:String):String={val contents = line.split(SEPARATOR);val label = contents(0);val map = new util.TreeMap[Integer,Integer]()for(i <- 1 to contents.length-1){println(contents(i))map.put(contents(i).split(":")(0).toInt,contents(i).split(":")(1).toInt)}var returnLine = label;for(key<-map.keySet()){returnLine +=( SEPARATOR + key + ":" + map.get(key))}return returnLine; }sc.textFile("/tmp/ljhn1829/aplus/training_data").map(line=>sortLabel(line)).coalesce(1,true).saveAsTextFile("/tmp/ljhn1829/aplus/training_data2")

(二)樸素貝葉斯算法

1、訓練模型

import org.apache.spark.ml.classification.NaiveBayes import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder.appName("NaiveBayesExample").getOrCreate() val data = spark.read.format("libsvm").option("numFeatures","999999").load("/tmp/ljhn1829/aplus/training_data2")//training_data2:正樣本22W,負樣本2000萬。training_data3:正負樣本各22W. val Array(trainingData, testData) = data.randomSplit(Array(0.99, 0.01), seed = 1234L) //val model = new NaiveBayes().fit(trainingData) val model = new NaiveBayes().setThresholds(Array(10.0,1.0)).fit(trainingData) val predictions = model.transform(testData) predictions.show()

2、模型評估方法1–使用spark API -ROC曲線

可以用上述所列的testData作為檢驗數據,也可以使用全量數據作為檢驗數據:

val data2 = spark.read.format("libsvm").load("/tmp/ljhn1829/aplus/training_data2") val Array(trainingData2, testData2) = data2.randomSplit(Array(0.01, 0.99), seed = 1234L) val predictions = model.transform(testData2) predictions.show()

(1)將其作為多分類結果進行評估,可計算f1、精度、召回率、準確度(見MulticlassClassificationEvaluator源代碼)

val multiclassClassificationEvaluator = new MulticlassClassificationEvaluator() def printlnMetric(metricName: String): Unit = {println(metricName + " = " + multiclassClassificationEvaluator.setMetricName(metricName).evaluate(predictions)) }printlnMetric("f1") printlnMetric("weightedPrecision") printlnMetric("weightedRecall") printlnMetric("accuracy")

結果:

scala> printlnMetric("f1") f1 = 0.8942599227319046scala> printlnMetric("weightedPrecision") weightedPrecision = 0.9766525369524176scala> printlnMetric("weightedRecall") weightedRecall = 0.8280687304878339scala> printlnMetric("accuracy") accuracy = 0.8280687304878339

(2)將其作為二分類結果進行評估,可計算areaUnderROC、areaUnderPR(見BinaryClassificationEvaluator源代碼)

val binaryClassificationEvaluator = new BinaryClassificationEvaluator() def printlnMetric(metricName: String): Unit = {println(metricName + " = " + binaryClassificationEvaluator.setMetricName(metricName).evaluate(predictions)) }printlnMetric("areaUnderROC") printlnMetric("areaUnderPR")

3、模型評估方法2—-自己計算

不使用spark API,自己來計算
(1)檢驗樣本的總數

predictions.count res49: Long = 5245125

(2)檢驗樣本中分類正確的數量

predictions.filter($"label" === $"prediction").count res55: Long = 4343324

(3)樣本中分類為1的樣本數量

predictions.filter($"label" === 1).count res60: Long = 68726

(4)樣本中分類為0的樣本數量

predictions.filter($"label" === 0).count res61: Long = 5176399

(5)分類正確且分類為1的樣本數量 TP 21931 35898 119454 146188

predictions.filter($"label" === $"prediction").filter($"label"===1).count

(6)分類正確且分類為0的樣本數量 TN 4321393 44030 11478102 9280601

predictions.filter($"label" === $"prediction").filter($"label"===0).count

(7)分類錯誤且分類為1的樣本數量 FP 855006 21747 5615212 7812713

predictions.filter($"label" !== $"prediction").filter($"prediction"===1).count

(8)分類錯誤且分類為0的樣本數量 FN 46795 32574 80404.0

predictions.filter($"label" !== $"prediction").filter($"prediction"===0).count

使用均衡訓練數據,未設閾值
準確率:(11478102+119454)/(11478102+119454+5615212+107138.0)=0.6696084840183313
召回率:TP/(P) = 119454/(119454+107138.0) = 0.5271765993503742
精度:TP/(TP+FP)=21931/(21931+855006)=0.025008638020747216
準確率60%,如回率58%,精度62%

使用均衡數據,設置10:1閾值
準確率:(146188+9280601)/(9280601+146188+7812713+80404.0)=0.5442748361336373
召回率:TP/(P) = 146188/(146188+80404.0) = 0.6451595819799464
精度:TP/(TP+FP)=146188/(146188+7812713)= 0.01836786259811499
準確率54.4%,召回率64.5%,精度18.3%

4、模型調優

(1)new NaiveBayes().setThresholds(Array(100.0,1.0))
為每個分類設置一個閾值,參數的長度必須和類的個數相等。最終的分類結果會是p/t最大的那個分類,其中p是通過Bayes計算出來的結果,t是閾值。
這對于訓練樣本嚴重不均衡的情況尤其重要,比如分類1只有20W數據,而分類0有2000萬數據,此時應用new NaiveBayes().setThresholds(Array(100.0,1.0))
Param for Thresholds in multi-class classification to adjust the probability of predicting each class. Array must have length equal to the number of classes, with values > 0 excepting that at most one value may be 0. The class with largest value p/t is predicted, where p is the original probability of that class and t is the class’s threshold.

(三)邏輯回歸算法示例

1、訓練模型

import org.apache.spark.ml.classification.LogisticRegressionval data =spark.read.format("libsvm").option("numFeatures","999999").load("/tmp/ljhn1829/aplus/training_data3") val Array(trainingData, testData) = data.randomSplit(Array(0.99, 0.01), seed = 1234L)val lr = new LogisticRegression() //.setThreshold(0.4152415581588802).setMaxIter(10).setRegParam(0.3).setElasticNetParam(0.8) val lrModel = lr.fit(trainingData) println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")val predictions = lrModel.transform(testData) predictions.show()

2、評估模型方法1–使用API

import org.apache.spark.ml.classification.BinaryLogisticRegressionSummary import org.apache.spark.sql.functions;//獲得回歸模型訓練的Summary val trainingSummary = lrModel.summary// Obtain the loss per iteration. //每次迭代的損失,一般會逐漸減小 val objectiveHistory = trainingSummary.objectiveHistory for (lossPerIteration<-objectiveHistory) {println(lossPerIteration); }// Obtain the metrics useful to judge performance on test data. // We cast the summary to a BinaryLogisticRegressionSummary since the problem is a binary // classification problem. //強制類型轉換為二類LR的Summary,然后就可以用混淆矩陣,ROC等評估方法了。Spark2.0還無法針對多類 val binarySummary = trainingSummary.asInstanceOf[BinaryLogisticRegressionSummary] // Obtain the receiver-operating characteristic as a dataframe and areaUnderROC. val roc = binarySummary.roc //獲得ROC roc.show //顯示ROC數據表,可以用這個數據自己畫ROC曲線 roc.select("FPR").show(); System.out.println(binarySummary.areaUnderROC);//AUC

3、模型評估方法2–使用spark API

可以用上述所列的testData作為檢驗數據,也可以使用全量數據作為檢驗數據:

val data2 = spark.read.format("libsvm").load("/tmp/ljhn1829/aplus/training_data2") val Array(trainingData2, testData2) = data2.randomSplit(Array(0.01, 0.99), seed = 1234L) val predictions = lrModel.transform(testData2) predictions.show()

(1)將其作為多分類結果進行評估,可計算f1、精度、召回率、準確度(見MulticlassClassificationEvaluator源代碼)

val multiclassClassificationEvaluator = new MulticlassClassificationEvaluator() def printlnMetric(metricName: String): Unit = {println(metricName + " = " + multiclassClassificationEvaluator.setMetricName(metricName).evaluate(predictions)) }printlnMetric("f1") printlnMetric("weightedPrecision") printlnMetric("weightedRecall") printlnMetric("accuracy")

結果:

scala> printlnMetric("f1") f1 = 0.8942599227319046scala> printlnMetric("weightedPrecision") weightedPrecision = 0.9766525369524176scala> printlnMetric("weightedRecall") weightedRecall = 0.8280687304878339scala> printlnMetric("accuracy") accuracy = 0.8280687304878339

(2)將其作為二分類結果進行評估,可計算areaUnderROC、areaUnderPR(見BinaryClassificationEvaluator源代碼)

val binaryClassificationEvaluator = new BinaryClassificationEvaluator() def printlnMetric(metricName: String): Unit = {println(metricName + " = " + binaryClassificationEvaluator.setMetricName(metricName).evaluate(predictions)) }printlnMetric("areaUnderROC") printlnMetric("areaUnderPR")

4、模型評估方法3:自己寫

(5)分類正確且分類為1的樣本數量 TP 146188

predictions.filter($"label" === $"prediction").filter($"label"===1).count

(6)分類正確且分類為0的樣本數量 TN 9280601

predictions.filter($"label" === $"prediction").filter($"label"===0).count

(7)分類錯誤且分類為1的樣本數量 FP 7812713

predictions.filter($"label" !== $"prediction").filter($"prediction"===1).count

(8)分類錯誤且分類為0的樣本數量 FN 80404

predictions.filter($"label" !== $"prediction").filter($"prediction"===0).count準確率:( 146188+ 9280601)/( 146188+ 9280601+7812713+80404.0)=0.5442748361336373 召回率:TP/(TP+FN) = 146188/( 146188+80404) =0.6451595819799464 精度:TP/(TP+FP)= 146188/( 146188+7812713.0) =0.01836786259811499

設置最優threshole之后:

準確率:( 213151+ 2803234)/( 213151+ 2803234+14290080+13441.0)=0.1741571230236469 召回率:TP/(TP+FN) = 213151/( 213151+13441) = 0.9406819305182884 精度:TP/(TP+FP)= 213151/( 213151+14290080.0) =0.014696794114359759 準確率54.4%,召回率64.5%,精度18.3%

5、模型調優

這些參數用于設置列名:
setPredictionCol setLabelCol setProbabilityCol setFeaturesCol setWeightCol setRawPredictionCol
以下這些參數用于設置各種學習參數:

(1)setThreshold/setThresholds

setThreshold設置了閾值,大于這個閾值則分類為1,小于則分類為0。轉為值為0.5
setThresholds用于多分類的情況。

以下示例如何找到最優的threshold。

// Get the threshold corresponding to the maximum F-Measure and rerun LogisticRegression with // this selected threshold. //不同的閾值,計算不同的F1,然后通過最大的F1找出并重設模型的最佳閾值。 val fMeasure = binarySummary.fMeasureByThreshold val maxFMeasure = fMeasure.select(functions.max("F-Measure")).head().getDouble(0);//獲得最大的F1值 val bestThreshold = fMeasure.where(fMeasure.col("F-Measure").equalTo(maxFMeasure)).select("threshold").head().getDouble(0);//找出最大F1值對應的閾值(最佳閾值) lrModel.setThreshold(bestThreshold);//并將模型的Threshold設置為選擇出來的最佳分類閾值

(2)setRegParam:正則化參數
默認值為0.
正則化參數主要是為了解決過度擬合的問題,詳細理論請參考《邏輯回歸原理與實現》

lrModel.getRegParam // 正則化參數>=0

當設置正則化參數為0~3時,計算檢驗樣本,分別得到召回率與精度如下:

0.0 0.9205683447008687 0.9527194528239897 0.05 0.9239064559263499 0.958818361519877 0.1 0.9223865090282922 0.9569734714653572 0.15 0.9201643081836635 0.9546774724781172 0.2 0.9183653836903926 0.952554380363201 0.3 0.9148060143721561 0.9484744816030162 0.5 0.9084953487700936 0.9416216324007418 3.0 0.8789044838433493 0.9037430510218213

這組數據中可以看出當正則化參數為0.05時,分類效果最優。。但對于其它數據可能就會有過擬合的問題了,所以要視樣本情況而調整正則化參數。

正則化參數過小,則可能過擬合。過大則可能欠擬合。

(3) setMaxIter
最大的迭代次數,當達到這個次數時,不管是否已經收斂到最小誤差,均會結束訓練。默認值為100。

(4) setTol
算法的收斂閾值,當小于這個值時,結束迭代計算,默認值為1.0E-6。

(5)setStandardization
是否對特征值進行標準化,默認為true。

(6)setElasticNetParam
默認值為0.0,這是一個L2懲罰。用于防止過擬合的另一種方式,理論詳見《L0/L1/L2》
對于α= 0,懲罰是L2懲罰。 對于alpha = 1,它是一個L1懲罰。 對于0 <α<1,懲罰是L1和L2的組合。

(7)setFitIntercept
Param for whether to fit an intercept term.

舉個簡單情況 y= w1 * x + w0,這里w0就是一個截距,調節直線不穿過原點。從這個角度想想,w0確實不應該正則化,值是多少就多少。
如果設置為false,則intercept=0.0,否則為實際值。

(8) setFamily
這是2.1才引入的參數,可設置二項式還是多項式模型。

(四)使用模型預測數據

1、基本使用

關鍵在于如何根據一個文件創建一個RDD,然后再轉成DataFrame。

import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.linalg.Vector import scala.collection.mutabledef lineToVector(line:String ):Vector={val seq = new mutable.Queue[(Int,Double)]val content = line.split(" ");for( s <- content){val index = s.split(":")(0).toInt -1 //notice:看下面的大坑val value = s.split(":")(1).toDoubleseq += ((index,value))}return Vectors.sparse(999999, seq)}val df = sc.sequenceFile[org.apache.hadoop.io.LongWritable, org.apache.hadoop.io.Text]("/data/gamein/gameall_sdc/wh/gameall.db/edt_udid_label_format/ds=20170312/001006_0").map(line=>line._2).map(line => (line.toString.split("\t")(0),lineToVector(line.toString.split("\t")(1)))).toDF("udid", "features")val predictionResult = lrModel.transform(df)predictionResult.show()

2、保存計算結果

predictionResult.select($"udid",$"probability").rdd.saveAsTextFile("/tmp/ljhn1829/test")

3、保存模型與重新加載模型

model.save("/tmp/ljhn1829/model") val model2 = NaiveBayesModel.load("/tmp/ljhn1829/model")

4、關于Vector的維度數量的說明

用于訓練模型時的維度數據必須與預測時使用的維度數量相同

如上面的:

val data = spark.read.format("libsvm").option("numFeatures","9999999").load("/tmp/ljhn1829/aplus/training_data3")

如果不指定option的話,則維度數量會以實際出現值的列維度數量相同

return Vectors.sparse(9999999, seq)

如果上述二者不相同,則會出現以下異常:

Caused by: java.lang.IllegalArgumentException: requirement failed: You may not write an element to index 804201 because the declared size of your vector is 144109at scala.Predef$.require(Predef.scala:224)at org.apache.spark.ml.linalg.Vectors$.sparse(Vectors.scala:219)at lineToVector(<console>:55)at $anonfun$4.apply(<console>:50)at $anonfun$4.apply(<console>:50)at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(generated.java:84)at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)So I change return Vectors.sparse(144109, seq)to return Vectors.sparse(804202, seq)Another error occurs:Caused by: java.lang.IllegalArgumentException: requirement failed: The columns of A don't match the number of elements of x. A: 144109, x: 804202at scala.Predef$.require(Predef.scala:224)at org.apache.spark.ml.linalg.BLAS$.gemv(BLAS.scala:521)at org.apache.spark.ml.linalg.Matrix$class.multiply(Matrices.scala:110)at org.apache.spark.ml.linalg.DenseMatrix.multiply(Matrices.scala:176)

5、 一個超級大坑:索引從0開始還是從1開始

libsvm格式中,索引是從1開始的,但spark load svm的時候會將它改為從0開始,即將文件中所有的索引ID-1.

另一方面, 自己讀取文件中的index時,一般使用的就是維度的index!!!
此時,注意將讀取到的index-1,如上面的代碼:

val index = s.split(":")(0).toInt -1 //notice:看下面的大坑

(五)各個算法間的結果對比

準備需要用于測試的數據:

SET hive.exec.compress.output=false; create external table ljh_g60_aplus_compare_data (udid string,ip string,label string ) row format delimited fields terminated by '\t' STORED AS textfile location '/tmp/ljhn1829/compare/ljh_g60_aplus_compare_data';insert overwrite table ljh_g60_aplus_compare_data select udid,ip_data,if(isnew=1,1,0) from ftrl_train_g60data where ip_data is not null;

計算預測結果:

val df = sc.textFile("/tmp/ljhn1829/compare/ljh_g60_aplus_compare_data").map(line => (line.toString.split("\t")(0),lineToVector(line.toString.split("\t")(1)))).toDF("udid", "features")val predictionResult = lrModel.transform(df)predictionResult.show()

將spark的計算結果保存在一個hive表中(不需要預先創建表):

默認保存為parquet格式:

val options = Map("path" -> "/tmp/ljhn1829/compare/ljh_g60_aplus_predict_result1") predictionResult.write.options(options).mode(SaveMode.Append).saveAsTable("ljh_g60_aplus_predict_result1")

試一下text格式(未測試):

val options = Map("path" -> "/tmp/ljhn1829/compare/ljh_g60_aplus_predict_result2") predictionResult.write.format("text").options(options).mode(SaveMode.Append).saveAsTable("ljh_g60_aplus_predict_result2")

保存后表的結構如下:

udid string features struct<type:tinyint,size:int,indices:array<int>,values:array<double>> rawprediction struct<type:tinyint,size:int,indices:array<int>,values:array<double>> probability struct<type:tinyint,size:int,indices:array<int>,values:array<double>> prediction double

其中幾個字段是struct類型,可以直接取其中的某些字段值:

select udid, probability.values[1],prediction from ljh_g60_aplus_predict_result1 limit 10;SET hive.exec.compress.output=false; create external table udid_ftrl_predict_01g60train2 (udid string,label int,predict double,ip_data string ) row format delimited fields terminated by '\t' STORED AS textfile location '/user/gzquyajun/test/label/udid_ftrl_predict_01g60train/';

總的正確數量:228836
1、算法1正確的結果數量:

select count(1) from (select udid,predict,label from udid_ftrl_predict_01g60train2 order by predict desc limit 5000000 ) t where t.udid in (select udid from ljh_g60_aplus_compare_data where label='1');

116863 180318

2、算法2正確的結果:

select count(1) from (select udid, probability.values[1] as p from ljh_g60_aplus_predict_result1 order by p desc limit 5000000 ) t where t.udid in (select udid from ljh_g60_aplus_compare_data where label='1');

77876 132865

3、2個算法重合的數量

select count(1) from (select udid,predict from udid_ftrl_predict_01g60train2 order by predict desc limit 5000000) t1, (select udid, probability.values[1] as p from ljh_g60_aplus_predict_result1 order by p desc limit 5000000) t2 where t1.udid=t2.udid;

4、2個算法正確結果的重合數量

select count(1) from (select udid from (select udid,predict,label from udid_ftrl_predict_01g60train2 order by predict desc limit 5000000 ) t where t.label=1 limit 10) a1, (select t3.udid from (select udid,predict from udid_ftrl_predict_01g60train2 order by predict desc limit 5000000) t3, (select udid, probability.values[1] as p from ljh_g60_aplus_predict_result1 order by p desc limit 5000000) t2 where t3.udid=t2.udid) a2 where a1.udid=a2.udid;

總結

以上是生活随笔為你收集整理的spark 2.x ML概念与应用的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。