日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪(fǎng)問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 编程资源 > 编程问答 >内容正文

编程问答

Spark弹性式数据集RDDs

發(fā)布時(shí)間:2025/1/21 编程问答 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark弹性式数据集RDDs 小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

title: Spark彈性式數(shù)據(jù)集RDDs
date: 2021-05-08 16:24:20
tags:

  • Spark

RDD 全稱(chēng)為 Resilient Distributed Datasets,是 Spark 最基本的數(shù)據(jù)抽象,它是只讀的、分區(qū)記錄的集合,支持并行操作,可以由外部數(shù)據(jù)集或其他 RDD 轉(zhuǎn)換而來(lái).

一、RDD簡(jiǎn)介

RDD 全稱(chēng)為 Resilient Distributed Datasets,是 Spark 最基本的數(shù)據(jù)抽象,它是只讀的、分區(qū)記錄的集合,支持并行操作,可以由外部數(shù)據(jù)集或其他 RDD 轉(zhuǎn)換而來(lái),它具有以下特性:

一個(gè) RDD 由一個(gè)或者多個(gè)分區(qū)(Partitions)組成。對(duì)于 RDD 來(lái)說(shuō),每個(gè)分區(qū)會(huì)被一個(gè)計(jì)算任務(wù)所處理,用戶(hù)可以在創(chuàng)建 RDD 時(shí)指定其分區(qū)個(gè)數(shù),如果沒(méi)有指定,則默認(rèn)采用程序所分配到的 CPU 的核心數(shù);

RDD 擁有一個(gè)用于計(jì)算分區(qū)的函數(shù) compute;

RDD 會(huì)保存彼此間的依賴(lài)關(guān)系,RDD 的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的依賴(lài)關(guān)系,這種 RDD 之間的依賴(lài)關(guān)系就像流水線(xiàn)一樣。在部分分區(qū)數(shù)據(jù)丟失后,可以通過(guò)這種依賴(lài)關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù),而不是對(duì) RDD 的所有分區(qū)進(jìn)行重新計(jì)算;

Key-Value 型的 RDD 還擁有 Partitioner(分區(qū)器),用于決定數(shù)據(jù)被存儲(chǔ)在哪個(gè)分區(qū)中,目前 Spark 中支持 HashPartitioner(按照哈希分區(qū)) 和 RangeParationer(按照范圍進(jìn)行分區(qū));

一個(gè)優(yōu)先位置列表 (可選),用于存儲(chǔ)每個(gè)分區(qū)的優(yōu)先位置 (prefered location)。對(duì)于一個(gè) HDFS 文件來(lái)說(shuō),這個(gè)列表保存的就是每個(gè)分區(qū)所在的塊的位置,按照“移動(dòng)數(shù)據(jù)不如移動(dòng)計(jì)算“的理念,Spark 在進(jìn)行任務(wù)調(diào)度的時(shí)候,會(huì)盡可能的將計(jì)算任務(wù)分配到其所要處理數(shù)據(jù)塊的存儲(chǔ)位置。

RDD[T] 抽象類(lèi)的部分相關(guān)代碼如下:

// 由子類(lèi)實(shí)現(xiàn)以計(jì)算給定分區(qū) def compute(split: Partition, context: TaskContext): Iterator[T]// 獲取所有分區(qū) protected def getPartitions: Array[Partition]// 獲取所有依賴(lài)關(guān)系 protected def getDependencies: Seq[Dependency[_]] = deps// 獲取優(yōu)先位置列表 protected def getPreferredLocations(split: Partition): Seq[String] = Nil// 分區(qū)器 由子類(lèi)重寫(xiě)以指定它們的分區(qū)方式 @transient val partitioner: Option[Partitioner] = None

二、創(chuàng)建RDD

RDD 有兩種創(chuàng)建方式,分別介紹如下:

2.1 由現(xiàn)有集合創(chuàng)建

這里使用 spark-shell 進(jìn)行測(cè)試,啟動(dòng)命令如下:

spark-shell --master local[4]

啟動(dòng) spark-shell 后,程序會(huì)自動(dòng)創(chuàng)建應(yīng)用上下文,相當(dāng)于執(zhí)行了下面的 Scala 語(yǔ)句:

val conf = new SparkConf().setAppName("Spark shell").setMaster("local[4]") val sc = new SparkContext(conf)

由現(xiàn)有集合創(chuàng)建 RDD,你可以在創(chuàng)建時(shí)指定其分區(qū)個(gè)數(shù),如果沒(méi)有指定,則采用程序所分配到的 CPU 的核心數(shù):

val data = Array(1, 2, 3, 4, 5) // 由現(xiàn)有集合創(chuàng)建 RDD,默認(rèn)分區(qū)數(shù)為程序所分配到的 CPU 的核心數(shù) val dataRDD = sc.parallelize(data) // 查看分區(qū)數(shù) dataRDD.getNumPartitions // 明確指定分區(qū)數(shù) val dataRDD = sc.parallelize(data,2)

執(zhí)行結(jié)果如下:

2.2 引用外部存儲(chǔ)系統(tǒng)中的數(shù)據(jù)集

引用外部存儲(chǔ)系統(tǒng)中的數(shù)據(jù)集,例如本地文件系統(tǒng),HDFS,HBase 或支持 Hadoop InputFormat 的任何數(shù)據(jù)源。

val fileRDD = sc.textFile("/usr/file/emp.txt") // 獲取第一行文本 fileRDD.take(1)

使用外部存儲(chǔ)系統(tǒng)時(shí)需要注意以下兩點(diǎn):

如果在集群環(huán)境下從本地文件系統(tǒng)讀取數(shù)據(jù),則要求該文件必須在集群中所有機(jī)器上都存在,且路徑相同;
支持目錄路徑,支持壓縮文件,支持使用通配符。

2.3 textFile & wholeTextFiles

兩者都可以用來(lái)讀取外部文件,但是返回格式是不同的:

textFile:其返回格式是 RDD[String] ,返回的是就是文件內(nèi)容,RDD 中每一個(gè)元素對(duì)應(yīng)一行數(shù)據(jù);
wholeTextFiles:其返回格式是 RDD[(String, String)],元組中第一個(gè)參數(shù)是文件路徑,第二個(gè)參數(shù)是文件內(nèi)容;
兩者都提供第二個(gè)參數(shù)來(lái)控制最小分區(qū)數(shù);
從 HDFS 上讀取文件時(shí),Spark 會(huì)為每個(gè)塊創(chuàng)建一個(gè)分區(qū)。

三、操作RDD
RDD 支持兩種類(lèi)型的操作:transformations(轉(zhuǎn)換,從現(xiàn)有數(shù)據(jù)集創(chuàng)建新數(shù)據(jù)集)和 actions(在數(shù)據(jù)集上運(yùn)行計(jì)算后將值返回到驅(qū)動(dòng)程序)。RDD 中的所有轉(zhuǎn)換操作都是惰性的,它們只是記住這些轉(zhuǎn)換操作,但不會(huì)立即執(zhí)行,只有遇到 action 操作后才會(huì)真正的進(jìn)行計(jì)算,這類(lèi)似于函數(shù)式編程中的惰性求值。

def textFile(path: String,minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {...} def wholeTextFiles(path: String,minPartitions: Int = defaultMinPartitions): RDD[(String, String)]={..}

三、操作RDD

RDD 支持兩種類(lèi)型的操作:transformations(轉(zhuǎn)換,從現(xiàn)有數(shù)據(jù)集創(chuàng)建新數(shù)據(jù)集)和 actions(在數(shù)據(jù)集上運(yùn)行計(jì)算后將值返回到驅(qū)動(dòng)程序)。RDD 中的所有轉(zhuǎn)換操作都是惰性的,它們只是記住這些轉(zhuǎn)換操作,但不會(huì)立即執(zhí)行,只有遇到 action 操作后才會(huì)真正的進(jìn)行計(jì)算,這類(lèi)似于函數(shù)式編程中的惰性求值。

val list = List(1, 2, 3) // map 是一個(gè) transformations 操作,而 foreach 是一個(gè) actions 操作 sc.parallelize(list).map(_ * 10).foreach(println) // 輸出: 10 20 30

四、緩存RDD

4.1 緩存級(jí)別

Spark 速度非常快的一個(gè)原因是 RDD 支持緩存。成功緩存后,如果之后的操作使用到了該數(shù)據(jù)集,則直接從緩存中獲取。雖然緩存也有丟失的風(fēng)險(xiǎn),但是由于 RDD 之間的依賴(lài)關(guān)系,如果某個(gè)分區(qū)的緩存數(shù)據(jù)丟失,只需要重新計(jì)算該分區(qū)即可。

Spark 支持多種緩存級(jí)別 :

啟動(dòng)堆外內(nèi)存需要配置兩個(gè)參數(shù):

spark.memory.offHeap.enabled :是否開(kāi)啟堆外內(nèi)存,默認(rèn)值為 false,需要設(shè)置為 true;
spark.memory.offHeap.size : 堆外內(nèi)存空間的大小,默認(rèn)值為 0,需要設(shè)置為正值。

4.2 使用緩存

緩存數(shù)據(jù)的方法有兩個(gè):persist 和 cache 。cache 內(nèi)部調(diào)用的也是 persist,它是 persist 的特殊化形式,等價(jià)于 persist(StorageLevel.MEMORY_ONLY)。示例如下:

// 所有存儲(chǔ)級(jí)別均定義在 StorageLevel 對(duì)象中 fileRDD.persist(StorageLevel.MEMORY_AND_DISK) fileRDD.cache()

4.3 移除緩存

Spark 會(huì)自動(dòng)監(jiān)視每個(gè)節(jié)點(diǎn)上的緩存使用情況,并按照最近最少使用(LRU)的規(guī)則刪除舊數(shù)據(jù)分區(qū)。當(dāng)然,你也可以使用 RDD.unpersist() 方法進(jìn)行手動(dòng)刪除。

五、理解shuffle

5.1 shuffle介紹

在 Spark 中,一個(gè)任務(wù)對(duì)應(yīng)一個(gè)分區(qū),通常不會(huì)跨分區(qū)操作數(shù)據(jù)。但如果遇到 reduceByKey 等操作,Spark 必須從所有分區(qū)讀取數(shù)據(jù),并查找所有鍵的所有值,然后匯總在一起以計(jì)算每個(gè)鍵的最終結(jié)果 ,這稱(chēng)為 Shuffle。

5.2 Shuffle的影響

Shuffle 是一項(xiàng)昂貴的操作,因?yàn)樗ǔ?huì)跨節(jié)點(diǎn)操作數(shù)據(jù),這會(huì)涉及磁盤(pán) I/O,網(wǎng)絡(luò) I/O,和數(shù)據(jù)序列化。某些 Shuffle 操作還會(huì)消耗大量的堆內(nèi)存,因?yàn)樗鼈兪褂枚褍?nèi)存來(lái)臨時(shí)存儲(chǔ)需要網(wǎng)絡(luò)傳輸?shù)臄?shù)據(jù)。Shuffle 還會(huì)在磁盤(pán)上生成大量中間文件,從 Spark 1.3 開(kāi)始,這些文件將被保留,直到相應(yīng)的 RDD 不再使用并進(jìn)行垃圾回收,這樣做是為了避免在計(jì)算時(shí)重復(fù)創(chuàng)建 Shuffle 文件。如果應(yīng)用程序長(zhǎng)期保留對(duì)這些 RDD 的引用,則垃圾回收可能在很長(zhǎng)一段時(shí)間后才會(huì)發(fā)生,這意味著長(zhǎng)時(shí)間運(yùn)行的 Spark 作業(yè)可能會(huì)占用大量磁盤(pán)空間,通常可以使用 spark.local.dir 參數(shù)來(lái)指定這些臨時(shí)文件的存儲(chǔ)目錄。

5.3 導(dǎo)致Shuffle的操作

由于 Shuffle 操作對(duì)性能的影響比較大,所以需要特別注意使用,以下操作都會(huì)導(dǎo)致 Shuffle:

涉及到重新分區(qū)操作: 如 repartition 和 coalesce;

所有涉及到 ByKey 的操作:如 groupByKey 和 reduceByKey,但 countByKey 除外;

聯(lián)結(jié)操作:如 cogroup 和 join。

五、寬依賴(lài)和窄依賴(lài)

RDD 和它的父 RDD(s) 之間的依賴(lài)關(guān)系分為兩種不同的類(lèi)型:

窄依賴(lài) (narrow dependency):父 RDDs 的一個(gè)分區(qū)最多被子 RDDs 一個(gè)分區(qū)所依賴(lài);

寬依賴(lài) (wide dependency):父 RDDs 的一個(gè)分區(qū)可以被子 RDDs 的多個(gè)子分區(qū)所依賴(lài)。

如下圖,每一個(gè)方框表示一個(gè) RDD,帶有顏色的矩形表示分區(qū):

區(qū)分這兩種依賴(lài)是非常有用的:

首先,窄依賴(lài)允許在一個(gè)集群節(jié)點(diǎn)上以流水線(xiàn)的方式(pipeline)對(duì)父分區(qū)數(shù)據(jù)進(jìn)行計(jì)算,例如先執(zhí)行 map 操作,然后執(zhí)行 filter 操作。而寬依賴(lài)則需要計(jì)算好所有父分區(qū)的數(shù)據(jù),然后再在節(jié)點(diǎn)之間進(jìn)行 Shuffle,這與 MapReduce 類(lèi)似。

窄依賴(lài)能夠更有效地進(jìn)行數(shù)據(jù)恢復(fù),因?yàn)橹恍柚匦聦?duì)丟失分區(qū)的父分區(qū)進(jìn)行計(jì)算,且不同節(jié)點(diǎn)之間可以并行計(jì)算;而對(duì)于寬依賴(lài)而言,如果數(shù)據(jù)丟失,則需要對(duì)所有父分區(qū)數(shù)據(jù)進(jìn)行計(jì)算并再次 Shuffle。

六、DAG的生成

RDD(s) 及其之間的依賴(lài)關(guān)系組成了 DAG(有向無(wú)環(huán)圖),DAG 定義了這些 RDD(s) 之間的 Lineage(血統(tǒng)) 關(guān)系,通過(guò)血統(tǒng)關(guān)系,如果一個(gè) RDD 的部分或者全部計(jì)算結(jié)果丟失了,也可以重新進(jìn)行計(jì)算。那么 Spark 是如何根據(jù) DAG 來(lái)生成計(jì)算任務(wù)呢?主要是根據(jù)依賴(lài)關(guān)系的不同將 DAG 劃分為不同的計(jì)算階段 (Stage):

對(duì)于窄依賴(lài),由于分區(qū)的依賴(lài)關(guān)系是確定的,其轉(zhuǎn)換操作可以在同一個(gè)線(xiàn)程執(zhí)行,所以可以劃分到同一個(gè)執(zhí)行階段;

對(duì)于寬依賴(lài),由于 Shuffle 的存在,只能在父 RDD(s) 被 Shuffle 處理完成后,才能開(kāi)始接下來(lái)的計(jì)算,因此遇到寬依賴(lài)就需要重新劃分階段。

參考鏈接:https://blog.csdn.net/m0_37809146/article/details/91278827

總結(jié)

以上是生活随笔為你收集整理的Spark弹性式数据集RDDs的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺(jué)得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。