Spark编程指南笔记
Spark編程指南筆記
標簽:?spark?編程?筆記?| 發表時間:2015-02-02 16:00 | 作者: 分享到: 出處:http://blog.javachen.com/rss.xml本文是參考Spark官方編程指南(Spark 版本為1.2)整理出來的學習筆記,主要是用于加深對 Spark 的理解,并記錄一些知識點。
1. 一些概念
每一個 Spark 的應用,都是由一個驅動程序構成,它運行用戶的 main 函數,在一個集群上執行各種各樣的并行操作。
Spark 提出的最主要抽象概念是?彈性分布式數據集,它是一個有容錯機制(劃分到集群的各個節點上)并可以被并行操作的元素集合。目前有兩種類型的RDD:
- 并行集合:接收一個已經存在的 Scala 集合,然后進行各種并行計算。
- 外部數據集:外部存儲系統,例如一個共享的文件系統,HDFS、HBase以及任何支持 Hadoop InputFormat 的數據源。
這兩種類型的 RDD 都可以通過相同的方式進行操作。用戶可以讓 Spark 保留一個 RDD 在內存中,使其能在并行操作中被有效的重復使用,并且,RDD 能自動從節點故障中恢復。
Spark 的第二個抽象概念是?共享變量,可以在并行操作中使用。在默認情況下,Spark 通過不同節點上的一系列任務來運行一個函數,它將每一個函數中用到的變量的拷貝傳遞到每一個任務中。有時候,一個變量需要在任務之間,或任務與驅動程序之間被共享。
Spark 支持兩種類型的共享變量:?廣播變量,可以在內存的所有的結點上緩存變量;?累加器:只能用于做加法的變量,例如計數或求和。
2. 編寫程序
初始化 Spark
Spark 程序需要做的第一件事情,就是創建一個 SparkContext 對象,它將告訴 Spark 如何訪問一個集群。這個通常是通過下面的構造器來實現的:
new SparkContext(master, appName, [sparkHome], [jars])參數說明:
- master:用于指定所連接的 Spark 或者 Mesos 集群的 URL。
- appName?:應用的名稱,將會在集群的 Web 監控 UI 中顯示。
- sparkHome:可選,你的集群機器上 Spark 的安裝路徑(所有機器上路徑必須一致)。
- jars:可選,在本地機器上的 JAR 文件列表,其中包括你應用的代碼以及任何的依賴,Spark 將會把他們部署到所有的集群結點上。
在 python 中初始化,示例代碼如下:
//conf = SparkContext("local", "Hello Spark") conf = SparkConf().setAppName("Hello Spark").setMaster("local") sc = SparkContext(conf=conf)說明:如果部署到集群,在分布式模式下運行,最后兩個參數是必須的。
第一個參數可以是以下任一種形式:
| local | 默認值,使用一個 Worker 線程本地化運行(完全不并行) |
| local[K] | 使用 K 個 Worker 線程本地化運行(理想情況下,K 應該根據運行機器的 CPU 核數設定) |
| spark://HOST:PORT | 連接到指定的 Spark 單機版集群 master 進程所在的主機和端口,端口默認是7077 |
| mesos://HOST:PORT | 連接到指定的 Mesos 集群。host 參數是Moses master的hostname。端口默認是5050 |
如果你在一個集群上運行 spark-shell,則 master 參數默認為?local,在啟動之前你可以通過修改配置文件指定?ADD_JAR?環境變量將 JAR 文件們加載在集群上,這個變量需要包括一個用逗號分隔的 JAR 文件列表。
運行代碼
運行代碼有幾種方式,一是通過 spark-shell 來運行 scala 代碼,一是編寫 java 代碼并打成包以 spark on yarn 方式運行,還有一種是通過 pyspark 來運行 python 代碼。
更多內容,參考?Spark安裝和使用。
3. 彈性分布式數據集
3.1 并行集合
并行集合是通過調用 SparkContext 的?parallelize?方法,在一個已經存在的 Scala 集合上創建一個 Seq 對象。
parallelize 方法還可以接受一個參數?slices,表示數據集切分的份數。Spark 將會在集群上為每一份數據起一個任務。典型地,你可以在集群的每個 CPU 上分布 2-4個 slices。一般來說,Spark 會嘗試根據集群的狀況,來自動設定 slices 的數目,當然,你也可以手動設置。
Scala 示例程序:
scala> val data = Array(1, 2, 3, 4, 5) data: Array[Int] = Array(1, 2, 3, 4, 5)scala> var distData = sc.parallelize(data) distData: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:14scala> distData.reduce((a, b) => a + b) res4: Int = 15Java 示例程序:
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5); JavaRDD<Integer> distData = sc.parallelize(data); Integer sum=distData.reduce((a, b) -> a + b);Python 示例程序:
data = [1, 2, 3, 4, 5] distData = sc.parallelize(data) distData.reduce(lambda a, b: a + b)3.2 外部數據源
Spark可以從存儲在 HDFS,或者 Hadoop 支持的其它文件系統(包括本地文件,Amazon S3, Hypertable, HBase 等等)上的文件創建分布式數據集。Spark 支持?TextFile、?SequenceFiles?以及其他任何?Hadoop InputFormat?格式的輸入。
TextFile 的 RDD 可以通過下面方式創建,該方法接受一個文件的 URI 地址,該地址可以是本地路徑,或者?hdfs://、?s3n://?等 URL 地址。
// scala 語法 val distFile = sc.textFile("data.txt")// java 語法 JavaRDD<String> distFile = sc.textFile("data.txt");// python 語法 distFile = sc.textFile("data.txt")一些說明:
- 引用的路徑必須是絕對路徑,并且必須在每一個 worker 節點上保持一致。
- 輸入的地址可以是一個目錄,也可以是正則匹配表達式,也可以是壓縮的文件。
- textFile 方法也可以通過輸入一個可選的第二參數,來控制文件的分片數目。默認情況下,Spark 為每一塊文件創建一個分片(HDFS 默認的塊大小為64MB),但是你也可以通過傳入一個更大的值,來指定一個更高的片值,但不能指定一個比塊數更小的片值。
除了 TextFile,Spark 還支持其他格式的輸入:
- SparkContext.wholeTextFiles?方法可以讀取一個包含多個小文件的目錄,并以 filename,content 鍵值對的方式返回結果。
- 對于 SequenceFiles,可以使用 SparkContext 的?sequenceFile[K, V]` 方法創建。像 IntWritable 和 Text 一樣,它們必須是 Hadoop 的 Writable 接口的子類。另外,對于幾種通用 Writable 類型,Spark 允許你指定原生類型來替代。例如:sequencFile[Int, String] 將會自動讀取 IntWritable 和 Texts。
- 對于其他類型的 Hadoop 輸入格式,你可以使用?SparkContext.hadoopRDD?方法,它可以接收任意類型的 JobConf 和輸入格式類,鍵類型和值類型。按照像 Hadoop 作業一樣的方法設置輸入源就可以了。
- RDD.saveAsObjectFile?和?SparkContext.objectFile?提供了以 Java 序列化的簡單方式來保存 RDD。雖然這種方式沒有 Avro 高效,但也是一種簡單的方式來保存任意的 RDD。
3.3 RDD 操作
RDD支持兩種操作:
- 轉換:從現有的數據集創建一個新的數據集;
- 動作:在數據集上運行計算后,返回一個值給驅動程序。
例如,map 是一種轉換,它將數據集每一個元素都傳遞給函數,并返回一個新的分布數據集表示結果,而 reduce 是一種動作,通過一些函數將所有的元素疊加起來,并將最終結果返回給運行程序。
Spark 中的?所有轉換都是惰性的,也就是說,他們并不會直接計算結果。相反的,它們只是記住應用到基礎數據集上的這些轉換動作。只有當發生一個要求返回結果給運行程序的動作時,這些轉換才會真正運行。
默認情況下,每一個轉換過的 RDD 都會在你運行一個動作時被重新計算。不過,你也可以使用?persist?或者?cache?方法,持久化一個 RDD 在內存中。在這種情況下,Spark 將會在集群中,保存相關元素,下次你查詢這個 RDD 時,它將能更快速訪問。除了持久化到內存,Spark 也支持在磁盤上持久化數據集,或在節點之間復制數據集。
Scala 示例:
val lines = sc.textFile("data.txt") val lineLengths = lines.map(s => s.length) val totalLength = lineLengths.reduce((a, b) => a + b)Java 示例:
JavaRDD<String> lines = sc.textFile("data.txt"); JavaRDD<Integer> lineLengths = lines.map(s -> s.length()); int totalLength = lineLengths.reduce((a, b) -> a + b);Python 示例:
lines = sc.textFile("data.txt") lineLengths = lines.map(lambda s: len(s)) totalLength = lineLengths.reduce(lambda a, b: a + b)代碼說明:
- 第一行定義了一個基礎 RDD,但并沒有開始載入內存,僅僅將 lines 指向了這個file
- 第二行也僅僅定義了 linelengths 是作為 map 的結果,但也沒有開始運行 map 這個過程
- 直到第三句話才開始運行,各個 worker 節點開始運行自己的 map、reduce 過程
你也可以調用?lineLengths.persist()?來持久化 RDD。
除了使用 lambda 表達式,也可以通過函數來運行轉換或者動作,使用函數需要注意局部變量的作用域問題。
例如下面的 Python 代碼中的 field 變量:
class MyClass(object):def __init__(self):self.field = "Hello"def doStuff(self, rdd):field = self.fieldreturn rdd.map(lambda s: field + x)如果使用 Java 語言,則需要用到匿名內部類:
class GetLength implements Function<String, Integer> {public Integer call(String s) { return s.length(); } } class Sum implements Function2<Integer, Integer, Integer> {public Integer call(Integer a, Integer b) { return a + b; } }JavaRDD<String> lines = sc.textFile("data.txt"); JavaRDD<Integer> lineLengths = lines.map(new GetLength()); int totalLength = lineLengths.reduce(new Sum());Spark 也支持鍵值對的操作,這在分組和聚合操作時候用得到。定義一個鍵值對對象時,需要自定義該對象的 equals() 和 hashCode() 方法。
在 Scala 中有一個?Tuple2?對象表示鍵值對,這是一個內置的對象,通過?(a,b)?就可以創建一個 Tuple2 對象。在你的程序中,通過導入?org.apache.spark.SparkContext._?就可以對 Tuple2 進行操作。對鍵值對的操作方法,可以查看?PairRDDFunctions
下面是一個用 scala 統計單詞出現次數的例子:
val lines = sc.textFile("data.txt") val pairs = lines.map(s => (s, 1)) val counts = pairs.reduceByKey((a, b) => a + b)接下來,你還可以執行?counts.sortByKey()、?counts.collect()?等操作。
如果用 Java 統計,則代碼如下:
JavaRDD<String> lines = sc.textFile("data.txt"); JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1)); JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);用 Python 統計,代碼如下:
lines = sc.textFile("data.txt") pairs = lines.map(lambda s: (s, 1)) counts = pairs.reduceByKey(lambda a, b: a + b)測試
現在來結合上面的例子實現一個完整的例子。下面,我們來?分析 Nginx 日志中狀態碼出現次數,并且將結果按照狀態碼從小到大排序。
先將測試數據上傳到 hdfs:
$ hadoop fs -put access.log然后,編寫一個 python 文件,保存為 SimpleApp.py:
from pyspark import SparkContextlogFile = "access.log"sc = SparkContext("local", "Simple App")logData = sc.textFile(logFile).cache()counts = logData.map(lambda line: line.split()[8]).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortByKey(lambda x: x) # This is just a demo on how to bring all the sorted data back to a single node. # In reality, we wouldn't want to collect all the data to the driver node. output = counts.collect() for (word, count) in output: print "%s: %i" % (word, count) counts.saveAsTextFile("spark_results")sc.stop()接下來,運行下面代碼:
$ spark-submit --master local[4] SimpleApp.py運行成功之后,你會在終端看到以下輸出:
200: 6827 206: 120 301: 7 304: 10 403: 38 404: 125 416: 1并且,在hdfs 上 /user/spark/spark_results/part-00000 內容如下:
(u'200', 6827) (u'206', 120) (u'301', 7) (u'304', 10) (u'403', 38) (u'404', 125) (u'416', 1)其實,這個例子和官方提供的例子很相像,具體請看?wordcount.py。
常見的轉換
| map(func) | 返回一個新分布式數據集,由每一個輸入元素經過func函數轉換后組成 |
| filter(func) | 返回一個新數據集,由經過func函數計算后返回值為 true 的輸入元素組成 |
| flatMap(func) | 類似于 map,但是每一個輸入元素可以被映射為0或多個輸出元素,因此 func 應該返回一個序列 |
| mapPartitions(func) | 類似于 map,但獨立地在 RDD 的每一個分塊上運行,因此在類型為 T 的 RDD 上運行時,func 的函數類型必須是?Iterator[T] ? Iterator[U] |
| mapPartitionsWithSplit(func) | 類似于 mapPartitions, 但 func 帶有一個整數參數表示分塊的索引值。因此在類型為 T的RDD上運行時,func 的函數類型必須是?(Int, Iterator[T]) ? Iterator[U] |
| sample(withReplacement,fraction, seed) | 根據 fraction 指定的比例,對數據進行采樣,可以選擇是否用隨機數進行替換,seed 用于指定隨機數生成器種子 |
| union(otherDataset) | 返回一個新的數據集,新數據集是由源數據集和參數數據集聯合而成 |
| distinct([numTasks])) | 返回一個包含源數據集中所有不重復元素的新數據集 |
| groupByKey([numTasks]) | 在一個鍵值對的數據集上調用,返回一個?(K,Seq[V])對的數據集 。注意:默認情況下,只有8個并行任務來做操作,但是你可以傳入一個可選的 numTasks 參數來改變它 |
| reduceByKey(func, [numTasks]) | 在一個鍵值對的數據集上調用時,返回一個鍵值對的數據集,使用指定的 reduce 函數,將相同 key 的值聚合到一起。類似 groupByKey,reduce 任務個數是可以通過第二個可選參數來配置的 |
| sortByKey([ascending], [numTasks]) | 在一個鍵值對的數據集上調用,K 必須實現?Ordered?接口,返回一個按照 Key 進行排序的鍵值對數據集。升序或降序由 ascending 布爾參數決定 |
| join(otherDataset, [numTasks]) | 在類型為(K,V)和(K,W) 類型的數據集上調用時,返回一個相同key對應的所有元素對在一起的?(K, (V, W))?數據集 |
| cogroup(otherDataset, [numTasks]) | 在類型為(K,V)和(K,W) 的數據集上調用,返回一個?(K, Seq[V], Seq[W])?元組的數據集。這個操作也可以稱之為 groupwith |
| cartesian(otherDataset) | 笛卡爾積,在類型為 T 和 U 類型的數據集上調用時,返回一個 (T, U) 對數據集(兩兩的元素對) |
| pipe(command, [envVars]) | 對 RDD 進行管道操作 |
| coalesce(numPartitions) | 減少 RDD 的分區數到指定值。在過濾大量數據之后,可以執行此操作 |
| repartition(numPartitions) | 重新給 RDD 分區 |
| repartitionAndSortWithinPartitions(partitioner) | 重新給 RDD 分區,并且每個分區內以記錄的 key 排序 |
常用的動作
常用的動作列表
| reduce(func) | 通過函數 func 聚集數據集中的所有元素。這個功能必須可交換且可關聯的,從而可以正確的被并行執行。 |
| collect() | 在驅動程序中,以數組的形式,返回數據集的所有元素。這通常會在使用 filter 或者其它操作并返回一個足夠小的數據子集后再使用會比較有用。 |
| count() | 返回數據集的元素的個數。 |
| first() | 返回數據集的第一個元素,類似于?take(1) |
| take(n) | 返回一個由數據集的前 n 個元素組成的數組。注意,這個操作目前并非并行執行,而是由驅動程序計算所有的元素 |
| takeSample(withReplacement,num, seed) | 返回一個數組,在數據集中隨機采樣 num 個元素組成,可以選擇是否用隨機數替換不足的部分,seed 用于指定的隨機數生成器種子 |
| takeOrdered(n, [ordering]) | 返回自然順序或者自定義順序的前 n 個元素 |
| saveAsTextFile(path) | 將數據集的元素,以 textfile 的形式,保存到本地文件系統,HDFS或者任何其它 hadoop 支持的文件系統。對于每個元素,Spark 將會調用?toString?方法,將它轉換為文件中的文本行 |
| saveAsSequenceFile(path)?(Java and Scala) | 將數據集的元素,以 Hadoop sequencefile 的格式保存到指定的目錄下 |
| saveAsObjectFile(path)?(Java and Scala) | 將數據集的元素,以 Java 序列化的方式保存到指定的目錄下 |
| countByKey() | 對(K,V)類型的 RDD 有效,返回一個 (K,Int) 對的 Map,表示每一個key對應的元素個數 |
| foreach(func) | 在數據集的每一個元素上,運行函數 func 進行更新。這通常用于邊緣效果,例如更新一個累加器,或者和外部存儲系統進行交互,例如HBase |
3.4 RDD持久化
Spark 最重要的一個功能,就是在不同操作間,持久化(或緩存)一個數據集在內存中,這將使得后續的動作變得更加迅速。緩存是用 Spark 構建迭代算法的關鍵。 使用以下兩種方法可以標記要緩存的 RDD:
lineLengths.persist() lineLengths.cache()取消緩存則用:
lineLengths.unpersist()每一個RDD都可以用不同的保存級別進行保存,通過將一個?org.apache.spark.storage.StorageLevel?對象傳遞給?persist(self, storageLevel)?可以控制 RDD 持久化到磁盤、內存或者是跨節點復制等等。?cache()?方法是使用默認存儲級別的快捷方法,也就是?StorageLevel.MEMORY_ONLY。 完整的可選存儲級別如下:
| MEMORY_ONLY | 默認的級別, 將 RDD 作為反序列化的的對象存儲在 JVM 中。如果不能被內存裝下,一些分區將不會被緩存,并且在需要的時候被重新計算 |
| MEMORY_AND_DISK | 將 RDD 作為反序列化的的對象存儲在 JVM 中。如果不能被與內存裝下,超出的分區將被保存在硬盤上,并且在需要時被讀取 |
| MEMORY_ONLY_SER | 將 RDD 作為序列化的的對象進行存儲(每一分區占用一個字節數組)。通常來說,這比將對象反序列化的空間利用率更高,尤其當使用fast serializer,但在讀取時會比較占用CPU |
| MEMORY_AND_DISK_SER | 與?MEMORY_ONLY_SER?相似,但是把超出內存的分區將存儲在硬盤上而不是在每次需要的時候重新計算 |
| DISK_ONLY | 只將 RDD 分區存儲在硬盤上 |
| MEMORY_ONLY_2、?MEMORY_AND_DISK_2等 | 與上述的存儲級別一樣,但是將每一個分區都復制到兩個集群結點上 |
| OFF_HEAP | 開發中 |
Spark 的不同存儲級別,旨在滿足內存使用和 CPU 效率權衡上的不同需求。我們建議通過以下的步驟來進行選擇:
- 如果你的 RDD 可以很好的與默認的存儲級別契合,就不需要做任何修改了。這已經是 CPU 使用效率最高的選項,它使得 RDD的操作盡可能的快。
- 如果不行,試著使用?MEMORY_ONLY_SER?并且選擇一個快速序列化的庫使得對象在有比較高的空間使用率的情況下,依然可以較快被訪問。
- 盡可能不要存儲到硬盤上,除非計算數據集的函數,計算量特別大,或者它們過濾了大量的數據。否則,重新計算一個分區的速度,和與從硬盤中讀取基本差不多快。
- 如果你想有快速故障恢復能力,使用復制存儲級別。例如:用 Spark 來響應web應用的請求。所有的存儲級別都有通過重新計算丟失數據恢復錯誤的容錯機制,但是復制存儲級別可以讓你在 RDD 上持續的運行任務,而不需要等待丟失的分區被重新計算。
- 如果你想要定義你自己的存儲級別,比如復制因子為3而不是2,可以使用?StorageLevel?單例對象的?apply()方法。
4. 共享變量
5. 參考文章
- http://spark.apache.org/docs/latest/programming-guide.html
- http://rdc.taobao.org/?p=2024
- http://blog.csdn.net/u011391905/article/details/37929731
- http://segmentfault.com/blog/whuwb/1190000000723037
總結
以上是生活随笔為你收集整理的Spark编程指南笔记的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 深度学习:推动NLP领域发展的新引擎
- 下一篇: FFmpeg的添加logo,去logo