spark.mllib:NaiveBayes
樸素貝葉斯模型簡述:
貝葉斯模型通過使用后驗(yàn)概率和類的概率分布來估計(jì)先驗(yàn)概率,具體的以公式表達(dá)為
P(Y)可以使用訓(xùn)練樣本的類分布進(jìn)行估計(jì)。如果X是單特征也很好估計(jì),但如果X={x1,x2,..,xn}等n個(gè)特征構(gòu)成,那估計(jì)n個(gè)特征的聯(lián)合概率分布P(X)=P(x1,x2,...,xn)將變得非常困難。由于貝葉斯模型的參數(shù)難于估計(jì),限制了其的應(yīng)用。
樸素貝葉斯模型是貝葉斯模型的簡化版本,通過假設(shè)特征之間獨(dú)立不相關(guān),那么?
通過求解每個(gè)特征的分布和每個(gè)特征的后驗(yàn)概率來近似特征的聯(lián)合概率分布和特征的后驗(yàn)概率。當(dāng)然,通常情況下,特征相互獨(dú)立的假設(shè)不會(huì)成立,這里只是模型復(fù)雜度和模型精度的一個(gè)權(quán)衡。這樣樣本x屬于第i類和第k類的概率分別為:
由于P(X)聯(lián)合概率分布對(duì)每個(gè)類都是相同的,可以不求。
Spark下樸素貝葉斯的具體實(shí)現(xiàn):
NaiveBayesModel
NaiveBayesModel保存了樸素貝葉斯模型的參數(shù),繼承自ClassificationModel,并重寫了predict方法。
先看看NaiveBayesModel的貝葉斯模型參數(shù),
labels:類別編號(hào)
pi: 類的先驗(yàn)概率P(Y)的對(duì)數(shù)值
theta: 條件概率P(X|Y)的對(duì)數(shù)值
modelType:Multinomial,Bernoulli:實(shí)際上依據(jù)特征的分布不同,樸素貝葉斯又劃分為多個(gè)子類別,這實(shí)際上又是對(duì)特征的一種假設(shè)來簡化建模。
如果特征近似服從多項(xiàng)式分布,即特征只能取N個(gè)值,取到每個(gè)值的概率為pi,則p1+p2+..+pn=1?;诖思僭O(shè)構(gòu)建的貝葉斯分類稱為Multinomial NaiveBayesModel,典型的例子是基于詞頻向量的文本分類。
如果特征服從伯努利分布,基于此假設(shè)構(gòu)建的貝葉斯分類稱為 Bernoulli NaiveBayesModel,典型的例子是基于one-hot構(gòu)建的文本分類。
不同的特征分布假設(shè),將調(diào)用不同的概率計(jì)算函數(shù):
private val (thetaMinusNegTheta, negThetaSum) = modelType match {
? case Multinomial => (None, None)
? case Bernoulli =>
? ? val negTheta = thetaMatrix.map(value => math.log(1.0 - math.exp(value)))//事件失敗的概率
? ? val ones = new DenseVector(Array.fill(thetaMatrix.numCols) {1.0})
? ? //事件不失敗的概率
? ? val thetaMinusNegTheta = thetaMatrix.map { value =>
? ? ? value - math.log(1.0 - math.exp(value))
? ? }
? ? (Option(thetaMinusNegTheta), Option(negTheta.multiply(ones)))
? case _ =>
? ? // This should never happen.
? ? throw new UnknownError(s"Invalid modelType: $modelType.")
}
//特征的分布不一樣,其估計(jì)也不一樣
@Since("1.0.0")
override def predict(testData: Vector): Double = {
? modelType match {
? ? case Multinomial =>
? ? ? labels(multinomialCalculation(testData).argmax)
? ? case Bernoulli =>
? ? ? labels(bernoulliCalculation(testData).argmax)
? }
}
另外為方便計(jì)算和避免小數(shù)值,根據(jù)對(duì)數(shù)運(yùn)算法則,可將乘積運(yùn)算轉(zhuǎn)換為加法運(yùn)算,后驗(yàn)概率的估計(jì)值為
//計(jì)算每個(gè)類別的概率p(yi|X1,x2)=p(yi)*p(x1|yi)*p(x2|yi)*.../P(X) ?實(shí)際計(jì)算的是log(p(yi)) + log(p(x1|yi)) +***
//全概率P(X)對(duì)每個(gè)類別一致,可以不算
private def multinomialCalculation(testData: Vector) = {
? val prob = thetaMatrix.multiply(testData)//求出
? BLAS.axpy(1.0, piVector, prob)
? prob
}
private def bernoulliCalculation(testData: Vector) = {
? testData.foreachActive((_, value) =>
? ? if (value != 0.0 && value != 1.0) {//伯努利事件的結(jié)果只有兩種狀態(tài)
? ? ? throw new SparkException(
? ? ? ? s"Bernoulli naive Bayes requires 0 or 1 feature values but found $testData.")
? ? }
? )
? val prob = thetaMinusNegTheta.get.multiply(testData)
? BLAS.axpy(1.0, piVector, prob)
? BLAS.axpy(1.0, negThetaSum.get, prob)
? prob
}
NaiveBayes
再來看NaiveBayesModel的參數(shù)估計(jì),參數(shù)估計(jì)由NaiveBayes類開始。
NaiveBayes構(gòu)造函數(shù)有個(gè)lambda參數(shù),一般在估計(jì)P(Xi|Y)時(shí),對(duì)于在訓(xùn)練數(shù)據(jù)中沒有出現(xiàn)的Xi,會(huì)得到其估計(jì)P(Xij|Y)=0
在實(shí)際應(yīng)用中,對(duì)于某個(gè)類別沒有出現(xiàn)在樣本集中或者某個(gè)特征沒有出現(xiàn)在某類樣本集中,這個(gè)時(shí)候就需要加入平滑因子lambda去調(diào)整,一般常用拉普拉斯平滑進(jìn)行處理。??
類的分布估計(jì)調(diào)整為
多項(xiàng)式模型下的參數(shù)估計(jì)調(diào)整為:
伯努力模型下參數(shù)估計(jì)調(diào)整為:
樸素貝葉斯模型的訓(xùn)練是在mllib.NaiveBayes中由用戶調(diào)用其run來完成訓(xùn)練的。run方法調(diào)用了ml.NaiveBayes類的trainWithLabelCheck方法來完成參數(shù)估計(jì)的。
接下來看看trainWithLabelCheck進(jìn)行參數(shù)估計(jì)的過程
private[spark] def trainWithLabelCheck(
? ? ? dataset: Dataset[_],
? ? ? positiveLabel: Boolean): NaiveBayesModel = {
? ? if (positiveLabel) {...}
? ? val modelTypeValue = $(modelType)
? ? val requireValues: Vector => Unit = {...}
? ? //估算argmax p(yi)*p(X|Yi) ==> argmax log(p(yi)) + log(p(X|Yi))
? ? //p(yi) = numDocuments in lable i / numDocuments all
? ? //p(X|Yi) = p(X1|Yi)*p(X2|Yi)... ==> log(p(X1|Yi)) + log(p(X2|Yi))
? ? //p(X1|Yi) = featureSum in lable i / featureSum all
? ? //特征數(shù)量
? ? val numFeatures = dataset.select(col($(featuresCol))).head().getAs[Vector](0).size
? ? //特征權(quán)重
? ? val w = if (!isDefined(weightCol) || $(weightCol).isEmpty) lit(1.0) else col($(weightCol))
?
? ? // Aggregates分布式計(jì)算 進(jìn)行文檔和特征的統(tǒng)計(jì)計(jì)數(shù)
? ? //aggregateByKey再collect等價(jià)于aggregateByKeyLocally,返回的是一個(gè)HashMap<lable id,object>
? ? //aggregated具體形式為 [lable i, numDocuments in lable i, a vector contains <feature1Sum,feature2Sum,..>]
? ? val aggregated = dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd
? ? ? .map { row => (row.getDouble(0), (row.getDouble(1), row.getAs[Vector](2)))
? ? ? }.aggregateByKey[(Double, DenseVector)]((0.0, Vectors.zeros(numFeatures).toDense))(//分類別統(tǒng)計(jì)featureSum
? ? ? seqOp = {
? ? ? ? ?case ((weightSum: Double, featureSum: DenseVector), (weight, features)) =>
? ? ? ? ? ?requireValues(features)
? ? ? ? ? ?BLAS.axpy(weight, features, featureSum)//常數(shù)乘以向量加另一個(gè)向量
? ? ? ? ? ?(weightSum + weight, featureSum)
? ? ? },
? ? ? combOp = {
? ? ? ? ?case ((weightSum1, featureSum1), (weightSum2, featureSum2)) =>
? ? ? ? ? ?BLAS.axpy(1.0, featureSum2, featureSum1)//featureSum2 + featureSum1
? ? ? ? ? ?(weightSum1 + weightSum2, featureSum1)
? ? ? }).collect().sortBy(_._1)//sortBy lable index
? ? //分類數(shù)
? ? val numLabels = aggregated.length
? ? //總的樣本數(shù)量
? ? val numDocuments = aggregated.map(_._2._1).sum
? ? val labelArray = new Array[Double](numLabels)
? ? //初始化存儲(chǔ)p(yi)的數(shù)組
? ? val piArray = new Array[Double](numLabels)
? ? //用于計(jì)算p(xi|yk)的參數(shù),類別數(shù)numLabels*特征數(shù)量numFeatures大小的數(shù)組
? ? val thetaArray = new Array[Double](numLabels * numFeatures)
? ? val lambda = $(smoothing)//平滑參數(shù)
? ? val piLogDenom = math.log(numDocuments + numLabels * lambda)//這個(gè)是估計(jì)p(yi)的分母,見公式。。。
? ? var i = 0
? ? //迭代aggregated這個(gè)存在本地的HashMap
? ? aggregated.foreach { case (label, (n, sumTermFreqs)) =>
? ? ? labelArray(i) = label
? ? ? piArray(i) = math.log(n + lambda) - piLogDenom //計(jì)算log(p(yi)) , 是(numDocuments in lable i + lambda)/(numDocuments + numLabels * lambda)的對(duì)數(shù)形式
? ? ? val thetaLogDenom = $(modelType) match {//這個(gè)是計(jì)算公式。。。的分母部分
? ? ? ? case Multinomial => math.log(sumTermFreqs.values.sum + numFeatures * lambda)//實(shí)際上上加了一個(gè)平滑因子的
? ? ? ? case Bernoulli => math.log(n + 2.0 * lambda)
? ? ? ? case _ =>
? ? ? ? ? throw new UnknownError(s"Invalid modelType: ${$(modelType)}.")
? ? ? }
? ? ? var j = 0
? ? ? while (j < numFeatures) {
? ? ? ? //第i類別第j個(gè)特征的參數(shù)估計(jì)
? ? ? ? thetaArray(i * numFeatures + j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom //計(jì)算log(p(Xk|Yi))
? ? ? ? j += 1
? ? ? }
? ? ? i += 1
? ? }
? ? val pi = Vectors.dense(piArray) //存儲(chǔ)log(p(yi))的數(shù)組
? ? val theta = new DenseMatrix(numLabels, numFeatures, thetaArray, true)
? ? new NaiveBayesModel(uid, pi, theta).setOldLabels(labelArray)
? }
Spark目前只實(shí)現(xiàn)了基于伯努利分布和二項(xiàng)分布的樸素貝葉斯算法,對(duì)于諸如高斯分布的樸素貝葉斯目前還沒有實(shí)現(xiàn),在需要時(shí)可參照上述兩個(gè)模型的過程來自己實(shí)現(xiàn)(重寫NaiveBayesModel的predict方法和NaiveBayes的參數(shù)估計(jì)方法)。
————————————————
版權(quán)聲明:本文為CSDN博主「大愚若智_」的原創(chuàng)文章,遵循 CC 4.0 BY-SA 版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/zbc1090549839/article/details/68067460
總結(jié)
以上是生活随笔為你收集整理的spark.mllib:NaiveBayes的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spark.mllib:回归算法
- 下一篇: 胆囊炎能不能吃吊瓜子