RDD 与 DataFrame原理-区别-操作详解
1. RDD原理及操作
RDD (resilientdistributed dataset),指的是一個(gè)只讀的,可分區(qū)的分布式數(shù)據(jù)集,這個(gè)數(shù)據(jù)集的全部或部分可以緩存在內(nèi)存中,在多次計(jì)算間重用。RDD內(nèi)部可以有許多分區(qū)(partitions),每個(gè)分區(qū)又擁有大量的記錄(records)。RDD具有五大特征:
- dependencies:建立RDD的依賴關(guān)系,主要RDD之間是寬窄依賴的關(guān)系,具有窄依賴的可以在同一個(gè)stage中進(jìn)行計(jì)
- partition:每個(gè)RDD會(huì)有若干個(gè)分區(qū),分區(qū)的大小決定RDD計(jì)算粒度,每個(gè)RDD的分區(qū)的計(jì)算都在單獨(dú)的任務(wù)中進(jìn)行
- preferedlocations:按照“移動(dòng)數(shù)據(jù)不如移動(dòng)計(jì)算”原則,在spark進(jìn)行任務(wù)調(diào)度的時(shí)候,優(yōu)先將任務(wù)分配到數(shù)據(jù)塊存儲(chǔ)的位置
- compute:spark中的計(jì)算都是以分區(qū)為基本單位的,compute函數(shù)只是對(duì)迭代器進(jìn)行復(fù)合,并不保存單次計(jì)算的結(jié)果
- partitioner:只存在于(K,V)類型的RDD中,非(K,V)類型的partitioner的值就是None
RDD的算子action會(huì)觸發(fā)真正的作業(yè)提交,而transformation算子是不會(huì)立即觸發(fā)作業(yè)提交的。在Spark中,所有RDD的轉(zhuǎn)換都是是惰性求值的。RDD的轉(zhuǎn)換操作transformation會(huì)生成新的RDD,新的RDD的數(shù)據(jù)依賴于原來(lái)的RDD的數(shù)據(jù),每個(gè)RDD又包含多個(gè)分區(qū)。那么一段程序?qū)嶋H上就構(gòu)造了一個(gè)由相互依賴的多個(gè)RDD組成的有向無(wú)環(huán)圖(DAG)。并通過(guò)在RDD上執(zhí)行action動(dòng)作將這個(gè)有向無(wú)環(huán)圖作為一個(gè)Job提交給Spark執(zhí)行。在DAG中又進(jìn)行stage的劃分,劃分的依據(jù)是依賴算子是否是shuffle(如reduceByKey,Join等)的,每個(gè)stage又可以劃分成若干task。接下來(lái)的事情就是driver發(fā)送task到executor,executor自己的線程池去執(zhí)行這些task,完成之后將結(jié)果返回給driver。action算子是劃分不同job的依據(jù)。Spark對(duì)于有向無(wú)環(huán)圖Job進(jìn)行調(diào)度,確定階段(Stage),分區(qū)(Partition),流水線(Pipeline),任務(wù)(Task)和緩存(Cache),進(jìn)行優(yōu)化,并在Spark集群上運(yùn)行Job。RDD之間的依賴分為寬依賴(依賴多個(gè)分區(qū))和窄依賴(只依賴一個(gè)分區(qū)),在確定階段時(shí),需要根據(jù)寬依賴shuffle劃分階段。根據(jù)分區(qū)劃分任務(wù)。
Spark支持故障恢復(fù)的方式也不同,提供兩種方式:
- Linage:通過(guò)數(shù)據(jù)的血緣關(guān)系,再執(zhí)行一遍前面的處理
- Checkpoint:將數(shù)據(jù)集存儲(chǔ)到持久存儲(chǔ)中。每次迭代的數(shù)據(jù)可以保存在內(nèi)存中,而不是寫(xiě)入文件
2. 窄依賴與寬依賴
shuffle 是劃分 DAG 中 stage 的標(biāo)識(shí),同時(shí)影響 Spark 執(zhí)行速度的關(guān)鍵步驟。RDD 的 Transformation 函數(shù)中,又分為窄依賴(narrow dependency)和寬依賴(wide dependency)的操作。窄依賴跟寬依賴的區(qū)別是是否發(fā)生 shuffle(洗牌) 操作。寬依賴會(huì)發(fā)生 shuffle 操作。窄依賴是子 RDD的各個(gè)分片(partition)不依賴于其他分片,能夠獨(dú)立計(jì)算得到結(jié)果;寬依賴指子 RDD 的各個(gè)分片會(huì)依賴于父RDD 的多個(gè)分片,所以會(huì)造成父 RDD 的各個(gè)分片在集群中重新分片。如下圖所示:map就是一種窄依賴,而join則會(huì)導(dǎo)致寬依賴:
如上面的map,filter,union屬于第一類窄依賴,而join with inputs co-partitioned(對(duì)輸入進(jìn)行協(xié)同劃分的join操作,也就是說(shuō)先按照key分組然后shuffle write的時(shí)候一個(gè)父分區(qū)對(duì)應(yīng)一個(gè)子分區(qū))則為第二類窄依賴?groupByKey和對(duì)輸入未協(xié)同劃分的join操作就是寬依賴,這是shuffle類操作。
首先,窄依賴允許在單個(gè)集群節(jié)點(diǎn)上流水線式執(zhí)行,這個(gè)節(jié)點(diǎn)可以計(jì)算所有父級(jí)分區(qū)。例如,可以逐個(gè)元素地依次執(zhí)行filter操作和map操作。相反,寬依賴需要所有的父RDD數(shù)據(jù)可用并且數(shù)據(jù)已經(jīng)通過(guò)類MapReduce的操作shuffle完成。?其次,在窄依賴中,節(jié)點(diǎn)失敗后的恢復(fù)更加高效。因?yàn)橹挥衼G失的父級(jí)分區(qū)需要重新計(jì)算,并且這些丟失的父級(jí)分區(qū)可以并行地在不同節(jié)點(diǎn)上重新計(jì)算。與此相反,在寬依賴的繼承關(guān)系中,單個(gè)失敗的節(jié)點(diǎn)可能導(dǎo)致一個(gè)RDD的所有先祖RDD中的一些分區(qū)丟失,導(dǎo)致計(jì)算的重新執(zhí)行。
下面看一段代碼段:
// Map: "cat" -> c, cat val rdd1 = rdd.Map(x => (x.charAt(0), x)) // groupby same key and count val rdd2 = rdd1.groupBy(x => x._1).Map(x => (x._1, x._2.toList.length))第一個(gè) Map 操作將 RDD 里的各個(gè)元素進(jìn)行映射, RDD 的各個(gè)數(shù)據(jù)元素之間不存在依賴,可以在集群的各個(gè)內(nèi)存中獨(dú)立計(jì)算,也就是并行化;第二個(gè) groupby 之后的 Map 操作,為了計(jì)算相同 key 下的元素個(gè)數(shù),需要把相同 key 的元素聚集到同一個(gè) partition 下,所以造成了數(shù)據(jù)在內(nèi)存中的重新分布,即 shuffle 操作。shuffle 操作是 spark 中最耗時(shí)的操作,應(yīng)盡量避免不必要的 shuffle(join 需要針對(duì)同一個(gè) key 合并,所以需要 shuffle)?。根據(jù)是否發(fā)生 shuffle 操作能夠?qū)⑵浞殖扇缦碌?stage 類型:
運(yùn)行到每個(gè) stage 的邊界時(shí),數(shù)據(jù)在父 stage 中按照 Task 寫(xiě)到磁盤(pán)上,而在子 stage 中通過(guò)網(wǎng)絡(luò)從上一個(gè) Task 中去讀取數(shù)據(jù)。這些操作會(huì)導(dǎo)致很嚴(yán)重的網(wǎng)絡(luò)傳輸以及磁盤(pán)的I/O,所以?stage 的邊界是非常占資源的,在編寫(xiě) Spark 程序的時(shí)候需要盡量避免的?。父 stage 中 partition 個(gè)數(shù)與子 stage 的 partition 個(gè)數(shù)可能不同,所以那些產(chǎn)生 stage 邊界的 Transformation 常常需要接受一個(gè) numPartition 的參數(shù)來(lái)覺(jué)得子 stage 中的數(shù)據(jù)將被切分為多少個(gè) partition。?PS:shuffle 操作的時(shí)候可以用 combiner 壓縮數(shù)據(jù),減少 IO 的消耗。
3. 為什么我們還需要Data Frame
在Spark中,DataFrame是一種以RDD為基礎(chǔ)的分布式數(shù)據(jù)集,類似于傳統(tǒng)數(shù)據(jù)庫(kù)中的二維表格。DataFrame與RDD的主要區(qū)別在于,前者帶有schema元信息,即DataFrame所表示的二維表數(shù)據(jù)集的每一列都帶有名稱和類型。這使得Spark SQL得以洞察更多的結(jié)構(gòu)信息,從而對(duì)藏于DataFrame背后的數(shù)據(jù)源以及作用于DataFrame之上的變換進(jìn)行了針對(duì)性的優(yōu)化,最終達(dá)到大幅提升運(yùn)行時(shí)效率的目標(biāo)。反觀RDD,由于無(wú)從得知所存數(shù)據(jù)元素的具體內(nèi)部結(jié)構(gòu),Spark Core只能在stage層面進(jìn)行簡(jiǎn)單、通用的流水線優(yōu)化。
3.1?DataFrame創(chuàng)建
SparkSQL可以以其他RDD對(duì)象、parquet文件、json文件、hive表,以及通過(guò)JDBC連接到其他關(guān)系型數(shù)據(jù)庫(kù)作為數(shù)據(jù)源來(lái)生成DataFrame對(duì)象。
-
JDBC
- parquet
- JSON
- List
- RDD
?
總結(jié)
以上是生活随笔為你收集整理的RDD 与 DataFrame原理-区别-操作详解的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: IDEA+scala+spark程序开发
- 下一篇: 酷站大全