弹性式分布数据集RDD——Pyspark基础 (二)
連載中:http://ihoge.cn/tags/pyspark/
title: 彈性式分布數(shù)據(jù)集RDD——Pyspark基礎(chǔ) (二)
date: 2018-04-15 17:59:21
comments: true
categories:
- Spark
tags:
- pyspark
RDD的內(nèi)部運(yùn)行方式
RDD不僅是一組不可變的JVM(Java虛擬機(jī))對(duì)象的分布集,而且是Spark的核心,可以讓任務(wù)執(zhí)行高速運(yùn)算。
RDD將跟蹤(計(jì)入日記)應(yīng)用于每個(gè)快的所有轉(zhuǎn)換,以加速計(jì)算速度,并在發(fā)生錯(cuò)誤和部分?jǐn)?shù)據(jù)丟失時(shí)提供回退(容錯(cuò)機(jī)制)。
RDD采用并行的運(yùn)行方式,也就是每個(gè)轉(zhuǎn)換操作并行執(zhí)行,從而提高速度。
RDD有兩種并行操作:
- 轉(zhuǎn)換操作(返回指向新的RDD的指針)
- 動(dòng)作操作(在運(yùn)行計(jì)算后向驅(qū)動(dòng)程序返回值)
數(shù)據(jù)集的轉(zhuǎn)換通常是惰性的,這也意味著任何轉(zhuǎn)換操作僅在調(diào)用數(shù)據(jù)集上的操作時(shí)才執(zhí)行。該延遲執(zhí)行會(huì)產(chǎn)生風(fēng)多的精細(xì)查詢:針對(duì)性能進(jìn)行優(yōu)化查詢。這種優(yōu)化始于Spark的DAGScheduler——面向階段的調(diào)度器。DAGScheduler負(fù)責(zé)Stage級(jí)的調(diào)度詳見(jiàn):Spark運(yùn)行原理剖析
由于具有單獨(dú)的RDD轉(zhuǎn)換和動(dòng)作,DAGScheduler可以在查詢中執(zhí)行優(yōu)化。包括但不限于避免shuffle數(shù)據(jù)(最耗費(fèi)資源的任務(wù))
創(chuàng)建RDD
方式一: 用.parallelize(...)集合(元素list或array)
data = sc.parallelize([('a',1),('b',2),('c',3),('d',5),('e',5)])方式二: 讀入外部文件
- 支持多文件系統(tǒng)中讀取:如NTFS、FAT、HFS+(Mac OS Extended),或者如HDFS、S3、Cassandra這類的分布式文件系統(tǒng),還有其他類文件系統(tǒng)。
- 指出多種數(shù)據(jù)格式:如文本、parquet、JSON、Hive tables(Hive表)以及使用JDBC驅(qū)動(dòng)程序可讀取的關(guān)系數(shù)據(jù)庫(kù)中的數(shù)據(jù)。(注意:Spark可以自動(dòng)處理壓縮數(shù)據(jù)集)
��Tip1:讀取的方式不同,持有對(duì)象表達(dá)方式也不同。從文件中讀取的數(shù)據(jù)表示為MapPartitionsRDD;使用集合方法的數(shù)據(jù)表示為ParallelCollectionRDD
��**Tip2:**RDD是無(wú)schema的數(shù)據(jù)結(jié)構(gòu)(和DataFrame不同),所以我們幾乎可以混用任何數(shù)據(jù)結(jié)構(gòu):tuple、dict、list和spark等都能支持。如果對(duì)數(shù)據(jù)集使用.collect()方法,將把RDD對(duì)所有元素返回給驅(qū)動(dòng)程序,驅(qū)動(dòng)程序?qū)⑵湫蛄谢闪艘粋€(gè)列表。
data_from_file = sc.textFile("hdfs://master:9000/pydata/VS14MORT.txt.gz",4) # 這里表示4個(gè)分區(qū) def extractInformation(row):import reimport numpy as npselected_indices = [2,4,5,6,7,9,10,11,12,13,14,15,16,17,18,19,21,22,23,24,25,27,28,29,30,32,33,34,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,58,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,81,82,83,84,85,87,89]'''Input record schemaschema: n-m (o) -- xxxn - position fromm - position too - number of charactersxxx - description1. 1-19 (19) -- reserved positions2. 20 (1) -- resident status3. 21-60 (40) -- reserved positions4. 61-62 (2) -- education code (1989 revision)5. 63 (1) -- education code (2003 revision)6. 64 (1) -- education reporting flag7. 65-66 (2) -- month of death8. 67-68 (2) -- reserved positions9. 69 (1) -- sex10. 70 (1) -- age: 1-years, 2-months, 4-days, 5-hours, 6-minutes, 9-not stated11. 71-73 (3) -- number of units (years, months etc)12. 74 (1) -- age substitution flag (if the age reported in positions 70-74 is calculated using dates of birth and death)13. 75-76 (2) -- age recoded into 52 categories14. 77-78 (2) -- age recoded into 27 categories15. 79-80 (2) -- age recoded into 12 categories16. 81-82 (2) -- infant age recoded into 22 categories17. 83 (1) -- place of death18. 84 (1) -- marital status19. 85 (1) -- day of the week of death20. 86-101 (16) -- reserved positions21. 102-105 (4) -- current year22. 106 (1) -- injury at work23. 107 (1) -- manner of death24. 108 (1) -- manner of disposition25. 109 (1) -- autopsy26. 110-143 (34) -- reserved positions27. 144 (1) -- activity code28. 145 (1) -- place of injury29. 146-149 (4) -- ICD code30. 150-152 (3) -- 358 cause recode31. 153 (1) -- reserved position32. 154-156 (3) -- 113 cause recode33. 157-159 (3) -- 130 infant cause recode34. 160-161 (2) -- 39 cause recode35. 162 (1) -- reserved position36. 163-164 (2) -- number of entity-axis conditions37-56. 165-304 (140) -- list of up to 20 conditions57. 305-340 (36) -- reserved positions58. 341-342 (2) -- number of record axis conditions59. 343 (1) -- reserved position60-79. 344-443 (100) -- record axis conditions80. 444 (1) -- reserve position81. 445-446 (2) -- race82. 447 (1) -- bridged race flag83. 448 (1) -- race imputation flag84. 449 (1) -- race recode (3 categories)85. 450 (1) -- race recode (5 categories)86. 461-483 (33) -- reserved positions87. 484-486 (3) -- Hispanic origin88. 487 (1) -- reserved89. 488 (1) -- Hispanic origin/race recode'''record_split = re\.compile(r'([\s]{19})([0-9]{1})([\s]{40})([0-9\s]{2})([0-9\s]{1})([0-9]{1})([0-9]{2})' + r'([\s]{2})([FM]{1})([0-9]{1})([0-9]{3})([0-9\s]{1})([0-9]{2})([0-9]{2})' + r'([0-9]{2})([0-9\s]{2})([0-9]{1})([SMWDU]{1})([0-9]{1})([\s]{16})([0-9]{4})' +r'([YNU]{1})([0-9\s]{1})([BCOU]{1})([YNU]{1})([\s]{34})([0-9\s]{1})([0-9\s]{1})' +r'([A-Z0-9\s]{4})([0-9]{3})([\s]{1})([0-9\s]{3})([0-9\s]{3})([0-9\s]{2})([\s]{1})' + r'([0-9\s]{2})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + r'([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})([A-Z0-9\s]{7})' + r'([A-Z0-9\s]{7})([\s]{36})([A-Z0-9\s]{2})([\s]{1})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})' + r'([A-Z0-9\s]{5})([A-Z0-9\s]{5})([A-Z0-9\s]{5})([\s]{1})([0-9\s]{2})([0-9\s]{1})' + r'([0-9\s]{1})([0-9\s]{1})([0-9\s]{1})([\s]{33})([0-9\s]{3})([0-9\s]{1})([0-9\s]{1})')try:rs = np.array(record_split.split(row))[selected_indices]except:rs = np.array(['-99'] * len(selected_indices))return rsdata_file = data_from_file.map(extractInformation) data_file.map(lambda row: row).take(1) data_file.cache() data_file.is_cached True全局作用域和局部作用域
Spark可以在兩種模式下運(yùn)行:本地和集群。本地運(yùn)行Spark代碼時(shí)和目前使用的python沒(méi)有說(shuō)明不同。然而他如果將相同的代碼部署到集群,便可能會(huì)導(dǎo)致大量的困擾,這就需要了解Spark是怎么在集群上執(zhí)行工作的。這里有一篇文章介紹的很詳細(xì)。參考:Spark運(yùn)行原理詳解
在集群模式下,提交任務(wù)時(shí)任務(wù)發(fā)送給了Master節(jié)點(diǎn)。該驅(qū)動(dòng)程序節(jié)點(diǎn)為任務(wù)創(chuàng)建DAG,并且決定哪一個(gè)執(zhí)行者(Worker)節(jié)點(diǎn)運(yùn)行特定的任務(wù)。然后該驅(qū)動(dòng)程序知識(shí)工作者執(zhí)行它們的任務(wù),并且在結(jié)束時(shí)將結(jié)果返回給驅(qū)動(dòng)程序。然而在這之前,驅(qū)動(dòng)程序?yàn)槊恳粋€(gè)任務(wù)的終止做準(zhǔn)備:驅(qū)動(dòng)程序中有一組變量和方法,以變工作者在RDD上執(zhí)行任務(wù)。
這組變量和方法在執(zhí)行者的上下問(wèn)本質(zhì)上是靜態(tài)的,每個(gè)執(zhí)行器從驅(qū)動(dòng)程序中獲取的一份變量和方法的副本。這意味著運(yùn)行任務(wù)時(shí),如果執(zhí)行者改變這些變量或覆蓋這些方法,它不影響任何其他執(zhí)行者的副本或者驅(qū)動(dòng)程序的變量和方法。這可能會(huì)導(dǎo)致一些意想不到的行為和運(yùn)行錯(cuò)誤,這些行為和錯(cuò)誤通常都很難被追蹤到。
轉(zhuǎn)換
轉(zhuǎn)換操作可以調(diào)整數(shù)據(jù)集。包括映射、篩選、鏈接、轉(zhuǎn)換數(shù)據(jù)集中的值。
.map()轉(zhuǎn)換
data_2014 = data_file.map(lambda x: x[16]) data_2014.take(10) ['2014', '2014', '2014', '2014', '2014', '2014', '2014', '2014', '2014', '-99'].filter()轉(zhuǎn)換
data_filter = data_file.filter(lambda x: x[16] == '2014' and x[21] == '0') print(data_filter.count()) data_file.take(2) 22[array(['1', ' ', '2', '1', '01', 'M', '1', '087', ' ', '43', '23', '11',' ', '4', 'M', '4', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I64 ','238', '070', ' ', '24', '01', '11I64 ', ' ', ' ',' ', ' ', ' ', ' ', ' ', ' ',' ', ' ', ' ', ' ', ' ', ' ',' ', ' ', ' ', ' ', ' ', '01','I64 ', ' ', ' ', ' ', ' ', ' ', ' ',' ', ' ', ' ', ' ', ' ', ' ', ' ',' ', ' ', ' ', ' ', ' ', ' ', '01', ' ',' ', '1', '1', '100', '6'], dtype='<U40'),array(['1', ' ', '2', '1', '01', 'M', '1', '058', ' ', '37', '17', '08',' ', '4', 'D', '3', '2014', 'U', '7', 'C', 'N', ' ', ' ', 'I250','214', '062', ' ', '21', '03', '11I250 ', '61I272 ', '62E669 ',' ', ' ', ' ', ' ', ' ', ' ',' ', ' ', ' ', ' ', ' ', ' ',' ', ' ', ' ', ' ', ' ', '03','I250 ', 'E669 ', 'I272 ', ' ', ' ', ' ', ' ',' ', ' ', ' ', ' ', ' ', ' ', ' ',' ', ' ', ' ', ' ', ' ', ' ', '01', ' ',' ', '1', '1', '100', '6'], dtype='<U40')].flatMap()轉(zhuǎn)換
.flatMap()方法和.map()工作類似,不同的是flatMap()返回一個(gè)扁平的結(jié)果而不是一個(gè)列表。
data_flat = data_file.flatMap(lambda x: (x[16], int(x[16])+1)) data_flat.take(10) ['2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015, '2014', 2015].flatMap()可以用于過(guò)濾一些格式不正確的記錄。在這個(gè)機(jī)制下,.flatMap()方法吧每一行看作一個(gè)列表對(duì)待,然后將所有記錄簡(jiǎn)單的加入到一起,通過(guò)傳遞一個(gè)空列表可以丟棄格式不正確的記錄。
.distinct()轉(zhuǎn)換
這里用該方法檢查性別列表是否只包含了男性和女性驗(yàn)證我們是否準(zhǔn)確解釋了數(shù)據(jù)集。
distinct_gender = data_file.map(lambda x: x[5]).distinct() distinct_gender.collect() ['M', 'F', '-99'].sample() 轉(zhuǎn)換
該方法返回?cái)?shù)據(jù)集的隨機(jī)樣本。第一個(gè)參數(shù)withReplacement指定采樣是否應(yīng)該替換,第二個(gè)參數(shù)fraction定義返回?cái)?shù)據(jù)量的百分比,第三個(gè)參數(shù)是偽隨機(jī)數(shù)產(chǎn)生器的種子seed。
為了節(jié)省運(yùn)算時(shí)間,這里選取愿數(shù)據(jù)千分之一的隨機(jī)數(shù)據(jù)作為下面的練習(xí)數(shù)據(jù)。
data_sample = data_file.sample(False, 0.001, 666) data_sample.cache() PythonRDD[25] at RDD at PythonRDD.scala:48.leftOuterJoin()轉(zhuǎn)換
- .leftOuterJoin(): 根據(jù)兩個(gè)數(shù)據(jù)集中都有得值來(lái)連接兩個(gè)RDD,并返回左側(cè)的RDD記錄,而右邊的記錄副加載兩個(gè)RDD匹配的地方。
- .join() :只返回兩個(gè)RDD之間的關(guān)聯(lián)數(shù)值
- .intersection():返回兩個(gè)RDD中相等的記錄
.repartition()轉(zhuǎn)換
重新對(duì)數(shù)據(jù)集進(jìn)行分區(qū),改變數(shù)據(jù)集分賽區(qū)的數(shù)量。此功能應(yīng)該謹(jǐn)慎并且僅當(dāng)真正需要的時(shí)候使用,因?yàn)樗鼤?huì)充足數(shù)據(jù),導(dǎo)致性能產(chǎn)生巨大的影響。
print(len(rdd2.glom().collect())) rdd2 = rdd2.repartition(4) print(len(rdd2.glom().collect())) 3 4動(dòng)作
.collect() 動(dòng)作
返回所有RDD的元素給驅(qū)動(dòng)程序
��同時(shí)常用的還有: .collectAsMap()方法
.take() 動(dòng)作
可以說(shuō)這事最有用的方法,返回單個(gè)數(shù)據(jù)分區(qū)的前n行。
rdd.take(1) #等同于: rdd.first().reduce() 動(dòng)作
該方法使用指定的方法減少RDD中的元素。可以用該方法計(jì)算RDD總的元素之和:
rdd1.map(lambda x: x[1]).reduce(lambda x, y: x + y)在每一個(gè)分區(qū)里,reduce()方法運(yùn)行求和方法,將改總和返回給最終聚合所在的程序節(jié)點(diǎn)。
??警告:
要謹(jǐn)慎注意的是,reduce傳遞的函數(shù)需要時(shí)關(guān)聯(lián)的,既滿足元素順序改變結(jié)果不變,操作符順序改變結(jié)果不變。如:
這里我們希望輸出結(jié)果是10.0,第一個(gè)只把RDD放在一個(gè)分區(qū),輸出結(jié)果符合預(yù)期。但是在第二個(gè)例子中,分了2個(gè)區(qū),結(jié)果就不對(duì)了。因?yàn)樵摲椒ㄊ窃诿總€(gè)分區(qū)并行計(jì)算的。
.reduceByKey() 動(dòng)作
該方法和.reduce()方法類似,但是實(shí)在key-key基礎(chǔ)上運(yùn)行:
data_key = sc.parallelize([('a',3), ('a',1), ('b',6), ('d',1), ('b',6), ('d',15), ('d',3), ('a',7), ('b', 8)],4) data_key.reduceByKey(lambda x, y: x+y).collect() [('b', 20), ('a', 11), ('d', 19)].count() 動(dòng)作
.count() 方法統(tǒng)計(jì)出了RDD里所有的元素?cái)?shù)量。
rdd.count().count() 方法產(chǎn)生入戲方法同樣的結(jié)果,但不需要把整個(gè)數(shù)據(jù)集移動(dòng)到驅(qū)動(dòng)程序:
len(rdd.collect()). # ??警告:不要這樣做!!.countByKey() 動(dòng)作
如果數(shù)據(jù)集是Ket-Value形式,可以使用.countByKey()方法
data_key.countByKey().items() dict_items([('a', 3), ('b', 3), ('d', 3)]).saveAsTextFile() 動(dòng)作
該方法將RDD保存為文本文件:每個(gè)文件一個(gè)分區(qū)
data_key.saveAsTextFile('hdfs://master:9000/out/data_key.txt')要讀取它的時(shí)候需要解析,因?yàn)樗行卸急灰暈樽址?#xff1a;
def parseInput(row):import repattern = re.compile(r"\(\'([a-z]+)\',.([0-9]+)\)") # 這里“+”號(hào)代表匹配一個(gè)或多個(gè)匹配字符,否則針對(duì)雙位數(shù)動(dòng)作操作會(huì)報(bào)錯(cuò)row_split = pattern.split(row)return (row_split[1], row_split[2]) data_key_read = sc.textFile('hdfs://master:9000/out/data_key.txt') data_key_read.map(parseInput).collect() [('a', '3'),('a', '1'),('b', '6'),('d', '1'),('b', '6'),('d', '15'),('d', '3'),('a', '7'),('b', '8')]��同時(shí)還有:
- rdd.saveAsHadoopDataset
- rdd.saveAsSequenceFile
- …
等方法
.foreach() 動(dòng)作
這個(gè)方法對(duì)RDD里的每個(gè)元素,用迭代方法應(yīng)用相同的函數(shù);和.map()相比,.foreach()方法按照一個(gè)接一個(gè)的方式,對(duì)每一條記錄應(yīng)用一個(gè)定義好的函數(shù)。當(dāng)希望將數(shù)據(jù)曹村道PySpark本身不支持的數(shù)據(jù)庫(kù)是,該方法很有用。
def f(x):print(x)rdd.foreach(f)小結(jié):
- RDD是Spark的核心;這些無(wú)schema數(shù)據(jù)結(jié)構(gòu)早Spark中處理的最基本的數(shù)據(jù)結(jié)構(gòu)。
- RDD的兩種創(chuàng)建方式: parallelize 和 文件讀取
- Spark中的轉(zhuǎn)化是惰性的,只在操作被調(diào)用時(shí)應(yīng)用。
- Scala 和 Python RDD之間一個(gè)主要的區(qū)別是速度: Python RDD 比 Scala 慢很多!
總結(jié)
以上是生活随笔為你收集整理的弹性式分布数据集RDD——Pyspark基础 (二)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: Spark运行原理剖析
- 下一篇: 免安装免配置 还免费的Spark 集群