如何在Java应用里集成Spark MLlib训练好的模型做预测
前言
昨天媛媛說,你是不是很久沒寫博客了。我說上一篇1.26號,昨天3.26號,剛好兩個月,心中也略微有些愧疚。今天正好有個好朋友問,怎么在Java應用里集成Spark MLlib訓練好的模型。在StreamingPro里其實都有實際的使用例子,但是如果有一篇文章講述下,我覺得應該能讓更多人獲得幫助
追本溯源
記得我之前吐槽過Spark MLlib的設計,也是因為一個朋友使用了spark MLlib的pipeline做訓練,然后他把這個pipeline放到了spring boot里,結果做預測的時候奇慢無比,一條記錄inference需要30多秒。為什么會這么慢呢?原因是Spark MLlib 是以批處理為核心設計理念的。比如上面朋友遇到的坑是有一部分原因來源于word2vec的transform方法:
@Since("2.0.0")override def transform(dataset: Dataset[_]): DataFrame = {transformSchema(dataset.schema, logging = true)val vectors = wordVectors.getVectors.mapValues(vv => Vectors.dense(vv.map(_.toDouble))).map(identity) // mapValues doesn't return a serializable map (SI-7005)val bVectors = dataset.sparkSession.sparkContext.broadcast(vectors)val d = $(vectorSize)來一條數據(通常API應用都是如此),他需要先獲得vectors(詞到vector的映射)對象,假設你有十萬個詞,
def getVectors: Map[String, Array[Float]] = {wordIndex.map { case (word, ind) =>(word, wordVectors.slice(vectorSize * ind, vectorSize * ind + vectorSize))}}每次請求他都要做如上調用和計算。接著還需要把這些東西(這個可能就比較大了,幾百M或者幾個G都有可能)廣播出去。
所以注定快不了。
把model集成到Java 服務里實例
假設你使用貝葉斯訓練了一個模型,你需要保存下這個模型,保存的方式如下:
val nb = new NaiveBayes() //做些參數配置和訓練過程 ..... //保存模型 nb.write.overwrite().save(path + "/" + modelIndex)接著,在你的Java/scala程序里,引入spark core,spark mllib等包。加載模型:
val model = NaiveBayesModel.load(tempPath)這個時候因為要做預測,我們為了性能,不能直接調用model的transform方法,你仔細觀察發現,我們需要通過反射調用兩個方法,就能實現分類。第一個是predictRaw方法,該方法輸入一個向量,輸出也為一個向量。我們其實不需要向量,我們需要的是一個分類的id。predictRaw 方法在model里,但是沒辦法直接調用,因為是私有的:
override protected def predictRaw(features: Vector): Vector = {$(modelType) match {case Multinomial =>multinomialCalculation(features)case Bernoulli =>bernoulliCalculation(features)case _ =>// This should never happen.throw new UnknownError(s"Invalid modelType: ${$(modelType)}.")}}所以我們需要通過反射來完成:
val predictRaw = model.getClass.getMethod("predictRaw", classOf[Vector]).invoke(model, vec).asInstanceOf[Vector]現在我們已經得到了predctRaw的結果,接著我們要用raw2probability 把向量轉化為一個概率分布,因為spark 版本不同,該方法的簽名也略有變化,所以可能要做下版本適配:
val raw2probabilityMethod = if (sparkSession.version.startsWith("2.3")) "raw2probabilityInPlace" else "raw2probability" val raw2probability = model.getClass.getMethod(raw2probabilityMethod, classOf[Vector]).invoke(model, predictRaw).asInstanceOf[Vector]raw2probability 其實也還是一個向量,這個向量的長度是分類的數目,每個位置的值是概率。所以所以我們只要拿到最大的那個概率值所在的位置就行:
val categoryId = raw2probability.argmax這個時候categoryId 就是我們預測的分類了。
截止到目前我們已經完成了作為一個普通java/scala 方法的調用流程。如果我不想用在應用程序里,而是放到spark 流式計算里呢?或者批處理也行,那么這個時候你只需要封裝一個UDF函數即可:
val models = sparkSession.sparkContext.broadcast(_model.asInstanceOf[ArrayBuffer[NaiveBayesModel]]) val f2 = (vec: Vector) => {models.value.map { model =>val predictRaw = model.getClass.getMethod("predictRaw", classOf[Vector]).invoke(model, vec).asInstanceOf[Vector]val raw2probability = model.getClass.getMethod(raw2probabilityMethod, classOf[Vector]).invoke(model, predictRaw).asInstanceOf[Vector]//model.getClass.getMethod("probability2prediction", classOf[Vector]).invoke(model, raw2probability).asInstanceOf[Vector]raw2probability}}sparkSession.udf.register(name , f2)上面的例子可以參考StreamingPro 中streaming.dsl.mmlib.algs.SQLNaiveBayes的代碼。不同的算法因為內部實現不同,我們使用起來也會略微有些區別。
總結
Spark MLlib學習了SKLearn里的transform和fit的概念,但是因為設計上還是遵循批處理的方式,實際部署后會有很大的性能瓶頸,不適合那種數據一條一條過來需要快速響應的預測流程,所以需要調用一些內部的API來完成最后的預測。
作者:祝威廉
鏈接:https://www.jianshu.com/p/3c038027ff61
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯系作者獲得授權并注明出處。
總結
以上是生活随笔為你收集整理的如何在Java应用里集成Spark MLlib训练好的模型做预测的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spark函数详解系列--RDD基本转换
- 下一篇: java美元兑换,(Java实现) 美元