日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark使用总结与分享

發(fā)布時間:2025/3/19 编程问答 19 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark使用总结与分享 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

背景

? ?

使用spark開發(fā)已有幾個月。相比于python/hive,scala/spark學(xué)習(xí)門檻較高。尤其記得剛開時,舉步維艱,進展十分緩慢。不過謝天謝地,這段苦澀(bi)的日子過去了。憶苦思甜,為了避免項目組的其他同學(xué)走彎路,決定總結(jié)和梳理spark的使用經(jīng)驗。

? ?

Spark基礎(chǔ)

? ?

基石RDD

? ?

spark的核心是RDD(彈性分布式數(shù)據(jù)集),一種通用的數(shù)據(jù)抽象,封裝了基礎(chǔ)的數(shù)據(jù)操作,如map,filter,reduce等。RDD提供數(shù)據(jù)共享的抽象,相比其他大數(shù)據(jù)處理框架,如MapReduce,Pegel,DryadLINQ和HIVE等均缺乏此特性,所以RDD更為通用。

? ?

簡要地概括RDD:RDD是一個不可修改的,分布的對象集合。每個RDD由多個分區(qū)組成,每個分區(qū)可以同時在集群中的不同節(jié)點上計算。RDD可以包含Python,Java和Scala中的任意對象。

? ?

Spark生態(tài)圈中應(yīng)用都是基于RDD構(gòu)建(下圖),這一點充分說明RDD的抽象足夠通用,可以描述大多數(shù)應(yīng)用場景。

? ?

RDD操作類型—轉(zhuǎn)換和動作

? ?

RDD的操作主要分兩類:轉(zhuǎn)換(transformation)和動作(action)。兩類函數(shù)的主要區(qū)別是,轉(zhuǎn)換接受RDD并返回RDD,而動作接受RDD但是返回非RDD。轉(zhuǎn)換采用惰性調(diào)用機制,每個RDD記錄父RDD轉(zhuǎn)換的方法,這種調(diào)用鏈表稱之為血緣(lineage);而動作調(diào)用會直接計算。

采用惰性調(diào)用,通過血緣連接的RDD操作可以管道化(pipeline),管道化的操作可以直接在單節(jié)點完成,避免多次轉(zhuǎn)換操作之間數(shù)據(jù)同步的等待

使用血緣串聯(lián)的操作可以保持每次計算相對簡單,而不用擔(dān)心有過多的中間數(shù)據(jù),因為這些血緣操作都管道化了,這樣也保證了邏輯的單一性,而不用像MapReduce那樣,為了竟可能的減少map reduce過程,在單個map reduce中寫入過多復(fù)雜的邏輯。

? ?

? ?

RDD使用模式

? ?

RDD使用具有一般的模式,可以抽象為下面的幾步

  • 加載外部數(shù)據(jù),創(chuàng)建RDD對象
  • 使用轉(zhuǎn)換(如filter),創(chuàng)建新的RDD對象
  • 緩存需要重用的RDD
  • 使用動作(如count),啟動并行計算
  • ? ?

    RDD高效的策略

    ? ?

    Spark官方提供的數(shù)據(jù)是RDD在某些場景下,計算效率是Hadoop的20X。這個數(shù)據(jù)是否有水分,我們先不追究,但是RDD效率高的由一定機制保證的:

  • RDD數(shù)據(jù)只讀,不可修改。如果需要修改數(shù)據(jù),必須從父RDD轉(zhuǎn)換(transformation)到子RDD。所以,在容錯策略中,RDD沒有數(shù)據(jù)冗余,而是通過RDD父子依賴(血緣)關(guān)系進行重算實現(xiàn)容錯。
  • RDD數(shù)據(jù)在內(nèi)存中,多個RDD操作之間,數(shù)據(jù)不用落地到磁盤上,避免不必要的I/O操作。
  • RDD存放的數(shù)據(jù)可以是java對象,所以避免的不必要的對象序列化和反序列化。
  • 總而言之,RDD高效的主要因素是盡量避免不必要的操作和犧牲數(shù)據(jù)的操作精度,用來提高計算效率。

    ? ?

    ? ?

    Spark使用技巧

    ? ?

    RDD基本函數(shù)擴展

    ? ?

    RDD雖然提供了很多函數(shù),但是畢竟還是有限的,有時候需要擴展,自定義新的RDD的函數(shù)。在spark中,可以通過隱式轉(zhuǎn)換,輕松實現(xiàn)對RDD擴展。畫像開發(fā)過程中,平凡的會使用rollup操作(類似HIVE中的rollup),計算多個級別的聚合數(shù)據(jù)。下面是具體實,

    /**

    * 擴展spark rdd,為rdd提供rollup方法

    */

    implicit class RollupRDD[T: ClassTag](rdd: RDD[(Array[String], T)]) extends Serializable {

    ?

    /**

    * 類似Sql中的rollup操作

    *

    * @param aggregate 聚合函數(shù)

    * @param keyPlaceHold key占位符,默認(rèn)采用FaceConf.STAT_SUMMARY

    * @param isCache,確認(rèn)是否緩存數(shù)據(jù)

    * @return 返回聚合后的數(shù)據(jù)

    */

    def rollup[U: ClassTag](

    aggregate: Iterable[T] => U,

    keyPlaceHold: String = FaceConf.STAT_SUMMARY,

    isCache: Boolean = true): RDD[(Array[String], U)] = {

    ?

    if (rdd.take(1).isEmpty) {

    return rdd.map(x => (Array[String](), aggregate(Array[T](x._2))))

    }

    ?

    if (isCache) {

    rdd.cache // 提高計算效率

    }

    val totalKeyCount = rdd.first._1.size

    val result = { 1 to totalKeyCount }.par.map(untilKeyIndex => { // 并行計算

    rdd.map(row => {

    val combineKey = row._1.slice(0, untilKeyIndex).mkString(FaceConf.KEY_SEP) // 組合key

    (combineKey, row._2)

    }).groupByKey.map(row => { // 聚合計算

    val oldKeyList = row._1.split(FaceConf.KEY_SEP)

    val newKeyList = oldKeyList ++ Array.fill(totalKeyCount - oldKeyList.size) { keyPlaceHold }

    (newKeyList, aggregate(row._2))

    })

    }).reduce(_ ++ _) // 聚合結(jié)果

    ?

    result

    }

    ?

    }

    上面代碼聲明了一個隱式類,具有一個成員變量rdd,類型是RDD[(Array[String], T)],那么如果應(yīng)用代碼中出現(xiàn)了任何這樣的rdd對象,并且import當(dāng)前的隱式轉(zhuǎn)換,那么編譯器就會將這個rdd當(dāng)做上面的隱式類的對象,也就可以使用rollup函數(shù),和一般的map,filter方法一樣。

    ? ?

    ? ?

    RDD操作閉包外部變量原則

    ? ?

    RDD相關(guān)操作都需要傳入自定義閉包函數(shù)(closure),如果這個函數(shù)需要訪問外部變量,那么需要遵循一定的規(guī)則,否則會拋出運行時異常。閉包函數(shù)傳入到節(jié)點時,需要經(jīng)過下面的步驟:

  • 驅(qū)動程序,通過反射,運行時找到閉包訪問的所有變量,并封成一個對象,然后序列化該對象
  • 將序列化后的對象通過網(wǎng)絡(luò)傳輸?shù)絯orker節(jié)點
  • worker節(jié)點反序列化閉包對象
  • worker節(jié)點執(zhí)行閉包函數(shù),
  • 注意:外部變量在閉包內(nèi)的修改不會被反饋到驅(qū)動程序。

    簡而言之,就是通過網(wǎng)絡(luò),傳遞函數(shù),然后執(zhí)行。所以,被傳遞的變量必須可以序列化,否則傳遞失敗。本地執(zhí)行時,仍然會執(zhí)行上面四步。

    ? ?

    廣播機制也可以做到這一點,但是頻繁的使用廣播會使代碼不夠簡潔,而且廣播設(shè)計的初衷是將較大數(shù)據(jù)緩存到節(jié)點上,避免多次數(shù)據(jù)傳輸,提高計算效率,而不是用于進行外部變量訪問。

    ? ?

    ? ?

    RDD數(shù)據(jù)同步

    ? ?

    RDD目前提供兩個數(shù)據(jù)同步的方法:廣播和累計器。

    ? ?

    廣播 broadcast

    前面提到過,廣播可以將變量發(fā)送到閉包中,被閉包使用。但是,廣播還有一個作用是同步較大數(shù)據(jù)。比如你有一個IP庫,可能有幾G,在map操作中,依賴這個ip庫。那么,可以通過廣播將這個ip庫傳到閉包中,被并行的任務(wù)應(yīng)用。廣播通過兩個方面提高數(shù)據(jù)共享效率:1,集群中每個節(jié)點(物理機器)只有一個副本,默認(rèn)的閉包是每個任務(wù)一個副本;2,廣播傳輸是通過BT下載模式實現(xiàn)的,也就是P2P下載,在集群多的情況下,可以極大的提高數(shù)據(jù)傳輸速率。廣播變量修改后,不會反饋到其他節(jié)點。

    ? ?

    累加器 Accumulator

    累加器是一個write-only的變量,用于累加各個任務(wù)中的狀態(tài),只有在驅(qū)動程序中,才能訪問累加器。而且,截止到1.2版本,累加器有一個已知的缺陷,在action操作中,n個元素的RDD可以確保累加器只累加n次,但是在transformation時,spark不確保,也就是累加器可能出現(xiàn)n+1次累加。

    ? ?

    目前RDD提供的同步機制粒度太粗,尤其是轉(zhuǎn)換操作中變量狀態(tài)不能同步,所以RDD無法做復(fù)雜的具有狀態(tài)的事務(wù)操作。不過,RDD的使命是提供一個通用的并行計算框架,估計永遠(yuǎn)也不會提供細(xì)粒度的數(shù)據(jù)同步機制,因為這與其設(shè)計的初衷是違背的。

    ? ?

    RDD優(yōu)化技巧

    ? ?

    RDD緩存

    需要使用多次的數(shù)據(jù)需要cache,否則會進行不必要的重復(fù)操作。舉個例子

    val data = … // read from tdw

    println(data.filter(_.contains("error")).count)

    println(data.filter(_.contains("warning")).count)

    上面三段代碼中,data變量會加載兩次,高效的做法是在data加載完后,立刻持久化到內(nèi)存中,如下

    val data = … // read from tdw

    data.cache

    println(data.filter(_.contains("error")).count)

    println(data.filter(_.contains("warning")).count)

    這樣,data在第一加載后,就被緩存到內(nèi)存中,后面兩次操作均直接使用內(nèi)存中的數(shù)據(jù)。

    ? ?

    轉(zhuǎn)換并行化

    RDD的轉(zhuǎn)換操作時并行化計算的,但是多個RDD的轉(zhuǎn)換同樣是可以并行的,參考如下

    val dataList:Array[RDD[Int]] = …

    val sumList = data.list.map(_.map(_.sum))

    上面的例子中,第一個map是便利Array變量,串行的計算每個RDD中的每行的sum。由于每個RDD之間計算是沒有邏輯聯(lián)系的,所以理論上是可以將RDD的計算并行化的,在scala中可以輕松試下,如下

    val dataList:Array[RDD[Int]] = …

    val sumList = data.list.par.map(_.map(_.sum))

    注意紅色代碼。

    ? ?

    減少shuffle網(wǎng)絡(luò)傳輸

    一般而言,網(wǎng)絡(luò)I/O開銷是很大的,減少網(wǎng)絡(luò)開銷,可以顯著加快計算效率。任意兩個RDD的shuffle操作(join等)的大致過程如下,

    用戶數(shù)據(jù)userData和事件events數(shù)據(jù)通過用戶id連接,那么會在網(wǎng)絡(luò)中傳到另外一個節(jié)點,這個過程中,有兩個網(wǎng)絡(luò)傳輸過程。Spark的默認(rèn)是完成這兩個過程。但是,如果你多告訴spark一些信息,spark可以優(yōu)化,只執(zhí)行一個網(wǎng)絡(luò)傳輸。可以通過使用、HashPartition,在userData"本地"先分區(qū),然后要求events直接shuffle到userData的節(jié)點上,那么就減少了一部分網(wǎng)絡(luò)傳輸,減少后的效果如下,

    虛線部分都是在本地完成的,沒有網(wǎng)絡(luò)傳輸。在數(shù)據(jù)加載時,就按照key進行partition,這樣可以經(jīng)一部的減少本地的HashPartition的過程,示例代碼如下

    val userData = sc.sequenceFile[UserID, UserInfo]("hdfs://…")

    .partitionBy(new HashPartitioner(100)) // Create 100 partitions

    .persist()

    注意,上面一定要persist,否則會重復(fù)計算多次。100用來指定并行數(shù)量。

    ? ?

    Spark其他

    ? ?

    Spark開發(fā)模式

    ? ?

    由于spark應(yīng)用程序是需要在部署到集群上運行的,導(dǎo)致本地調(diào)試比較麻煩,所以經(jīng)過這段時間的經(jīng)驗累積,總結(jié)了一套開發(fā)流程,目的是為了盡可能的提高開發(fā)調(diào)試效率,同時保證開發(fā)質(zhì)量。當(dāng)然,這套流程可能也不是最優(yōu)的,后面需要持續(xù)改進。

    整個流程比較清楚,這里主要談?wù)劄槭裁葱枰獑卧獪y試。公司內(nèi)的大多數(shù)項目,一般不提倡單元測試,而且由于項目進度壓力,開發(fā)人員會非常抵觸單元測試,因為會花費"額外"的精力。Bug這東西不會因為項目趕進度而消失,而且恰好相反,可能因為趕進度,而高于平均水平。所以,如果不花時間進行單元測試,那么會花同樣多,甚至更多的時間調(diào)試。很多時候,往往一些很小的bug,卻導(dǎo)致你花了很長時間去調(diào)試,而這些bug,恰好是很容易在單元測試中發(fā)現(xiàn)的。而且,單元測試還可以帶來兩個額外的好處:1)API使用范例;2)回歸測試。所以,還是單元測試吧,這是一筆投資,而且ROI還挺高!不過凡事需要掌握分寸,單元測試應(yīng)該根據(jù)項目緊迫程度調(diào)整粒度,做到有所為,有所不為。

    ?

    Spark其他功能

    ? ?

    前面提到了spark生態(tài)圈,spark除了核心的RDD,還提供了之上的幾個很使用的應(yīng)用:

  • Spark SQL: 類似hive,使用rdd實現(xiàn)sql查詢
  • Spark Streaming: 流式計算,提供實時計算功能,類似storm
  • MLLib:機器學(xué)習(xí)庫,提供常用分類,聚類,回歸,交叉檢驗等機器學(xué)習(xí)算法并行實現(xiàn)。
  • GraphX:圖計算框架,實現(xiàn)了基本的圖計算功能,常用圖算法和pregel圖編程框架。
  • ? ?

    后面需要繼續(xù)學(xué)習(xí)和使用上面的功能,尤其是與數(shù)據(jù)挖掘強相關(guān)的MLLib。

    ? ?

    參考資料

  • An Architecture for Fast and General Data Processing on Large Clusters, by Matei Zaharia
  • Spark官方網(wǎng)站
  • Spark閉包函數(shù)外部變量訪問問題
  • Learning Spark Lightning-Fast Big Data.Analysis
  • 聲明:如有轉(zhuǎn)載本博文章,請注明出處。您的支持是我的動力!文章部分內(nèi)容來自互聯(lián)網(wǎng),本人不負(fù)任何法律責(zé)任。 分類:?數(shù)據(jù)分析&挖掘,大數(shù)據(jù),機器學(xué)習(xí)
    本文轉(zhuǎn)自bourneli博客園博客,原文鏈接:http://www.cnblogs.com/bourneli/p/4394271.html,如需轉(zhuǎn)載請自行聯(lián)系原作者

    總結(jié)

    以上是生活随笔為你收集整理的Spark使用总结与分享的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。