用户画像之Spark ML实现
? ? ? ? ? ? ? ? ? ? ? ? ? 用戶畫像之Spark ML實現
1?Spark ML簡單介紹
Spark ML是面向DataFrame編程的。Spark的核心開發是基于RDD(彈性分布式數據集),但是RDD,但是RDD的處理并不是非常靈活,如果要做一些結構化的處理,將RDD轉換成DataFrame,DataFrame實際上就是行對象的RDD+schema,類似于原本的文本數據,加上schema,做一下結構的轉換就變成數據庫里面的表,表是有元數據的,有字段有類型。所以DataFrame處理起來更加靈活。
要進行機器學習是有一系列的流程,通常離線的處理現有一組數據集,然后進行預處理特征工程,完成之后分成訓練集合測試集,基于訓練集訓練模型,然后選擇算法,進行評估..這是可以形成一個管道的,整體是一個DAG有向無環圖。
其實整個進行模型算法訓練的過程就是一個管道,管道中就會有各種各樣的組件,這些組件總體來說可以分成兩類,①第一個是Transformers:transform()用于轉換,把一個DataFrame轉換為另一個DataFrame,如把原本的數據集拆分成測試集,那就是DataFrame的轉換,像分詞,抽樣,模型的測試都是非常常見的轉換操作,②第二種類型就是Estimators:fit()應用在DF上生成一個轉換器算法,Estimators評估器,用到的函數是fit(),Estimators是為了生成一個轉換器,在機器學習中會用到一些算法,需要去建模,根據訓練集得到模型,模型本質上就是轉換器,進行預測是用的這個模型進行預測,所以轉換是基于這個模型進行預測,所以轉換就是基于這個模型的轉換器轉換時他的實例來進行轉換。
2 Spark ML的工作流程
首先進行預處理,包括模型訓練的整個過程是一個管道pipline,這個pipline的目的是為了得到一個Estimator,即得到一個模型,假如說用邏輯回歸,輸入的數據是普通的文本,首先進行Toknizer分詞,分完次后計算他的詞頻,這兩個本質上否是transform的操作,接下來就要創建一個邏輯回歸的實例,本質上就是一個Estimator,得到一個轉換器。
模型有了接下來就要做預測,不管是訓練集還是測試集,都是要進行分詞,計算詞頻的,這個piplineModel整個都是transform操作,這個模型邏輯回歸就是上一步通過訓練的到的模型。
參數是所有轉換器和評估器共享的一個公共api,參數名Param是一個參數,可以通過setter單獨定義;也可以通過ParamMap定義一個參數的集合(parameter,value),傳遞參數的兩種方式:①通過setter為實例設置參數②傳遞ParamMap給fit或者transform方法
3 Estimator,Transformer,Param使用案例
(1)準備帶標簽和特征的數據
(2)創建邏輯回歸的評估器
(3)使用setter方法設置參數
(4)使用存儲在lr中的參數來訓練一個模型
(5)使用ParamMap選擇指定的參數
(6)準備測試數據
(7)預測結果
代碼具體實現
(1)準備帶標簽和特征的數據
任何應用首先要把需要的類通過import引入,性別預測是分類問題,選擇邏輯回歸
import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.param.ParamMap import org.apache.spark.ml.linalg.{Vector,Vectors} import org.apache.spark.sql.Row定義一個初始的DataFrame,通過sqlContext創建,用Seq序列的方式創建一個集合,第一個參數是標簽即目標值,后面的為特征,
val sqlContext=new org.apache.spark.sql.SQLContext(sc) val training = sqlContext.createDataFrame(Seq((1.0, Vectors.dense(1.0,2.1,1.1)),(0.0, Vectors.dense(3.0,2.0,-2.0)),(0.0, Vectors.dense(3.0,0.3,1.0)),(1.0, Vectors.dense(1.0,1.2,-1.5)) )).toDF("label","features")(2)創建邏輯回歸的評估器,設置參數
val lr = new LogisticRegression() //評估器會帶一些默認的參數,通過explainParams()查看 println(lr.explainParams()) //通過set方式修改迭代次數和正則化參數 lr.setMaxIter(10).setRegParam(0.01)//定義模型, val model1 = lr.fit(training) //查看模型的參數 model1.parent.extractParamMap//通過ParamMap設置參數 val paramMap = ParamMap(lr.maxIter -> 20). put(lr.maxIter,30). put(lr.regParam -> 0.1, lr.threshold -> 0.55)val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") //將兩個ParamMap對象合并 val paramMapCombined = paramMap ++ paramMap2//根據ParamMap設置的參數定義模型, val model2 = lr.fit(training, paramMapCombined) model2.parent.extractParamMap(3)準備測試數據
val test = sqlContext.createDataFrame(Seq((1.0, Vectors.dense(-1.2,1.8,1.3)),(0.0, Vectors.dense(4.0,1.8,-0.1)),(1.0, Vectors.dense(0.0,1.9,-1.5)) )).toDF("label","features")(4)預測結果
//調用模型1 model1.transform(test).select("label","features","probability","prediction").collect().foreach{case Row(label: Double, features: Vector, probability: Vector, prediction: Double) => println(s"($features, $label) -> probability=$probability, prediction=$prediction")}4 構建Pipline和保存Pipline
步驟:
(1)準備訓練的文檔
(2)配置ML管道,包含三個stage:Tokenizer,HashingTF和LR
(3)安裝管道到數據上
(4)保存管道到磁盤,包括安裝好的和未安裝好的
(5)加載管道
(6)準備測試文檔
(7)預測結果
代碼實現:
(1)引入需要的類
//用的數邏輯回歸 import org.apache.spark.ml.classification.LogisticRegression //因為特征工程處理的是特征向量,所以需要Vector,輸入輸出會用到 import org.apache.spark.ml.linalg.Vector //行對象,為了輸出美化 import org.apache.spark.sql.Row //需要分詞需要Tokenizer,需要轉換計算詞頻需要HashingTF import org.apache.spark.ml.feature.{Tokenizer,HashingTF} //需要Pipeline將多個Transformers和Estimators連接起來以確定一個ML工作流程 import org.apache.spark.ml.{Pipeline,PipelineModel}(2)準備數據集
//含Sprak的為一類 val training = sqlContext.createDataFrame(Seq((0L, "Spark Write applications quickly in Java, Scala, Python, R, and SQL.", 1.0),(1L, "Live and learn", 0.0),(2L, "Spark Run workloads 100x faster.", 1.0),(3L, "study hard and make progress every day", 0.0) )).toDF("id","text","label")(3)定義管道中的Tokenizer,HashingTF,LR這三個組件
//創建tokenizer分詞器 //setInputCol指明輸入DataFrame中的哪一列是被處理的,輸入參數是Dataframe中存在的列名 //setOutputCol設置新增加列的名字,及對輸入的列變換后會產生一個新列,該方法設置增加新列的列名 val tokenizer = new Tokenizer(). setInputCol("text"). setOutputCol("words")//創建hashingTF詞頻統計,他的inputcolumn是tokenizerget出來的 //setNumFeatures設置特征值的數量 val hashingTF = new HashingTF(). setNumFeatures(1000). setInputCol(tokenizer.getOutputCol). setOutputCol("features")//創建邏輯回歸對象,setMaxIter設置邏輯回歸的迭代次數,setRegParam設置正則化 val lr = new LogisticRegression(). setMaxIter(10).setRegParam(0.01)(4)定義管道
//創建管道,setStages將各個計算階段按照tokenizer,hashingTF,lr順序,pipeline是沒有安裝好的管道 val pipeline = new Pipeline(). setStages(Array(tokenizer,hashingTF,lr))//使用pipeline構建模型,model是安裝好的管道 val model = pipeline.fit(training)(5)保存管道到磁盤
pipeline.save("/portrait/sparkML-LRpipeline") model.save("/portrait/sparkML-LRmodel")(6)加載模型
//加載保存到磁盤中模型 val model2 = PipelineModel.load("/portrait/sparkML-LRmodel")(7)準備測試文檔,通過回歸預測,沒有測試集
val test = sqlContext.createDataFrame(Seq((4L, "learn Spark"),(5L, "hadoop hive"),(6L, "bigdata hdfs a"),(7L, "apache Spark") )).toDF("id","text")(8)預測結果
model.transform(test).select("id","text","probability","prediction").collect().foreach{case Row(id: Long, text: String, probability: Vector, prediction: Double) => println(s"($id, $text) -> probability=$probability, prediction=$prediction")}5 通過網格參數和交叉驗證進行模型調優
所謂的調優就是怎樣根據數據選擇好的模型,或者為整個模型整個管道選擇好的參數,這里是關注參數的調優,模型就選擇邏輯回歸。參數調優就是給一組參數而不是一個參數,讓模型自己選擇。調優是基于管道整體進行調優。
(1)準備訓練的文檔
(2)配置ML管道,包含三個stage:Tokenizer,HashingTF和LR
(3)使用ParamGridBuilder構建一個參數網格
(4)使用CrossValidator來選擇模型和參數,CrossValidator需要一個estimator,一個評估器參數集合,和一個evaluator
(5)運行交叉驗證,選擇最好的參數集
(6)準備測試數據
(7)預測結果
代碼實現過程:
(1)引入需要的包
//用的數邏輯回歸 import org.apache.spark.ml.classification.LogisticRegression //因為特征工程處理的是特征向量,所以需要Vector,輸入輸出會用到 import org.apache.spark.ml.linalg.Vector //行對象,為了輸出美化 import org.apache.spark.sql.Row //需要分詞需要Tokenizer,需要轉換計算詞頻需要HashingTF import org.apache.spark.ml.feature.{Tokenizer,HashingTF} //需要Pipeline將多個Transformers和Estimators連接起來以確定一個ML工作流程 import org.apache.spark.ml.{Pipeline,PipelineModel} //因為是二元的,所以用BinaryClassificationEvaluator評估器 import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator //使用交叉校驗CrossValidator,把所有參數排列組合,交叉進行校驗。ParamGridBuilder參數網格 import org.apache.spark.ml.tuning.{CrossValidator,ParamGridBuilder} //需要引入SQLContext import org.apache.spark.sql.SQLContext(2)準備數據
val sqlContext=new SQLContext(sc) val training = sqlContext.createDataFrame(Seq((0L, "Spark Write applications quickly in Java, Scala, Python, R, and SQL.", 1.0),(1L, "Live and learn", 0.0),(2L, "Spark Run workloads 100x faster.", 1.0),(3L, "study hard and make progress every day", 0.0),(4L, "Rdd Spark who", 1.0),(5L, "good good study", 0.0),(6L, "Spark faster", 1.0),(7L, "day day up", 0.0),(8L, "Spark program", 1.0),(9L, "hello world", 0.0),(10L, "hello Spark", 1.0),(11L, "hi how are you", 0.0) )).toDF("id","text","label")(3)構建管道
//創建tokenizer分詞器 //setInputCol指明輸入DataFrame中的哪一列是被處理的,輸入參數是Dataframe中存在的列名 //setOutputCol設置新增加列的名字,及對輸入的列變換后會產生一個新列,該方法設置增加新列的列名 val tokenizer = new Tokenizer(). setInputCol("text"). setOutputCol("words") //創建hashingTF詞頻統計,他的inputcolumn是tokenizerget出來的 //特征值的數量網格調優 val hashingTF = new HashingTF(). setInputCol(tokenizer.getOutputCol). setOutputCol("features") //創建邏輯回歸對象,setMaxIter設置,正則化參數網格調優 val lr = new LogisticRegression(). setMaxIter(10) //創建管道,setStages將各個計算階段按照tokenizer,hashingTF,lr順序,pipeline是沒有安裝好的管道 val pipeline = new Pipeline(). setStages(Array(tokenizer,hashingTF,lr))(4)構建網格參數
//構建網格參數,addGrid添加網格,hashingTF.numFeatures設置hashingTF特征數量, val paramGrid = new ParamGridBuilder(). addGrid(hashingTF.numFeatures, Array(10,100,1000)). addGrid(lr.regParam, Array(0.1,0.01)). build()(5)創建交叉驗證CrossValidator對象
//創建CrossValidator交叉驗證對象,setEstimator設置評估器,setEstimatorParamMaps設置參數集,setEvaluator設置評估器,setNumFolds創建交叉驗證器,他會把訓練集分成NumFolds份,實際生產要比2大 val cv = new CrossValidator(). setEstimator(pipeline). setEstimatorParamMaps(paramGrid). setEvaluator(new BinaryClassificationEvaluator()). setNumFolds(2)(6)根據最優參數構建模型
//構借助參數網格,交叉驗證,選擇最優的參數構建模型 val cvModel = cv.fit(training)(7)添加測試數據
//添加測試集 val test = sqlContext.createDataFrame(Seq((12L, "learn Spark"),(13L, "hadoop hive"),(14L, "bigdata hdfs a"),(15L, "apache Spark") )).toDF("id","text")(8)預測結果
cvModel.transform(test).select("id","text","probability","prediction").collect().foreach{case Row(id: Long, text: String, probability: Vector, prediction: Double) => println(s"($id, $text) -> probability=$probability, prediction=$prediction")}?
6 通過訓練校驗分類來調優模型
前面交叉驗證是把數據分成多份,每一份把所有參數組合計算一次。而校驗分類只需要把每一組參數計算一次,把數據自動分成訓練集合校驗集,這種方式依賴于比較大的數據量,如果數量不夠生成的結果是不可信任的。不像校驗驗證數據集小沒關系會交叉驗證多次,所以即使數據量少但是計算多次,多次的結果足夠評估選出最優的參數。所以TrainValidationSplit需要的數據量就要大,只會計算一次。這個例子采用線性回歸。
與CrossValidator不同,TrainValidationSplit創建一個(訓練,測試)數據集對。 它使用trainRatio參數將數據集分成這兩個部分。 例如,trainRatio = 0.75,TrainValidationSplit將生成訓練和測試數據集對,其中75%的數據用于訓練,25%用于驗證。
步驟:
(1)準備訓練和測試數據
(2)使用ParamGridBuilder構建一個參數網格
(3)使用TrainValidationSplit來選擇模型和參數,CrossValidator需要一個estimator,一個評估器參數集合,和一個evaluator
(4)運行校驗分類選擇最好的參數
(5)在測試數據上做測試,模型是參數組合中執行最好的一個
//使用線性回歸求解 import org.apache.spark.ml.regression.LinearRegression 因為是回歸問題,所以用RegressionEvaluator回歸評估器 import org.apache.spark.ml.evaluation.RegressionEvaluator //使用ParamGridBuilder參數網格和,TrainValidationSplit import org.apache.spark.ml.tuning.{TrainValidationSplit,ParamGridBuilder} //需要引入SQLContext import org.apache.spark.sql.SQLContextval = sqlContext = new SQLContext(sc) val data = sqlContext.read.format("libsvm").load("file:/data/sample_linear_regression_data.txt")//randomSplits隨機拆分,seed隨機種子 val Array(training, test) = data.randomSplit(Array(0.75, 0.25), seed=12345)//創建線性回歸 val lr = new LinearRegression()//elasticNetParam是Elastic net (回歸)參數,取值介于0和1之間。 //fitIntercept是否允許階段,默認是true。regParam參數定義規范化項的權重 val paramGrid = new ParamGridBuilder(). addGrid(lr.elasticNetParam, Array(0.0,0.5,1.0)). addGrid(lr.fitIntercept). addGrid(lr.regParam, Array(0.1,0.01)). build()//訓練校驗的比例setTrainRatio val trainValidationSplit = new TrainValidationSplit(). setEstimator(lr). setEstimatorParamMaps(paramGrid). setEvaluator(new RegressionEvaluator). setTrainRatio(0.8)val model = trainValidationSplit.fit(training)model.transform(test).select("features","label","prediction").show()?
總結
以上是生活随笔為你收集整理的用户画像之Spark ML实现的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: leetcode第 46 场双周赛
- 下一篇: R语言第四讲 之R语言数据类型