日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

spark数据处理示例一:分类

發布時間:2024/1/23 编程问答 23 豆豆
生活随笔 收集整理的這篇文章主要介紹了 spark数据处理示例一:分类 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

spark數據處理示例一:分類

@(SPARK)[spark, ML]

  • spark數據處理示例一分類
    • 知識點
      • 1slice
      • 2NaN
      • 3mapValue
      • 4groupBy
      • 5state
      • 6isNaN
      • 7scala的range結構
    • 一REPL測試
      • 1數據準備
      • 2啟動spark
      • 3讀入數據并簡單驗證讀入情況
      • 4去除標題行
      • 5提取行中的信息
        • 1定義缺失值的處理
        • 2提取行中的字段
        • 3以case類對象的形式返回分析結果
        • 4使用parse函數分析數據
      • 6聚合無效
      • 7統計true和false的數量
      • 7連續變量的概要統計
    • 二代碼應用
      • 1case類MatchData
      • 2載入數據
      • 3去除標題行
      • 4缺失值的處理
      • 5將每一行解釋為一個MatchData對象
      • 6統計true和false的數量
      • 7將9個屬性的基本統計信息輸出
      • 8定義NAStatCounter
        • 1變量
        • 2add方法
        • 3toString方法
        • 4apply方法
        • 5NAStatCounter的完整代碼
      • 9計算每個屬性的NAStatCounter
      • 10statsWithMissing
      • 11計算每個屬性的缺失數量及2種分類的平均值差異
      • 12建立評分模型
      • 13驗證模型
      • 完整代碼

參考spark高級數據分析第2章

知識點

1、slice

2、NaN

3、mapValue\

4、groupBy

5、state

6、isNaN

7、scala的range結構

本項目根據訓練數據,找出2個某個數據的類型(應該是true還是false),并用于下一步的預測。詳細見第二部分的分析。

這里只使用了spark的基本API,沒有使用mllib的算法。

(一)REPL測試

1、數據準備

下載并解壓至~/Downloads/donation中
https://archive.ics.uci.edu/ml/machine-learning-databases/00210/donation.zip

2、啟動spark

本例先在local模式下運行

bin/spark-shell

或者將文件上傳至hdfs

hadoop fs -put ./donation/ /tmp/

再使用:

bin/spark-shell --master yarn-client

3、讀入數據并簡單驗證讀入情況

scala> val rawblocks = sc.textFile("/Users/liaoliuqing/Downloads/donation") rawblocks: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[1] at textFile at <console>:21scala> rawblocks.first res0: String = "id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"scala> rawblocks.take(5).foreach(println) "id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match" 37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE 39086,47614,1,?,1,?,1,1,1,1,1,TRUE 70031,70237,1,?,1,?,1,1,1,1,1,TRUE 84795,97439,1,?,1,?,1,1,1,1,1,TRUEscala> rawblocks.count res2: Long = 5749142

4、去除標題行

從上面的數據輸出中可以看到第一行是標題行,表明每個列分別是什么意思。但在實際數據分析中,我們并不需要這一行,因此將其刪除。

scala> val noheader = rawblocks.filter(line => !line.contains("id_1")) noheader: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[4] at filter at <console>:23scala> noheader.count res6: Long = 5749132

將行中有”id_1”字段的行去掉,這一般是標題行,當做也可以以其它字段作標準。去除后發現少了10行數據,目錄中剛好有10個文件,每個文件去除第一行,即去除了10行。

5、提取行中的信息

(1)定義缺失值的處理

數據中存在數據丟失,這些數據以?代替,因此要先處理,否則直接調用toDouble會出錯:

def myToDouble(s:String) = {if("?".equals(s)) Double.NaN else s.toDouble }

關于NaN: In computing, NaN, standing for not a number, is a numeric data type value representing an undefined or unrepresentable value, especially in floating-point calculations.

驗證一下上面的方法:

scala> myToDouble("4") res10: Double = 4.0scala>scala> myToDouble("?") res11: Double = NaN

(2)提取行中的字段

def parse(line: String) = {val pieces = line.split(",")val id1 = pieces(0).toIntval id2 = pieces(1).toIntval scores = pieces.slice(2,11).map(myToDouble)val matched = pieces(11).toBoolean(id1,id2,scores,matched) } parse: (line: String)(Int, Int, Array[Double], Boolean)

這個方法將第1、2個字段作為id提供出來,中間9個字段作為double值組成一個array,最后是一個是否match的布爾值,它的返回是:

parse: (line: String)(Int, Int, Array[Double], Boolean)

驗證一下上面的函數:

scala> noheader.take(5).map(parse).foreach(println) (37291,53113,[D@2138bd8c,true) (39086,47614,[D@1424435e,true) (70031,70237,[D@58c2daa6,true) (84795,97439,[D@60a0f5d0,true) (36950,42116,[D@676a5c3f,true)

上面的返回是一個有4個元素的元組。下面我們將其封閉成一個對象返回。

(3)以case類對象的形式返回分析結果

scala> case class MatchData(id1: Int, id2: Int, scores: Array[Double], matched: Boolean) defined class MatchDatadef parse(line: String) = {val pieces = line.split(",")val id1 = pieces(0).toIntval id2 = pieces(1).toIntval scores = pieces.slice(2,11).map(myToDouble)val matched = pieces(11).toBooleanMatchData(id1,id2,scores,matched) }

再看一下返回的結果:

scala> noheader.take(5).map(parse).foreach(println) MatchData(37291,53113,[D@dd278c2,true) MatchData(39086,47614,[D@74f60fa4,true) MatchData(70031,70237,[D@467d13f9,true) MatchData(84795,97439,[D@3daa6496,true) MatchData(36950,42116,[D@7db1d37a,true)

(4)使用parse函數分析數據

scala> val parsed = noheader.map(line => parse(line)) parsed: org.apache.spark.rdd.RDD[MatchData] = MapPartitionsRDD[5] at map at <console>:31scala> parsed.first res15: MatchData = MatchData(37291,53113,[D@4e0d2c7f,true)

OK,現在數據已經提取好了,下面進一步分析。

6、聚合(無效)

將分析好的數據按照matched字段進行聚合

scala> val grouped = parsed.groupBy(md => md.matched) grouped: org.apache.spark.rdd.RDD[(Boolean, Iterable[MatchData])] = ShuffledRDD[7] at groupBy at <console>:33scala> grouped.mapValues(x=>x.size).foreach(println)

7、統計true和false的數量

scala> val matchCount = parsed.map(md => md.matched).countByValue() matchCount: scala.collection.Map[Boolean,Long] = Map(true -> 20931, false -> 5728201)

以下對輸出結果進行排序:

scala> val matchCountsSeq = matchCount.toSeq matchCountsSeq: Seq[(Boolean, Long)] = ArrayBuffer((true,20931), (false,5728201))scala> matchCountsSeq.sortBy(_._1).foreach(println) (false,5728201) (true,20931)scala> matchCountsSeq.sortBy(_._2).foreach(println) (true,20931) (false,5728201)scala> matchCountsSeq.sortBy(_._2).reverse.foreach(println) (false,5728201) (true,20931)

先將對象轉化為Seq類型,然后使用sortBy來排序。reverse可反序。

7、連續變量的概要統計

spark提供了stats對RDD[Double]進行概要信息的統計,它是RDD[Double]的一個隱式動作。

scala> parsed.map(md => md.scores(0)).stats() res12: org.apache.spark.util.StatCounter = (count: 5749132, mean: NaN, stdev: NaN, max: NaN, min: NaN)

由于存在NaN的值,導致計算出錯了,我們將其去除:

scala> import java.lang.Double.isNaN import java.lang.Double.isNaNscala> parsed.map(md => md.scores(0)).filter(!isNaN(_)).stats() res13: org.apache.spark.util.StatCounter = (count: 5748125, mean: 0.712902, stdev: 0.388758, max: 1.000000, min: 0.000000)

只要你愿意,可以對scores中的所有值計算這個概要信息。

val stats = (0 until 9).map(i => {parsed.map(md => md.scores(i)).filter(!isNaN(_)).stats() })

(二)代碼應用

本示例的數據有12列,其中第一、二列為2個id,第3~11是9個數值,這些數值表示這2個id所代表的事物(或者人)在9個屬性上的比較數據,最后一個屬性是一個布爾值,表示這2個id是否同一個事物:

"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match" 37291,53113,0.833333333333333,?,1,?,1,1,1,1,0,TRUE 39086,47614,1,?,1,?,1,1,1,1,1,TRUE 70031,70237,1,?,1,?,1,1,1,1,1,TRUE

我們要做的就是分析這9個數據,得出一個模型,以便當提供這9個數據時,判斷這2個id是否同一個事物。

1、case類MatchData

創建一個case類,將每一行數據保存于一個對象中。

case class MatchData(id1: Int, id2: Int,scores: Array[Double], matched: Boolean) case class Scored(md: MatchData, score: Double)

2、載入數據

數據的下載請見第一部分的介紹

val rawblocks = sc.textFile("file:///Users/liaoliuqing/Downloads/donation2")

當然,更常見的是讀取hdfs中的數據。注意,如果全部使用donation中的數據,有可以機器的內存不足,因此刪除數據只剩下2個文件即可(1個也不行,會出錯)。

3、去除標題行

def isHeader(line: String) = line.contains("id_1") val noheader = rawblocks.filter(x => !isHeader(x))

每個文件的第一行都是一個標題行,先將其去除。

4、缺失值的處理

文件記錄中存在大量的?號,表示這個數據缺失了,我們需要將其轉化為NaN,否則直接調用toDouble會出錯

def toDouble(s: String) = {if ("?".equals(s)) Double.NaN else s.toDouble}

5、將每一行解釋為一個MatchData對象

def parse(line: String) = {val pieces = line.split(',')val id1 = pieces(0).toIntval id2 = pieces(1).toIntval scores = pieces.slice(2, 11).map(toDouble)val matched = pieces(11).toBooleanMatchData(id1, id2, scores, matched) }val parsed = noheader.map(line => parse(line)) parsed.cache()

6、統計true和false的數量

val matchCounts = parsed.map(md => md.matched).countByValue()

對結果排序并輸出

val matchCountsSeq = matchCounts.toSeq matchCountsSeq.sortBy(_._2).reverse.foreach(println)

輸出為:

(false,1145640) (true,4186)

即樣本中只4186個是true的,其余都是false的。

7、將9個屬性的基本統計信息輸出

val stats = (0 until 9).map(i => {parsed.map(_.scores(i)).filter(!_.isNaN).stats() }) stats.foreach(println)

輸出結果為:

(count: 1149603, mean: 0.712452, stdev: 0.389030, max: 1.000000, min: 0.000000) (count: 20650, mean: 0.898884, stdev: 0.273071, max: 1.000000, min: 0.000000) (count: 1149826, mean: 0.315906, stdev: 0.334438, max: 1.000000, min: 0.000000) (count: 465, mean: 0.326669, stdev: 0.366702, max: 1.000000, min: 0.000000) (count: 1149826, mean: 0.955133, stdev: 0.207011, max: 1.000000, min: 0.000000) (count: 1149678, mean: 0.225125, stdev: 0.417664, max: 1.000000, min: 0.000000) (count: 1149678, mean: 0.488465, stdev: 0.499867, max: 1.000000, min: 0.000000) (count: 1149678, mean: 0.222706, stdev: 0.416062, max: 1.000000, min: 0.000000) (count: 1147303, mean: 0.005550, stdev: 0.074288, max: 1.000000, min: 0.000000)

stats函數會分析RDD[Double]中的元素,計算數量,平均值,均方差,最大值,最小值等。
其實這一步對下面的分析沒有直接作用,可忽略。

8、定義NAStatCounter

(1)變量

2個變量分別表示缺失值的數量以及一個StatCounter對象,StatCounter包括5個屬性:

private var n: Long = 0 // Running count of our valuesprivate var mu: Double = 0 // Running mean of our valuesprivate var m2: Double = 0 // Running variance numerator (sum of (x - mean)^2)private var maxValue: Double = Double.NegativeInfinity // Running max of our valuesprivate var minValue: Double = Double.PositiveInfinity // Running min of our values

即與上面stats()方法的輸出相同。

(2)add方法

定義了2個NAStatCounter對象add時的操作,即如果這個值是NaN的話,則缺失值加1,否則的話就2個NAStatCounter對象執行merge方法。merge方法的定義為:

def merge(value: Double): StatCounter = {val delta = value - mun += 1mu += delta / nm2 += delta * (value - mu)maxValue = math.max(maxValue, value)minValue = math.min(minValue, value)this}

即是如何更新它的幾個數據而已。

(3)toString方法

使得打印時更好的表示內容

(4)apply方法

最后還定義了apply方法,表示創建一個NAStatCounter對象時的操作。

(5)NAStatCounter的完整代碼

class NAStatCounter extends Serializable {val stats: StatCounter = new StatCounter()var missing: Long = 0def add(x: Double): NAStatCounter = {if (x.isNaN) {missing += 1} else {stats.merge(x)}this}def merge(other: NAStatCounter): NAStatCounter = {stats.merge(other.stats)missing += other.missingthis}override def toString: String = {"stats: " + stats.toString + " NaN: " + missing} }object NAStatCounter extends Serializable {def apply(x: Double) = new NAStatCounter().add(x) }

9、計算每個屬性的NAStatCounter

將每個屬性轉化為一個NAStatCounter對象,并輸出

val nasRDD = parsed.map(md => {md.scores.map(d => NAStatCounter(d)) }) val reduced = nasRDD.reduce((n1, n2) => {n1.zip(n2).map { case (a, b) => a.merge(b) } }) reduced.foreach(println)

其實這一步對最終結果也沒有作用,只用于中間調試。

輸出為:

stats: (count: 1149603, mean: 0.712452, stdev: 0.389030, max: 1.000000, min: 0.000000) NaN: 223 stats: (count: 20650, mean: 0.898884, stdev: 0.273071, max: 1.000000, min: 0.000000) NaN: 1129176 stats: (count: 1149826, mean: 0.315906, stdev: 0.334438, max: 1.000000, min: 0.000000) NaN: 0 stats: (count: 465, mean: 0.326669, stdev: 0.366702, max: 1.000000, min: 0.000000) NaN: 1149361 stats: (count: 1149826, mean: 0.955133, stdev: 0.207011, max: 1.000000, min: 0.000000) NaN: 0 stats: (count: 1149678, mean: 0.225125, stdev: 0.417664, max: 1.000000, min: 0.000000) NaN: 148 stats: (count: 1149678, mean: 0.488465, stdev: 0.499867, max: 1.000000, min: 0.000000) NaN: 148 stats: (count: 1149678, mean: 0.222706, stdev: 0.416062, max: 1.000000, min: 0.000000) NaN: 148 stats: (count: 1147303, mean: 0.005550, stdev: 0.074288, max: 1.000000, min: 0.000000) NaN: 2523

10、statsWithMissing

定義statsWithMissing,用于分析缺失值

def statsWithMissing(rdd: RDD[Array[Double]]): Array[NAStatCounter] = {val nastats = rdd.mapPartitions((iter: Iterator[Array[Double]]) => {val nas: Array[NAStatCounter] = iter.next().map(d => NAStatCounter(d))iter.foreach(arr => {nas.zip(arr).foreach { case (n, d) => n.add(d) }})Iterator(nas)})nastats.reduce((n1, n2) => {n1.zip(n2).map { case (a, b) => a.merge(b) }})}

11、計算每個屬性的缺失數量及2種分類的平均值差異

val statsm = statsWithMissing(parsed.filter(_.matched).map(_.scores)) val statsn = statsWithMissing(parsed.filter(!_.matched).map(_.scores)) statsm.zip(statsn).map { case(m, n) =>(m.missing + n.missing, m.stats.mean - n.stats.mean) }.foreach(println)

輸出結果:

(223,0.286371147556274) (1129176,0.09237251848914796) (0,0.6840609479157178) (1149361,0.7866299180271783) (0,0.03376179754806352) (148,0.7736308747874063) (148,0.5112459666546485) (148,0.7760586525457857) (2523,0.9562752950948621)

這里可以看出第2,5,6,7,8這5個屬性比較大,即當結果屬于不同類別時,這5個屬性較大。因此我們選取這5個屬性。
下面對結果進行一些分析

12、建立評分模型

我們簡單的將上述5個屬性進行相加,作為評分的標準

def naz(d: Double) = if (Double.NaN.equals(d)) 0.0 else d val ct = parsed.map(md => {val score = Array(2, 5, 6, 7, 8).map(i => naz(md.scores(i))).sumScored(md, score) })

最后ct是一個MatchData與score組成的對象的RDD。

13、驗證模型

我們設定了閾值分別為4.0與2.0,然后重新計算true和flase的數量

ct.filter(s => s.score >= 4.0).map(s => s.md.matched).countByValue().foreach(println) ct.filter(s => s.score >= 2.0).map(s => s.md.matched).countByValue().foreach(println)

結果如下:

(false,134) (true,4175) (false,119766) (true,4186)

對比原始數據:

(false,1145640) (true,4186)

* 當閾值為4.0時,即這5個屬性的值加起來大于4.0,我們將絕大部分的true類別選取出來了,同時只有少量的flase類別。
* 當閾值為2.0時,即這5個屬性的值加起來大于2.0,我們將全部的true類別選取出來了,但同時混入了大量的false類別。

因此根據應用情景,如果我們需要盡可能多的true值,即將閾值降低。但如果要同時兼顧true和false這2種類型,則需要將閾值適度提高。

真正應用時,除了訓練數據,應該還要有驗證數據,用驗證數據來檢驗模型的準確率。

完整代碼

先在本機測試,因此設置setMaster(“local[2]”),且目錄為file:///
如果在集群中運行,將setMaster去掉,目錄通過參數傳入一個hdfs的地址。

package com.lujinhong.sparkdemo.ml.basicimport org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD import org.apache.spark.util.StatCountercase class MatchData(id1: Int, id2: Int,scores: Array[Double], matched: Boolean) case class Scored(md: MatchData, score: Double)object RunIntro extends Serializable {def main(args: Array[String]): Unit = {val sc = new SparkContext(new SparkConf().setAppName("Intro").setMaster("local[2]"))val rawblocks = sc.textFile("file:///Users/liaoliuqing/Downloads/donation2")def isHeader(line: String) = line.contains("id_1")val noheader = rawblocks.filter(x => !isHeader(x))def toDouble(s: String) = {if ("?".equals(s)) Double.NaN else s.toDouble}def parse(line: String) = {val pieces = line.split(',')val id1 = pieces(0).toIntval id2 = pieces(1).toIntval scores = pieces.slice(2, 11).map(toDouble)val matched = pieces(11).toBooleanMatchData(id1, id2, scores, matched)}val parsed = noheader.map(line => parse(line))parsed.cache()val matchCounts = parsed.map(md => md.matched).countByValue()val matchCountsSeq = matchCounts.toSeqmatchCountsSeq.sortBy(_._2).reverse.foreach(println)val stats = (0 until 9).map(i => {parsed.map(_.scores(i)).filter(!_.isNaN).stats()})stats.foreach(println)val nasRDD = parsed.map(md => {md.scores.map(d => NAStatCounter(d))})val reduced = nasRDD.reduce((n1, n2) => {n1.zip(n2).map { case (a, b) => a.merge(b) }})reduced.foreach(println)val statsm = statsWithMissing(parsed.filter(_.matched).map(_.scores))val statsn = statsWithMissing(parsed.filter(!_.matched).map(_.scores))statsm.zip(statsn).map { case(m, n) =>(m.missing + n.missing, m.stats.mean - n.stats.mean)}.foreach(println)def naz(d: Double) = if (Double.NaN.equals(d)) 0.0 else dval ct = parsed.map(md => {val score = Array(2, 5, 6, 7, 8).map(i => naz(md.scores(i))).sumScored(md, score)})ct.filter(s => s.score >= 4.0).map(s => s.md.matched).countByValue().foreach(println)ct.filter(s => s.score >= 2.0).map(s => s.md.matched).countByValue().foreach(println)}def statsWithMissing(rdd: RDD[Array[Double]]): Array[NAStatCounter] = {val nastats = rdd.mapPartitions((iter: Iterator[Array[Double]]) => {val nas: Array[NAStatCounter] = iter.next().map(d => NAStatCounter(d))iter.foreach(arr => {nas.zip(arr).foreach { case (n, d) => n.add(d) }})Iterator(nas)})nastats.reduce((n1, n2) => {n1.zip(n2).map { case (a, b) => a.merge(b) }})} }class NAStatCounter extends Serializable {val stats: StatCounter = new StatCounter()var missing: Long = 0def add(x: Double): NAStatCounter = {if (x.isNaN) {missing += 1} else {stats.merge(x)}this}def merge(other: NAStatCounter): NAStatCounter = {stats.merge(other.stats)missing += other.missingthis}override def toString: String = {"stats: " + stats.toString + " NaN: " + missing} }object NAStatCounter extends Serializable {def apply(x: Double) = new NAStatCounter().add(x) }

總結

以上是生活随笔為你收集整理的spark数据处理示例一:分类的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。