日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

如何在Java应用里集成Spark MLlib训练好的模型做预测

發布時間:2024/1/17 java 35 豆豆
生活随笔 收集整理的這篇文章主要介紹了 如何在Java应用里集成Spark MLlib训练好的模型做预测 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

前言

昨天媛媛說,你是不是很久沒寫博客了。我說上一篇1.26號,昨天3.26號,剛好兩個月,心中也略微有些愧疚。今天正好有個好朋友問,怎么在Java應用里集成Spark MLlib訓練好的模型。在StreamingPro里其實都有實際的使用例子,但是如果有一篇文章講述下,我覺得應該能讓更多人獲得幫助

追本溯源

記得我之前吐槽過Spark MLlib的設計,也是因為一個朋友使用了spark MLlib的pipeline做訓練,然后他把這個pipeline放到了spring boot里,結果做預測的時候奇慢無比,一條記錄inference需要30多秒。為什么會這么慢呢?原因是Spark MLlib 是以批處理為核心設計理念的。比如上面朋友遇到的坑是有一部分原因來源于word2vec的transform方法:

@Since("2.0.0")override def transform(dataset: Dataset[_]): DataFrame = {transformSchema(dataset.schema, logging = true)val vectors = wordVectors.getVectors.mapValues(vv => Vectors.dense(vv.map(_.toDouble))).map(identity) // mapValues doesn't return a serializable map (SI-7005)val bVectors = dataset.sparkSession.sparkContext.broadcast(vectors)val d = $(vectorSize)

來一條數據(通常API應用都是如此),他需要先獲得vectors(詞到vector的映射)對象,假設你有十萬個詞,

def getVectors: Map[String, Array[Float]] = {wordIndex.map { case (word, ind) =>(word, wordVectors.slice(vectorSize * ind, vectorSize * ind + vectorSize))}}

每次請求他都要做如上調用和計算。接著還需要把這些東西(這個可能就比較大了,幾百M或者幾個G都有可能)廣播出去。

所以注定快不了。

把model集成到Java 服務里實例

假設你使用貝葉斯訓練了一個模型,你需要保存下這個模型,保存的方式如下:

val nb = new NaiveBayes() //做些參數配置和訓練過程 ..... //保存模型 nb.write.overwrite().save(path + "/" + modelIndex)

接著,在你的Java/scala程序里,引入spark core,spark mllib等包。加載模型:

val model = NaiveBayesModel.load(tempPath)

這個時候因為要做預測,我們為了性能,不能直接調用model的transform方法,你仔細觀察發現,我們需要通過反射調用兩個方法,就能實現分類。第一個是predictRaw方法,該方法輸入一個向量,輸出也為一個向量。我們其實不需要向量,我們需要的是一個分類的id。predictRaw 方法在model里,但是沒辦法直接調用,因為是私有的:

override protected def predictRaw(features: Vector): Vector = {$(modelType) match {case Multinomial =>multinomialCalculation(features)case Bernoulli =>bernoulliCalculation(features)case _ =>// This should never happen.throw new UnknownError(s"Invalid modelType: ${$(modelType)}.")}}

所以我們需要通過反射來完成:

val predictRaw = model.getClass.getMethod("predictRaw", classOf[Vector]).invoke(model, vec).asInstanceOf[Vector]

現在我們已經得到了predctRaw的結果,接著我們要用raw2probability 把向量轉化為一個概率分布,因為spark 版本不同,該方法的簽名也略有變化,所以可能要做下版本適配:

val raw2probabilityMethod = if (sparkSession.version.startsWith("2.3")) "raw2probabilityInPlace" else "raw2probability" val raw2probability = model.getClass.getMethod(raw2probabilityMethod, classOf[Vector]).invoke(model, predictRaw).asInstanceOf[Vector]

raw2probability 其實也還是一個向量,這個向量的長度是分類的數目,每個位置的值是概率。所以所以我們只要拿到最大的那個概率值所在的位置就行:

val categoryId = raw2probability.argmax

這個時候categoryId 就是我們預測的分類了。

截止到目前我們已經完成了作為一個普通java/scala 方法的調用流程。如果我不想用在應用程序里,而是放到spark 流式計算里呢?或者批處理也行,那么這個時候你只需要封裝一個UDF函數即可:

val models = sparkSession.sparkContext.broadcast(_model.asInstanceOf[ArrayBuffer[NaiveBayesModel]]) val f2 = (vec: Vector) => {models.value.map { model =>val predictRaw = model.getClass.getMethod("predictRaw", classOf[Vector]).invoke(model, vec).asInstanceOf[Vector]val raw2probability = model.getClass.getMethod(raw2probabilityMethod, classOf[Vector]).invoke(model, predictRaw).asInstanceOf[Vector]//model.getClass.getMethod("probability2prediction", classOf[Vector]).invoke(model, raw2probability).asInstanceOf[Vector]raw2probability}}sparkSession.udf.register(name , f2)

上面的例子可以參考StreamingPro 中streaming.dsl.mmlib.algs.SQLNaiveBayes的代碼。不同的算法因為內部實現不同,我們使用起來也會略微有些區別。

總結

Spark MLlib學習了SKLearn里的transform和fit的概念,但是因為設計上還是遵循批處理的方式,實際部署后會有很大的性能瓶頸,不適合那種數據一條一條過來需要快速響應的預測流程,所以需要調用一些內部的API來完成最后的預測。



作者:祝威廉
鏈接:https://www.jianshu.com/p/3c038027ff61
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯系作者獲得授權并注明出處。

總結

以上是生活随笔為你收集整理的如何在Java应用里集成Spark MLlib训练好的模型做预测的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。