spark总结——转载
?
轉載自: ? ?spark總結??
?
第一個Spark程序
| /** * 功能:用spark實現的單詞計數程序 * 環境:spark 1.6.1, scala 2.10.4 */ // 導入相關類庫 import org.apache.spark._ object WordCount { def main(args: Array[String]) { // 建立spark運行上下文 val sc = new SparkContext("local[3]", "WordCount", new SparkConf()) // 加載數據,創建RDD val inRDD = sc.textFile("words.txt", 3) // 對RDD進行轉換,得到最終結果 val res = inRDD.flatMap(_.split(' ')).map((_, 1)).reduceByKey(_ + _) // 將計算結果collect到driver節點,并打印 res.collect.foreach(println) // 停止spark運行上下文 sc.stop() } } |
關于RDD
彈性分布式數據集(RDD)是分布式處理的一個數據集的抽象,RDD是只讀的,在RDD之上的操作都是并行的。實際上,RDD只是一個邏輯實體,其中存儲了分布式數據集的一些信息,并沒有包含所謂的“物理數據”,“物理數據”只有在RDD被計算并持久化之后才存在于內存或磁盤中。RDD的重要內部屬性有:
- 計算RDD分區的函數。
- 所依賴的直接父RDD列表。
- RDD分區及其地址列表。
- RDD分區器。
- RDD分區優先位置。
RDD操作起來與Scala集合類型沒有太大差別,這就是Spark追求的目標:像編寫單機程序一樣編寫分布式程序,但它們的數據和運行模型有很大的不同,用戶需要具備更強的系統把控能力和分布式系統知識。
Transformation與Action
RDD提供了兩種類型的操作:transformation操作(轉化操作)和action操作(行動操作)。transformation操作是得到一個新的RDD,方式很多,比如從數據源生成一個新的RDD,從RDD生成一個新的RDD。action操作則是得到其他數據類型的結果。
所有的transformation都是采用的懶策略,就是如果只是將transformation提交是不會執行計算的,spark在內部只是用新的RDD記錄這些transformation操作并形成RDD對象的有向無環圖(DAG),計算只有在action被提交的時候才被觸發。實際上,我們不應該把RDD看作存放著特定數據的數據集,而最好把每個RDD當作我們通過transformation操作構建出來的、記錄如何計算數據的指令列表。
RDD的action算子會觸發一個新的job,spark會在DAG中尋找是否有cached或者persisted的中間結果,如果沒有找到,那么就會重新執行這些中間過程以重新計算該RDD。因此,如果想在多個action操作中重用同一個RDD,那么最好使用?cache()/persist()將RDD緩存在內存中,但如果RDD過大,那么最好使用?persist(StorageLevel.MEMORY_AND_DISK)?代替。注意cache/persist僅僅是設置RDD的存儲等級,因此你應該在第一次調用action之前調用cache/persist。cache/persist使得中間計算結果存在內存中,這個才是說為啥Spark是內存計算引擎的地方。在MR里,你是要放到HDFS里的,但Spark允許你把中間結果放內存里。
在spark程序中打印日志時,尤其需要注意打印日志的代碼很有可能使用到了action算子,如果沒有緩存中間RDD就可能導致程序的效率大大降低。另外,如果一個RDD的計算過程中有抽樣、隨機值或者其他形式的變化,那么一定要緩存中間結果,否則程序執行結果可能都是不準確的!
參考鏈接及進一步閱讀:
- Spark會把數據都載入到內存么?
- Using Spark’s cache for correctness, not just performance
RDD持久化(緩存)
正如在轉化和行動操作部分所說的一樣,為了避免在一個RDD上多次調用action操作從而可能導致的重新計算,我們應該將該RDD在第一次調用action之前進行持久化。對RDD進行持久化對于迭代式和交互式應用非常有好處,好處大大滴有。
持久化可以使用cache()或者persist()。默認情況下的緩存級別為MEMORY_ONLY,spark會將對象直接緩存在JVM的堆空間中,而不經過序列化處理。我們可以給persist()傳遞持久化級別參數以指定的方式持久化RDD。MEMORY_AND_DISK持久化級別盡量將RDD緩存在內存中,如果內存緩存不下了,就將剩余分區緩存在磁盤中。MEMORY_ONLY_SER將RDD進行序列化處理(每個分區序列化為一個字節數組)然后緩存在內存中。還有MEMORY_AND_DISK_SER等等很多選項。選擇持久化級別的原則是:盡量選擇緩存在內存中,如果內存不夠,則首選序列化內存方式,除非RDD分區重算開銷比緩存到磁盤來的更大(很多時候,重算RDD分區會比從磁盤中讀取要快)或者序列化之后內存還是不夠用,否則不推薦緩存到磁盤上。
如果要緩存的數據太多,內存中放不下,spark會自動利用最近最少使用(LRU)策略把最老的分區從內存中移除。對于僅放在內存中的緩存級別,下次要用到已被移除的分區時,這些分區就需要重新計算。對于使用內存與磁盤的緩存級別,被移除的分區都會被寫入磁盤。
另外,RDD還有一個unpersist()方法,用于手動把持久化的RDD從緩存中移除。
環境變量SPARK_LOCAL_DIRS用來設置RDD持久化到磁盤的目錄,它同時也是shuffle的緩存目錄。
各種RDD與RDD操作
基本RDD
抽象類RDD包含了各種數據類型的RDD都適用的通用操作。下面對基本類型RDD的操作進行分門別類地介紹。
針對各個元素的轉化操作:
- map: 對各個元素進行映射操作。
- flatMap: 對各個元素進行映射操作,并將最后結果展平。
- filter: 過濾不滿足條件的元素。filter操作可能會引起數據傾斜,甚至可能導致空分區,新形成的RDD將會包含這些可能生成的空分區。所有這些都可能會導致問題,要想解決它們,最好在filter之后重新分區。
偽集合操作:
盡管RDD不是嚴格意義上的集合,但它支持許多數學上的集合操作。注意:這些操作都要求操作的RDD是相同的數據類型的。
- distinct: 對RDD中的元素進行去重處理。需要注意的是,distinct操作開銷很大,因為它需要shuffle所有數據,以確保每一個元素都只有一份。
- union: 返回一個包含兩個或多個RDD中所有元素的RDD。spark的union并不會去重,這點與數學上的不同。
- intersection: 返回兩個RDD中都有的元素。intersection會在運行時除去所有重復的元素,因此它也需要shuffle,性能要差一些。
- subtract: 返回一個由只存在于第一個RDD中而不存在于第二個RDD中的所有元素組成的RDD。它也需要shuffle。
- cartesian: 計算兩個RDD的笛卡爾積。需要注意的是,求大規模RDD的笛卡爾積開銷巨大。
- sample: 對RDD進行采樣,返回一個采樣RDD。
基于分區的轉化操作:
- glom: 將每個分區中的所有元素都形成一個數組。如果在處理當前元素時需要使用前后的元素,該操作將會非常有用,不過有時我們可能還需要將分區邊界的數據收集起來并廣播到各節點以備使用。
- mapPartitions: 基于分區的map,spark會為操作分區的函數該分區的元素的迭代器。
- mapPartitionsWithIndex: 與mapPartitions不同之處在于帶有分區的序號。
管道(pipe)操作:
spark在RDD上提供了pipe()方法。通過pipe(),你可以使用任意語言將RDD中的各元素從標準輸入流中以字符串形式讀出,并將這些元素執行任何你需要的操作,然后把結果以字符串形式寫入標準輸出,這個過程就是RDD的轉化操作過程。
使用pipe()的方法很簡單,假如我們有一個用其他語言寫成的從標準輸入接收數據并將處理結果寫入標準輸出的可執行腳本,我們只需要將該腳本分發到各個節點相同路徑下,并將其路徑作為pipe()的參數傳入即可。
行動操作:
- foreach: 對每個元素進行操作,并不會返回結果。
- foreachPartition: 基于分區的foreach操作,操作分區元素的迭代器,并不會返回結果。
- reduce: 對RDD中所有元素進行規約,最終得到一個規約結果。reduce接收的規約函數要求其返回值類型與RDD中元素類型相同。
- fold: 與reduce類似,不同的是,它接受一個“初始值”來作為每個分區第一次調用時的結果。fold同樣要求規約函數返回值類型與RDD元素類型相同。
- aggregate: 與reduce和fold類似,但它把我們從返回值類型必須與所操作的RDD元素類型相同的限制中解放出來。
- count: 返回RDD元素個數。
- collect: 收集RDD的元素到driver節點,如果數據有序,那么collect得到的數據也會是有序的。大數據量最好不要使用RDD的collect,因為它會在本機上生成一個新的Array,以存儲來自各個節點的所有數據,此時更好的辦法是將數據存儲在HDFS等分布式持久化層上。
- take: 返回指定數量的元素到driver節點。它會嘗試只訪問盡量少的分區,因此該操作會得到一個不均衡的集合。需要注意的是,該操作返回元素的順序與你預期的可能不一樣。
- top: 如果為元素定義了順序,就可以使用top返回前幾個元素。
- takeSample: 返回采樣數據。
鍵值對RDD
PairRDDFunctions封裝了用于操作鍵值對RDD的一些功能函數。一些文件讀取操作(sc.sequenceFile()等)會直接返回RDD[(K, V)]類型。在RDD上使用map操作也可以將一個RDD轉換為RDD[(K, V)]類型。在用Scala書寫的Spark程序中,RDD[(K, V)]類型到PairRDDFunctions類型的轉換一般由隱式轉換函數完成。
基本類型RDD的操作同樣適用于鍵值對RDD。下面對鍵值對類型RDD特有的操作進行分門別類地介紹。
針對各個元素的轉化操作:
- mapValues: 對各個鍵值對的值進行映射。該操作會保留RDD的分區信息。
- flatMapValues: 對各個鍵值對的值進行映射,并將最后結果展平。該操作會保留RDD的分區信息。
聚合操作:
- reduceByKey: 與reduce相當類似,它們都接收一個函數,并使用該函數對值進行合并。不同的是,reduceByKey是transformation操作,reduceByKey只是對鍵相同的值進行規約,并最終形成RDD[(K, V)],而不像reduce那樣返回單獨一個“值”。
- foldByKey: 與fold類似,就像reduceByKey之于reduce那樣。熟悉MapReduce中的合并器(combiner)概念的你可能已經注意到,reduceByKey和foldByKey會在為每個鍵計算全局的總結果之前先自動在每臺機器上進行本地合并。用戶不需要指定合并器。更泛化的combineByKey可以讓你自定義合并的行為。
- combineByKey: 是最常用的基于鍵進行聚合的函數,大多數基于鍵聚合的函數都是用它實現的。與aggregate一樣,combineByKey可以讓用戶返回與輸入數據的類型不同的返回值。combineByKey的內部實現分為三步來完成:首先根據是否需要在map端進行combine操作決定是否對RDD先進行一次mapPartitions操作(利用createCombiner、mergeValue、mergeCombiners三個函數)來達到減少shuffle數據量的作用。第二步根據partitioner對MapPartitionsRDD進行shuffle操作。最后在reduce端對shuffle的結果再進行一次combine操作。
數據分組:
- groupBy: 根據自定義的東東進行分組。groupBy是基本RDD就有的操作。
- groupByKey: 根據鍵對數據進行分組。雖然groupByKey+reduce也可以實現reduceByKey一樣的效果,但是請你記住:groupByKey是低效的,而reduceByKey會在本地先進行聚合,然后再通過網絡傳輸求得最終結果。
在執行聚合或分組操作時,可以指定分區數以對并行度進行調優。
連接:
- cogroup: 可以對多個RDD進行連接、分組、甚至求鍵的交集。其他的連接操作都是基于cogroup實現的。
- join: 對數據進行內連接,也即當兩個鍵值對RDD中都存在對應鍵時才輸出。當一個輸入對應的某個鍵有多個值時,生成的鍵值對RDD會包含來自兩個輸入RDD的每一組相對應的記錄,也即笛卡爾積。
- leftOuterJoin: 即左外連接,源RDD的每一個鍵都有對應的記錄,第二個RDD的值可能缺失,因此用Option表示。
- rightOuterJoin: 即右外連接,與左外連接相反。
- fullOuterJoin: 即全外連接,它是是左右外連接的并集。
如果一個RDD需要在多次連接操作中使用,對該RDD分區并持久化分區后的RDD是有益的,它可以避免不必要的shuffle。
數據排序:
在基本類型RDD中,sortBy()可以用來排序,max()和min()則可以用來方便地獲取最大值和最小值。另外,在OrderedRDDFunctions中,存在一個sortByKey()可以方便地對鍵值對RDD進行排序,通過spark提供的隱式轉換函數可以將RDD自動地轉換為OrderedRDDFunctions,并隨意地使用它的排序功能。
行動操作:
鍵值對RDD提供了一些額外的行動操作供我們隨意使用。如下:
- countByKey: 對每個鍵對應的元素分別計數。
- collectAsMap: 將結果以Map的形式返回,以便查詢。
- lookup: 返回給定鍵對應的所有值。
數值RDD
DoubleRDDFunctions為包含數值數據的RDD提供了一些描述性的統計操作,RDD可以通過隱式轉換方便地使用這些方便的功能。
這些數值操作是通過流式算法實現的,允許以每次一個元素的方式構建出模型。這些統計數據都會在調用stats()時通過一次遍歷數據計算出來,并以StatCounter對象返回。如果你只想計算這些統計數據中的一個,也可以直接對RDD調用對應的方法。更多信息參見Spark API。
RDD依賴、窄寬依賴
RDD依賴與DAG
一系列轉化操作形成RDD的有向無環圖(DAG),行動操作觸發作業的提交與執行。每個RDD維護了其對直接父RDD(一個或多個)的依賴,其中包含了父RDD的引用和依賴類型信息,通過dependencies()我們可以獲取對應RDD的依賴,其返回一個依賴列表。
通過RDD的父RDD引用就可以從DAG上向前回溯找到其所有的祖先RDD。spark提供了toDebugString方法來查看RDD的譜系。對于如下一段簡單的代碼:
| val input = sc.parallelize(1 to 10) val repartitioned = input.repartition(2) val sum = repartitioned.sum |
我們就可以通過在RDD上調用toDebugString來查看其依賴以及轉化關系,結果如下:
| // input.toDebugString res0: String = (4) ParallelCollectionRDD[0] at parallelize at <console>:21 [] // repartitioned.toDebugString res1: String = (2) MapPartitionsRDD[4] at repartition at <console>:23 [] | CoalescedRDD[3] at repartition at <console>:23 [] | ShuffledRDD[2] at repartition at <console>:23 [] +-(4) MapPartitionsRDD[1] at repartition at <console>:23 [] | ParallelCollectionRDD[0] at parallelize at <console>:21 [] |
上述repartitioned的依賴鏈存在兩個縮進等級。同一縮進等級的轉化操作構成一個Stage(階段),它們不需要混洗(shuffle)數據,并可以流水線執行(pipelining)。
窄依賴和寬依賴
spark中RDD之間的依賴分為窄(Narrow)依賴和寬(Wide)依賴兩種。我們先放出一張示意圖:
窄依賴指父RDD的每一個分區最多被一個子RDD的分區所用,表現為一個父RDD的分區對應于一個子RDD的分區,或多個父RDD的分區對應于一個子RDD的分區。圖中,map/filter和union屬于第一類,對輸入進行協同劃分(co-partitioned)的join屬于第二類。
寬依賴指子RDD的分區依賴于父RDD的多個或所有分區,這是因為shuffle類操作,如圖中的groupByKey和未經協同劃分的join。
窄依賴對優化很有利。邏輯上,每個RDD的算子都是一個fork/join(此join非上文的join算子,而是指同步多個并行任務的barrier(路障)): 把計算fork到每個分區,算完后join,然后fork/join下一個RDD的算子。如果直接翻譯到物理實現,是很不經濟的:一是每一個RDD(即使 是中間結果)都需要物化到內存或存儲中,費時費空間;二是join作為全局的barrier,是很昂貴的,會被最慢的那個節點拖死。如果子RDD的分區到父RDD的分區是窄依賴,就可以實施經典的fusion優化,把兩個fork/join合為一個;如果連續的變換算子序列都是窄依賴,就可以把很多個fork/join并為一個,不但減少了大量的全局barrier,而且無需物化很多中間結果RDD,這將極大地提升性能。Spark把這個叫做流水線(pipeline)優化。關于流水線優化,從MapPartitionsRDD中compute()的實現就可以看出端倪,該compute方法只是對迭代器進行復合,復合就是嵌套,因此數據處理過程就是對每條記錄進行同樣的嵌套處理直接得出所需結果,而沒有中間計算結果,同時也要注意:依賴過長將導致嵌套過深,從而可能導致棧溢出。
轉換算子序列一碰上shuffle類操作,寬依賴就發生了,流水線優化終止。在具體實現 中,DAGScheduler從當前算子往前回溯依賴圖,一碰到寬依賴,就生成一個stage來容納已遍歷的算子序列。在這個stage里,可以安全地實施流水線優化。然后,又從那個寬依賴開始繼續回溯,生成下一個stage。
另外,寬窄依賴的劃分對spark的容錯也具有重要作用,參見本文容錯機制部分。
DAG到任務的劃分
用戶代碼定義RDD的有向無環圖,行動操作把DAG轉譯為執行計劃,進一步生成任務在集群中調度執行。
具體地說,RDD的一系列轉化操作形成RDD的DAG,在RDD上調用行動操作將觸發一個Job(作業)的運行,Job根據DAG中RDD之間的依賴關系(寬依賴/窄依賴,也即是否發生shuffle)的不同將DAG劃分為多個Stage(階段),一個Stage對應DAG中的一個或多個RDD,一個Stage對應多個RDD是因為發生了流水線執行(pipelining),一旦Stage劃分出來,Task(任務)就會被創建出來并發給內部的調度器,進而分發到各個executor執行,一個Stage會啟動很多Task,每個Task都是在不同的數據分區上做同樣的事情(即執行同樣的代碼段),Stage是按照依賴順序處理的,而Task則是獨立地啟動來計算出RDD的一部分,一旦Job的最后一個Stage結束,一個行動操作也就執行完畢了。
Stage分為兩種:ShuffleMapStage和ResultStage。ShuffleMapStage是非最終stage,后面還有其他的stage,所以它的輸出一定是需要shuffle并作為后續stage的輸入。ShuffleMapStage的最后Task就是ShuffleMapTask。ResultStage是一個Job的最后一個Stage,直接生成結果或存儲。ResultStage的最后Task就是ResultTask。一個Job含有一個或多個Stage,最后一個為ResultTask,其他都為ShuffleMapStage。
RDD不能嵌套
RDD嵌套是不被支持的,也即不能在一個RDD操作的內部再使用RDD。如果在一個RDD的操作中,需要訪問另一個RDD的內容,你可以嘗試join操作,或者將數據量較小的那個RDD廣播(broadcast)出去。
你同時也應該注意到:join操作可能是低效的,將其中一個較小的RDD廣播出去然后再join可以避免不必要的shuffle,俗稱“小表廣播”。
使用其他分區數據
由于RDD不能嵌套,這使得“在計算一個分區時,訪問另一個分區的數據”成為一件困難的事情。那么有什么好的解決辦法嗎?請繼續看。
spark依賴于RDD這種抽象模型進行粗粒度的并行計算,一般情況下每個節點的每次計算都是針對單一記錄,當然也可以使用 RDD.mapPartition 來對分區進行處理,但都限制在一個分區內(當然更是一個節點內)。
spark的worker節點相互之間不能直接進行通信,如果在一個節點的計算中需要使用到另一個分區的數據,那么還是有一定的困難的。
你可以將整個RDD的數據全部廣播(如果數據集很大,這可不是好辦法),或者廣播一些其他輔助信息;也可以從所有節點均可以訪問到的文件(hdfs文件)或者數據庫(關系型數據庫或者hbase)中讀取;更進一步或許你應該修改你的并行方案,使之滿足“可針對拆分得到的小數據塊進行并行的獨立的計算,然后歸并得到大數據塊的計算結果”的MapReduce準則,在“劃分大的數據,并行獨立計算,歸并得到結果”的過程中可能存在數據冗余之類的,但它可以解決一次性沒法計算的大數據,并最終提高計算效率,hadoop和spark都依賴于MapReduce準則。
對RDD進行分區
何時進行分區?
spark程序可以通過控制RDD分區方式來減少通信開銷。分區并不是對所有應用都是有好處的,如果給定RDD只需要被掃描一次,我們完全沒有必要對其預先進行分區處理。只有當數據集多次在諸如連接這種基于鍵的操作中使用時,分區才會有幫助,同時記得將分區得到的新RDD持久化哦。
更多的分區意味著更多的并行任務(Task)數。對于shuffle過程,如果分區中數據量過大可能會引起OOM,這時可以將RDD劃分為更多的分區,這同時也將導致更多的并行任務。spark通過線程池的方式復用executor JVM進程,每個Task作為一個線程存在于線程池中,這樣就減少了線程的啟動開銷,可以高效地支持單個executor內的多任務執行,這樣你就可以放心地將任務數量設置成比該應用分配到的CPU cores還要多的數量了。
如何分區與分區信息
在創建RDD的時候,可以指定分區的個數,如果沒有指定,則分區個數是系統默認值,即該程序所分配到的CPU核心數。在Java/Scala中,你可以使用rdd.getNumPartitions(1.6.0+)或rdd.partitions.size()來獲取分區個數。
對基本類型RDD進行重新分區,可以通過repartition()函數,只需要指定重分區的分區數即可。repartition操作會引起shuffle,因此spark提供了一個優化版的repartition,叫做coalesce(),它允許你指定是否需要shuffle。在使用coalesce時,需要注意以下幾個問題:
- coalesce默認shuffle為false,這將形成窄依賴,例如我們將1000個分區重新分到100個中時,并不會引起shuffle,而是原來的10個分區合并形成1個分區。
- 但是對于從很多個(比如1000個)分區重新分到很少的(比如1個)分區這種極端情況,數據將會分布到很少節點(對于從1000到1的重新分區,則是1個節點)上運行,完全無法開掘集群的并行能力,為了規避這個問題,可以設置shuffle為true。由于shuffle可以分隔stage,這就保證了上一階段stage中的任務仍是很多個分區在并行計算,不這樣設置的話,則兩個上下游的任務將合并成一個stage進行計算,這個stage便會在很少的分區中進行計算。
- 如果當前每個分區的數據量過大,需要將分區數量增加,以利于充分利用并行,這時我們可以設置shuffle為true。對于數據分布不均而需要重分區的情況也是如此。spark默認使用hash分區器將數據重新分區。
對RDD進行預置的hash分區,需將RDD轉換為RDD[(key,value)]類型,然后就可以通過隱式轉換為PairRDDFunctions,進而可以通過如下形式將RDD哈希分區,HashPartitioner會根據RDD中每個(key,value)中的key得出該記錄對應的新的分區號:
| PairRDDFunctions.partitionBy(new HashPartitioner(n)) |
另外,spark還提供了一個范圍分區器,叫做RangePartitioner。范圍分區器爭取將所有的分區盡可能分配得到相同多的數據,并且所有分區內數據的上界是有序的。
一個RDD可能存在分區器也可能沒有,我們可以通過RDD的partitioner屬性來獲取其分區器,它返回一個Option對象。
如何進行自定義分區
spark允許你通過提供一個自定義的Partitioner對象來控制RDD的分區方式,這可以讓你利用領域知識進一步減少通信開銷。
要實現自定義的分區器,你需要繼承Partitioner類,并實現下面三個方法即可:
- numPartitions: 返回創建出來的分區數。
- getPartition: 返回給定鍵的分區編號(0到numPartitions-1)。
- equals: Java判斷相等性的標準方法。這個方法的實現非常重要,spark需要用這個方法來檢查你的分區器對象是否和其他分區器實例相同,這樣spark才可以判斷兩個RDD的分區方式是否相同。
影響分區方式的操作
spark內部知道各操作會如何影響分區方式,并將會對數據進行分區的操作的結果RDD自動設置為對應的分區器。
不過轉化操作的結果并不一定會按照已知的分區方式分區,這時輸出的RDD可能就會丟失分區信息。例如,由于map()或flatMap()函數理論上可以改變元素的鍵,因此當你對一個哈希分區的鍵值對RDD調用map/flatMap時,結果RDD就不會再有分區方式信息。不過,spark提供了另外兩個操作mapValues()和flatMapValues()作為替代方法,它們可以保證每個二元組的鍵保持不變。
這里列出了所有會為生成的結果RDD設好分區方式的操作:cogroup()、?join()、?leftOuterJoin()、?rightOuterJoin()、?fullOuterJoin()、groupWith()、?groupByKey()、?reduceByKey()、combineByKey()、?partitionBy()、?sortBy()、?sortByKey()、?mapValues()(如果父RDD有分區方式的話)、?flatMapValues()(如果父RDD有分區方式的話)、?filter()(如果父RDD有分區方式的話) 等。其他所有操作生成的結果都不會存在特定的分區方式。
最后,對于二元操作,輸出數據的分區方式取決于父RDD的分區方式。默認情況下,結果會采用哈希分區,分區的數量和操作的并行度一樣。不過,如果其中一個父RDD已經設置過分區方式,那么結果就會采用那種分區方式;如果兩個父RDD都設置過分區方式,結果RDD會采用第一個父RDD的分區方式。
從分區中獲益的操作
spark的許多操作都引入了將數據根據鍵跨節點進行shuffle的過程。所有這些操作都會從數據分區中獲益。這些操作主要有:cogroup()、?join()、?leftOuterJoin()、?rightOuterJoin()、fullOuterJoin()、?groupWith()、?groupByKey()、?reduceByKey()、?combineByKey()、?lookup()等。
RDD分區優先位置
RDD分區優先位置與spark的調度有關,在spark進行任務調度的時候,會盡可能將任務分配到數據塊所存儲的節點。我們可以通過RDD的preferredLocations()來獲取指定分區的優先位置,返回值是該分區的優先位置列表。
數據加載與保存
從程序中的集合生成
sc.parallelize()可用于從程序中的集合產生RDD。sc.makeRDD()也是在程序中生成RDD,不過其還允許指定每一個RDD分區的優先位置。
以上這些方式一般用于原型開發和測試,因為它們需要把你的整個數據集先放在一臺機器(driver節點)的內存中,從而限制了只能用較小的數據量。
從文本文件加載數據
sc.textFile()默認從hdfs中讀取文件,在路徑前面加上hdfs://可顯式表示從hdfs中讀取文件,在路徑前面加上file://表示從本地文件系統讀。給sc.textFile()傳遞的文件路徑可以是如下幾種情形:
- 一個文件路徑,這時候只裝載指定的文件。
- 一個目錄路徑,這時候只裝載指定目錄下面的所有文件(不包括子目錄下面的文件)。
- 通過通配符的形式加載多個文件或者加載多個目錄下面的所有文件。
如果想一次性讀取一個目錄下面的多個文件并想知道數據來自哪個文件,可以使用sc.wholeTextFiles。它會返回一個鍵值對RDD,其中鍵是輸入文件的文件名。由于該函數會將一個文件作為RDD的一個元素進行讀取,因此所讀取的文件不能太大,以便其可以在一個機器上裝得下。
同其他transform算子一樣,文本讀取操作也是惰性的并由action算子觸發,如果發生重新計算,那么讀取數據的操作也可能會被再次執行。另外,在spark中超出內存大小的文件同樣是可以被處理的,因為spark并不是將數據一次性全部裝入內存,而是邊裝入邊計算。
從數據庫加載數據
spark中可以使用JdbcRDD從數據庫中加載數據。spark會將數據從數據庫中拷貝到集群各個節點,因此使用JdbcRDD會有初始的拷貝數據的開銷。也可以考慮使用sqoop將數據從數據庫中遷移到hdfs中,然后從hdfs中讀取數據。
將結果寫入文本文件
rdd.saveAsTextFile()用于將RDD寫入文本文件。spark會將傳入該函數的路徑參數作為目錄對待,默認情況下會在對應目錄輸出多個文件,這取決于并行度。如果要將結果寫入hdfs的一個文件中,可以這樣:
| rdd.coalesce(1).saveAsTextFile("filename") |
而不要使用repartition,因為repartition會引起shuffle,而coalesce在默認情況下會避免shuffle。
關于文件系統
spark支持讀寫很多文件系統,包括本地文件系統、HDFS、Amazon S3等等很多。
spark在本地文件系統中讀取文件時,它要求文件在集群中所有節點的相同路徑下都可以找到。我們可以通過sc.addFile()來將文件弄到所有節點同路徑下面,并在各計算節點中通過SparkFiles.get()來獲取對應文件在該節點上的絕對路徑。
sc.addFile()的輸入文件路徑不僅可以是本地文件系統的,還可以是HDFS等spark所支持的所有文件系統,甚至還可以是來自網絡的,如HTTP、HTTPS、FTP。
關于并行
慎用可變數據
當可變數據用于并發/并行/分布式程序時,都有可能出現問題,因此對于會并發執行的代碼段不要使用可變數據。
尤其要注意不要在scala的object中使用var變量!其實scala的object單例對象只是對java中靜態的一種封裝而已,在class文件層面,object單例對象就是用java中靜態(static)來實現的,而java靜態成員變量不會被序列化!在編寫并行計算程序時,不要在scala的object中使用var變量,如果確實需要使用var變量,請寫在class中。
另外,在分布式執行的spark代碼段中使用可變的閉包變量也可能會出現不同步問題,因此請謹慎使用。
閉包 vs 廣播變量
有兩種方式將你的數據從driver節點發送到worker節點:通過閉包和通過廣播變量。閉包是隨著task的組裝和分發自動進行的,而廣播變量則是需要程序猿手動操作的,具體地可以通過如下方式操作廣播變量(假設sc為SparkContext類型的對象,bc為Broadcast類型的對象):
- 可通過sc.broadcast(xxx)創建廣播變量。
- 可在各計算節點中(閉包代碼中)通過bc.value來引用廣播的數據。
- bc.unpersist()可將各executor中緩存的廣播變量刪除,后續再使用時數據將被重新發送。
- bc.destroy()可將廣播變量的數據和元數據一同銷毀,銷毀之后就不能再使用了。
任務閉包包含了任務所需要的代碼和數據,如果一個executor數量小于RDD partition的數量,那么每個executor就會得到多個同樣的任務閉包,這通常是低效的。而廣播變量則只會將數據發送到每個executor一次,并且可以在多個計算操作中共享該廣播變量,而且廣播變量使用了類似于p2p形式的非常高效的廣播算法,大大提高了效率。另外,廣播變量由spark存儲管理模塊進行管理,并以MEMORY_AND_DISK級別進行持久化存儲。
什么時候用閉包自動分發數據?情況有幾種:
- 數據比較小的時候。
- 數據已在driver程序中可用。典型用例是常量或者配置參數。
什么時候用廣播變量分發數據?情況有幾種:
- 數據比較大的時候(實際上,spark支持非常大的廣播變量,甚至廣播變量中的元素數超過java/scala中Array的最大長度限制(2G,約21.5億)都是可以的)。
- 數據是某種分布式計算結果。典型用例是訓練模型等中間計算結果。
當數據或者變量很小的時候,我們可以在Spark程序中直接使用它們,而無需使用廣播變量。
對于大的廣播變量,序列化優化可以大大提高網絡傳輸效率,參見本文序列化優化部分。
巧用累加器
累加器提供了將工作節點中的值聚合到驅動器程序中的簡單語法。累加器的一個常見用途是在調試時對作業執行過程中的事件進行計數。可以通過sc.accumulator(xxx)來創建一個累加器,并在各計算節點中(閉包代碼中)直接寫該累加器。
累加器只能在驅動程序中被讀取,對于計算節點(閉包代碼)是只寫的,這大大精簡了累加器的設計。
使用累加器時,我們要注意的是:對于在RDD轉化操作中使用的累加器,如果發生了重新計算(這可能在很多種情況下發生),那么累加器就會被重復更新,這會導致問題。而在行動操作(如foreach)中使用累加器卻不會出現這種情況。因此,在轉化操作中,累加器通常只用于調試目的。盡管將來版本的spark可能會改善這一問題,但在spark 1.2.0中確實存在這個問題。
關于shuffle
在經典的MapReduce中,shuffle(混洗)是連接map階段和reduce階段的橋梁(注意這里的術語跟spark的map和reduce操作沒有直接關系),它是將各個map的輸出結果重新組合作為下階段各個reduce的輸入這樣的一個過程,由于這一過程涉及各個節點相互之間的數據傳輸,故此而名“混洗”。下面這幅圖清晰地描述了MapReduce算法的整個流程,其中shuffle階段是介于map階段和reduce階段之間。
Spark的shuffle過程類似于經典的MapReduce,但是有所改進。spark中的shuffle在實現上主要分為shuffle write和shuffle fetch這兩個大的階段。如下圖所示,shuffle過程大致可以描述為:
- 首先每一個Mapper會根據Reducer的數量創建出相應的bucket,bucket的數量是M×R,其中M是Map的個數,R是Reduce的個數。
- 其次Mapper產生的結果會根據設置的partition算法填充到每個bucket中去。這里的partition算法是可以自定義的,當然默認的算法是根據key哈希到不同的bucket中去。
- 當Reducer啟動時,它會根據自己task的id和所依賴的Mapper的id從遠端或是本地的block manager中取得相應的bucket作為Reducer的輸入進行處理。
spark的shuffle實現隨著spark版本的迭代正在逐步完善和成熟,這中間曾出現過多種優化實現,關于spark shuffle的演進過程和具體實現參見后面的參考鏈接。
shuffle(具體地是shuffle write階段)會引起數據緩存到本地磁盤文件,從spark 1.3開始,這些緩存的shuffle文件只有在相應RDD不再被使用時才會被清除,這樣在lineage重算的時候shuffle文件就不需要重新創建了,從而加快了重算效率(請注意這里的緩存并保留shuffle數據這一行為與RDD持久化和檢查點機制是不同的,緩存并保留shuffle數據只是省去了重算時重建shuffle文件的開銷,因此我們才有理由在shuffle(寬依賴)之后對形成的RDD進行持久化)。在standalone模式下,我們可以在spark-env.sh中通過環境變量SPARK_LOCAL_DIRS來設置shuffle數據的本地磁盤緩存目錄。為了優化效率,本地shuffle緩存目錄的設置都應該使用由單個逗號隔開的目錄列表,并且這些目錄分布在不同的磁盤上,寫操作會被均衡地分配到所有提供的目錄中,磁盤越多,可以提供的總吞吐量就越高。另外,SPARK_LOCAL_DIRS也是RDD持久化到磁盤的目錄。
參考鏈接及進一步閱讀:
- 詳細探究Spark的shuffle實現
- Spark Shuffle operations
序列化優化
在spark中,序列化通常出現在跨節點的數據傳輸(如廣播變量、shuffle等)和數據持久化過程中。序列化和反序列化的速度、序列化之后數據大小等都影響著集群的計算效率。
spark默認使用Java序列化庫,它對于除基本類型的數組以外的任何對象都比較低效。為了優化序列化效率,你可以在spark配置文件中通過spark.serializer屬性來設置你想使用的序列化庫,一般情況下,你可以使用這個序列化庫:org.apache.spark.serializer.KryoSerializer。
為了獲得最佳性能,你還應該向Kryo注冊你想要序列化的類,注冊類可以讓Kryo避免把每個對象的完整類名寫下來,成千上萬條記錄累計節省的空間相當可觀。如果你想強制要求這種注冊,可以把spark.kryo.registrationRequired設置為true,這樣Kryo會在遇到未注冊的類時拋出錯誤。使用Kryo序列化庫并注冊所需類的示例如下:
| val conf = new SparkConf() conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") conf.set("spark.kryo.registrationRequired", "true") conf.registerKryoClasses(Array(classOf[MyClass], classOf[MyOtherClass])) |
Spark調度
應用調度
應用是指用戶提交的spark應用程序。spark應用程序之間的調度關系,不一定由spark所管理。
在YARN和Mesos模式下,底層資源的調度策略由YARN和Mesos集群資源管理器所決定。
只有在standalone模式下,spark master按照當前集群資源是否滿足等待列表中的spark應用對資源的需求,而決定是否創建一個SparkContext對應的driver,進而完成spark應用的啟動過程,這個過程可以粗略地認為是一種粗顆粒度的有條件的FIFO(先進先出)調度策略。
作業調度
作業是指spark應用程序內部的由action算子觸發并提交的Job。在給定的spark應用中,不同線程的多個job可以并發執行,并且這個調度是線程安全的,這使得一個spark應用可以處理多個請求。
默認地,spark作業調度是FIFO的,在多線程的情況下,某些線程提交的job可能被大大推遲執行。
不過我們可以通過配置FAIR(公平)調度器來使spark在作業之間輪詢調度,這樣所有的作業都能得到一個大致公平的共享的集群資源。這就意味著即使有一個很長的作業在運行,較短的作業在提交之后也能夠得到不錯的響應。要啟用一個FAIR作業調度,需在創建SparkContext之前配置一下spark.scheduler.mode為FAIR:
| // 假設conf是你的SparkConf變量 conf.set("spark.scheduler.mode", "FAIR") |
公平調度還支持在池中將工作分組(這樣就形成兩級調度池),而不同的池可以設置不同的調度選項(如權重)。這種方式允許更重要的job配置在高優先級池中優先調度。如果沒有設置,新提交的job將進入默認池中,我們可以通過在對應線程中給SparkContext設置本地屬性spark.scheduler.pool來設置該線程對應的pool:
| // 假設sc是你的SparkContext變量 sc.setLocalProperty("spark.scheduler.pool", "pool1") |
在設置了本地屬性之后,所有在這個線程中提交的job都將會使用這個調度池的名字。如果你想清除該線程相關的pool,只需調用如下代碼:
| sc.setLocalProperty("spark.scheduler.pool", null) |
在默認情況下,每個調度池擁有相同的優先級來共享整個應用所分得的集群資源。同樣的,默認池中的每個job也擁有同樣的調度優先級,但是在用戶創建的每個池中,job是通過FIFO方式進行調度的。
關于公平調度池的詳細配置,請參見官方文檔:Spark Job Scheduling。
如果你想閱讀相關實現代碼,可以觀看Schedulable.scala、SchedulingAlgorithm.scala以及SchedulableBuilder.scala等相關文件。
參考鏈接及進一步閱讀:
- Spark Job Scheduling
- Spark 作業調度–job執行方式介紹
- spark internal - 作業調度
容錯機制與檢查點
spark容錯機制是粗粒度并且是輕量級的,主要依賴于RDD的依賴鏈(lineage)。spark能夠通過lineage獲取足夠的信息來重新計算和恢復丟失的數據分區。這樣的基于lineage的容錯機制可以理解為粗粒度的重做日志(redo log)。
鑒于spark的基于lineage的容錯機制,RDD DAG中寬窄依賴的劃分對容錯也有很重要的作用。如果一個節點宕機了,而且運算是窄依賴,那只要把丟失的父RDD分區重算即可,跟其他節點沒有依賴。而寬依賴需要父RDD的所有分區都存在,重算代價就很高了。可以這樣理解為什么窄依賴開銷小而寬依賴開銷大:在窄依賴中,在子RDD分區丟失、重算父RDD分區時,父RDD相應分區的所有數據都是子RDD分區的數據,并不存在冗余計算;而在寬依賴中,丟失一個子RDD分區將導致其每個父RDD的多個甚至所有分區的重算,而重算的結果并不都是給當前丟失的子RDD分區用的,這樣就存在了冗余計算。
不過我們可以通過檢查點(checkpoint)機制解決上述問題,通過在RDD上做檢查點可以將物理RDD數據存儲到持久層(HDFS、S3等)中。在RDD上做檢查點的方法是在調用action算子之前調用checkpoint(),并且RDD最好是緩存在內存中的,否則可能導致重算(參見API注釋)。示例如下:
| // 假設rdd是你的RDD變量 rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) rdd.checkpoint() val count = rdd.count() |
在RDD上做檢查點會切斷RDD依賴,具體地spark會清空該RDD的父RDD依賴列表。并且由于檢查點機制是將RDD存儲在外部存儲系統上,所以它可以被其他應用重用。
過長的lineage(如在pagerank、spark streaming等中)也將導致過大的重算代價,而且還會占用很多系統資源。因此,在遇到寬依賴或者lineage足夠長時,我們都應該考慮做檢查點。
集群監控與運行日志
spark在應用執行時記錄詳細的進度信息和性能指標。這些內容可以在兩個地方找到:spark的網頁用戶界面以及driver進程和executor進程生成的日志文件中。
網頁用戶界面
在瀏覽器中打開?http://master:8080?頁面,你可以看到集群概況,包括:集群節點、可用的和已用的資源、已運行的和正在運行的應用等。
http://master:4040?頁面用來監控正在運行的應用(默認端口為4040,如果有多個應用在運行,那么端口順延,如4041、4042),包括其執行進度、構成Job的Stage的執行情況、Stage詳情、已緩存RDD的信息、各executor的信息、spark配置項以及應用依賴信息等,該頁面經常用來發現應用的效率瓶頸并輔助優化,不過該頁面只有在有spark應用運行時才可以被訪問到。
上述404x端口可用于查看正在運行的應用的執行詳情,但是應用運行結束之后該頁面就不可以訪問了。要想查看已經執行結束的應用的執行詳情,則需開啟事件日志機制,具體地設置如下兩個選項:
-
spark.eventLog.enabled: 設置為true時開啟事件日志機制。這樣已完成的spark作業就可以通過歷史服務器查看。
-
spark.eventLog.dir: 開啟事件日志機制時的事件日志文件存儲位置。如果要在歷史服務器中查看事件日志,需要將該值設置為一個全局可見的文件系統路徑,比如HDFS中。最后,請確保目錄以 ‘/‘ 結束,否則可能會出現如下錯誤:
Application history not found ... No event logs found for application ...
Did you specify the correct logging directory?
在配置好上述選項之后,我們就可以查看新提交的應用的詳細執行信息了。在不同的部署模式中,查看的方式不同。在standalone模式中,可以直接在master節點的UI界面(上述8080端口對應的頁面)中直接單擊已完成應用以查看詳細執行信息。在YARN/Mesos模式中,就要開啟歷史服務器了,此處略去。
Metrics系統
spark在其內部擁有一個可配置的度量系統(Metrics),它能夠將spark的內部狀態通過HTTP、JMX、CSV等多種不同形式呈現給用戶。同時,用戶也可以定義自己的數據源(Metrics Source)和數據輸出方式(Metrics Sink),從而獲取自己所需的數據。此處略去詳情,可參考下面的鏈接進一步閱讀。
參考鏈接及進一步閱讀:
- Spark Monitoring and Instrumentation: Metrics
查看日志文件
spark日志文件的具體位置取決于具體的部署模式。在standalone模式中,日志默認存儲于各個工作節點的spark目錄下的work目錄中,此時所有日志還可以直接通過主節點的網頁用戶界面進行查看。
默認情況下,spark輸出的日志包含的信息量比較合適。我們可以自定義日志行為,改變日志等級或存儲位置。spark日志系統使用log4j實現,我們只需將conf目錄下的log4j.properties.template復制一個并命名為log4j.properties,然后自定義修改即可。
SparkConf與配置
spark中最主要的配置機制是通過SparkConf類對spark進行配置。當創建出一個SparkContext時,就需要創建出一個SparkConf的實例作為參數。
SparkConf實例包含用戶要重載的配置選項的鍵值對,spark中的每個配置選項都是基于字符串形式的鍵值對。你可以調用SparkConf的set()或者setXxx()來設置對應選項。
另外,spark-submit腳本可以動態設置配置項。當應用被spark-submit腳本啟動時,腳本會把這些配置項設置到運行環境中。當一個新的SparkConf被創建出來時,這些環境變量會被檢測出來并且自動配到SparkConf中。這樣在使用spark-submit時,用戶應用通常只需創建一個“空”的SparkConf,并直接傳遞給SparkContext的構造方法即可。
spark-submit為常用的spark配置選項提供了專用的標記,還有一個通用標記--conf來接收任意spark配置項的值,形如--conf 屬性名=屬性值。
spark-submit也支持從文件中讀取配置項的值。默認情況下,spark-submit會在spark安裝目錄中找到conf/spark-defaults.conf文件,讀取該文件中以空格隔開的鍵值對數據。你也可以通過spark-submit的--properties-File選項來自定義該文件的路徑。
spark-defaults.conf的作用范圍要搞清楚,編輯driver所在機器上的spark-defaults.conf,該文件會影響到driver所提交運行的application,及專門為該application提供計算資源的executor的啟動參數。
spark有特定的優先級順序來選擇實際配置。優先級最高的是在用戶代碼中顯式調用set()方法設置的選項。其次是通過spark-submit傳遞的參數。再次是寫在配置文件中的值。最后是系統默認值。如果你想知道應用中實際生效的配置,可以在應用的網頁用戶界面中查看。
下面列出一些常用的配置項,完整的配置項列表可以參見官方配置文檔。
| spark.master | (none) | 表示要連接的集群管理器。 |
| spark.app.name | (none) | 應用名,將出現在UI和日志中。 |
| spark.driver.memory | 1g | 為driver進程分配的內存。注意:在客戶端模式中,不能在SparkConf中直接配置該項,因為driver JVM進程已經啟動了。 |
| spark.executor.memory | 1g | 為每個executor進程分配的內存。 |
| spark.executor.cores | all/1 | 每個executor可用的核心數。針對standalone和YARN模式。更多參見官方文檔。 |
| spark.cores.max | (not set) | 設置standalone和Mesos模式下應用程序的核心數上限。 |
| spark.speculation | false | 設置為true時開啟任務預測執行機制。當出現比較慢的任務時,這種機制會在另外的節點上也嘗試執行該任務的一個副本。打開此選項會幫助減少大規模集群中個別較慢的任務帶來的影響。 |
| spark.driver.extraJavaOptions | (none) | 設置driver節點的JVM啟動參數。 |
| spark.executor.extraJavaOptions | (none) | 設置executor節點的JVM啟動參數。 |
| spark.serializer | JavaSerializer | 指定用來進行序列化的類庫,包括通過網絡傳輸數據或緩存數據時的序列化。為了速度,推薦使用KryoSerializer。 |
| spark.eventLog.enabled | false | 設置為true時開啟事件日志機制。這樣已完成的spark作業就可以通過歷史服務器查看。 |
| spark.eventLog.dir | file:///tmp/spark-events | 開啟事件日志機制時的事件日志文件存儲位置。如果要在歷史服務器中查看事件日志,需要將該值設置為一個全局可見的文件系統路徑,比如HDFS中。最后,請確保目錄以 ‘/‘ 結束,否則可能會出現錯誤,參見本文集群監控部分。 |
一些問題的解決辦法
/tmp目錄寫滿
由于Spark在計算的時候會將中間結果存儲到/tmp目錄,而目前linux又都支持tmpfs,其實說白了就是將/tmp目錄掛載到內存當中。那么這里就存在一個問題,中間結果過多導致/tmp目錄寫滿而出現如下錯誤:
| No Space Left on the device |
解決辦法就是針對tmp目錄不啟用tmpfs,修改/etc/fstab。
無法創建進程
有時可能會遇到如下錯誤,即無法創建進程:
| java.lang.OutOfMemory, unable to create new native thread |
導致這種錯誤的原因比較多。有一種情況并非真的是內存不足引起的,而是由于超出了允許的最大文件句柄數或最大進程數。
排查的步驟就是查看一下允許打開的文件句柄數和最大進程數,如果數值過低,使用ulimit將其調高之后,再試試問題是否已經解決。
不可序列化
| Task not serializable: java.io.NotSerializableException |
作為RDD操作算子參數的匿名函數使用外部變量從而形成閉包。為了效率,spark并不是將所有東東都序列化以分發到各個executor。spark會先對該匿名函數進行ClosureCleaner.clean()處理(將該匿名函數涉及到的$outer中的與閉包無關的變量移除),然后將該匿名函數對象及閉包涉及到的對象序列化并包裝成task分發到各個executor。
看到這里,你或許就發現了一個問題,那就是不管怎樣,spark需要序列化的對象必須都可以被序列化!Task not serializable: java.io.NotSerializableException錯誤就是由于相應的對象不能被序列化造成的!
為了解決這個問題,首先你可以使用?-Dsun.io.serialization.extendedDebugInfo=true?java選項來讓jvm打印出更多的關于序列化的信息,以便了解哪些對象不可以被序列化。然后就是使這些對象對應的類可序列化,或者將這些對象定義在RDD操作算子的參數(匿名函數)中以取消閉包。
缺少winutils.exe
在windows上進行spark程序測試時,你可能會碰到如下幾個問題:
| java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries. |
| java.lang.NullPointerException at java.lang.ProcessBuilder.start(ProcessBuilder.java:1010) |
原因就是缺少 hadoop 的?winutils.exe?這個文件。解決方法是:下載一個(注意是32位還是64位),新建一個文件夾 D:\hadoop\bin\ 并將 winutils.exe 放入其中,并保證winutils.exe雙擊運行沒有報*.dll缺失的錯誤,然后在程序中設置一下hadoop目錄即可,如下:
| System.setProperty("hadoop.home.dir", "D:\hadoop\") |
轉載于:https://www.cnblogs.com/xjh713/p/7301246.html
總結
以上是生活随笔為你收集整理的spark总结——转载的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: fiddler 抓取手机app请求包
- 下一篇: 第一次刷Leetcode,为什么耗费很多