SparkProgrammingRDDs
Introduction to Core Spark Concepts
- driver program:
- executors:
- the place to run the operations
- Spark automatically takes ur function and?ships it to executor nodes.
Programming with RDDs
- RDD: spark's core abstraction for working with data.
- RDD簡(jiǎn)單來說就是元素的分布式集合
- 在Spark中所有的工作都可以表示成創(chuàng)建一個(gè)新的RDDs,轉(zhuǎn)換已有的RDDs,或者是在RDDs上運(yùn)行operations
RDD Basics
- An?immutable?distributed collection of objects.
- 每個(gè)RDD被split成多個(gè)partitions,每個(gè)partition可能在cluster的不同節(jié)點(diǎn)上被計(jì)算
- RDD的創(chuàng)建:
- loading一個(gè)外部數(shù)據(jù)集
- distributing對(duì)象集合(eg: a list or set) RDD操作:(區(qū)分這兩種操作的原因是Spark的計(jì)算是lazy fashion的
Creating RDDs
- parallelize()
- textFile()
RDD Operations
- Transformation & Action
Transformations
- Compute lazily
- 沒有改變?cè)璕DD(immutable),而是生成了新的RDD,spark會(huì)保存這一系列依賴關(guān)系(lineage)
Actions
- Actually do something with our dataset
Passing Functions to Spark
- Scala: we can pass in functions defined inline, references to methods, or static functions
- Scala: 我們所傳送的函數(shù)和其中的數(shù)據(jù)引用需要被序列化(實(shí)現(xiàn)Java的Serializable接口)
- 如果我們pass一個(gè)對(duì)象中的函數(shù),或者包含了對(duì)象中的字段的引用(eg: self.field),spark會(huì)把整個(gè)對(duì)象發(fā)送給worker nodes,這會(huì)遠(yuǎn)大于你所需要的信息。并且如果你的對(duì)象不能持久化(pickle in python)的話,會(huì)導(dǎo)致是你的程序失敗。舉一個(gè)python的例子:
錯(cuò)誤示范如下:
View Code正確示范:(提取對(duì)象中你所需的字段為局部變量,然后傳進(jìn)去)
View Code- 同樣的,對(duì)scala我們也要盡量避免上述情況,而且要注意的是在scala中不需要顯示的self.或者this.,所以這種情況顯得很不明顯,但仍然要注意。舉個(gè)栗子
如果在scala中出現(xiàn)了NotSerializableException,那么多半是因?yàn)橐昧艘粋€(gè)不可序列化的類中的變量或字段。所以,傳送一個(gè)局部的可序列化的變量或函數(shù)才是安全的。
- Any code that is shared by RDD transformations must always be serializable.
Common Transformations and Actions
Basic RDDs
- 我們首先介紹基本的RDD操作,它們可以執(zhí)行在所有RDDs上而不用管數(shù)據(jù)
Element-wise transformations
- map() and filter()
- flatMap(): 為每一個(gè)輸入元素產(chǎn)生多個(gè)輸出元素。返回的是一個(gè)迭代器iterator
Psedudo set operations
- 一些簡(jiǎn)單的集合操作:(需要RDDs是同一類型的)
- RDD1.distinct() ?--> 十分昂貴的操作,需要shuffle all data over the network
- RDD1.union(RDD2) ?--> 最簡(jiǎn)單的集合操作,會(huì)保留原RDD中的重復(fù)值
- RDD1.intersection(RDD2) --> 需要去重(來識(shí)別共同元素),因而也需要shuffle
- RDD1.substract(RDD2) ?--> perform shuffle
- RDD1.cartesian(RDD2) ?-->?returns all possible pairs of (a, b) where a is in the source RDD and b is in the other RDD .十分昂貴
- 為什么叫psedudo即假的集合操作呢,因?yàn)檫@里的集合丟失了一個(gè)重要特性:uniqueness即元素的唯一性。因?yàn)槲覀兘?jīng)常有duplicates
Actions
- reduce() &?fold() :都需要返回值和RDD中的元素保持同一類型。
fold()接收與reduce接收的函數(shù)簽名相同的函數(shù),另外再加上一個(gè)初始值作為第一次調(diào)用的結(jié)果.
val sum = rdd.reduce((x, y) => x + y)- aggregate(): frees us from the constraint of having the return be the same types as the RDD we are working on.
aggregate的函數(shù)原型:
def aggregate [U: ClassTag] (zeroValue: U) (seqOp: (U,T)=>U,combOp: (U,U)=>U):U可以看到,(zeroValue: U)是給定的一個(gè)初值,后半部分有兩個(gè)函數(shù),seqOp相當(dāng)于是在各個(gè)分區(qū)里進(jìn)行的聚合操作,它支持(U, T) => U,也就是支持不同類型的聚合。comOp是將sepOp后的結(jié)果聚合,此時(shí)的結(jié)果全部是U類,也就是只能進(jìn)行同構(gòu)聚合。
一個(gè)經(jīng)典的例子是求平均值。即先用seqOp求出各個(gè)分區(qū)中的sum和個(gè)數(shù),再將seqOp后的結(jié)果聚合得到總的sum和總的個(gè)數(shù)。
View Code- collect(): 返回整個(gè)RDD中的內(nèi)容,常用于單元測(cè)試,因?yàn)樗枰愕恼麄€(gè)數(shù)據(jù)集能夠fit on a single machine.
- take(n): 返回RDD中的n個(gè)元素,并且試圖最小化所訪問的partition數(shù),所以它可能會(huì)返回一個(gè)biased?collection。
- takeSample(withReplacement, num, seed): allows us to take a sample of our data either with or without replacement.
- foreach(): 可以允許我們?cè)诿總€(gè)元素上執(zhí)行操作or計(jì)算,而不需要把元素送回driver
?Converting Between RDD Types
- 一些functions只在某些特定類型RDD上可用。比如mean(), variance()只用于numericRDDs, join()只用于key/value pair RDDs.
- 在scala和Java中,這些方法未在標(biāo)準(zhǔn)RDD類中定義,因此為了訪問這些附加的功能,我們需要確保我們得到了正確的specialized class。
Scala
- 在scala中。RDDs的轉(zhuǎn)換可以通過使用隱式轉(zhuǎn)換(using implicit conversions)來自動(dòng)進(jìn)行。
- 看一段RDD.scala源碼中的介紹
- 關(guān)于scala隱式轉(zhuǎn)換:?當(dāng)對(duì)象調(diào)用類中不存在的方法或成員時(shí),編譯器會(huì)自動(dòng)將對(duì)象進(jìn)行隱式轉(zhuǎn)換
- 隱式轉(zhuǎn)換帶來的confusion:當(dāng)你在RDD上調(diào)用mean()這樣的方法時(shí),你會(huì)發(fā)現(xiàn)在RDDclass 的Scaladocs中找不到mean()方法,但是該方法能成功調(diào)用是由于實(shí)現(xiàn)了RDD[Double]到DoubleRDDFunctions的隱式轉(zhuǎn)換。
Persistence(Caching)
- As discussed earlier, Spark RDDs是惰性求值的,如果我們想要多次使用同一個(gè)RDD的話,Spark通常會(huì)每次都重新計(jì)算該RDD和它所有的依賴。這對(duì)于迭代算法是十分昂貴的。
- 一個(gè)比較直觀的例子如下,每次action的時(shí)候都會(huì)重新計(jì)算:
- 為了避免多次重復(fù)計(jì)算同一個(gè)RDD,我們可以讓Spark來persist數(shù)據(jù)。這樣的話,計(jì)算該RDD的那個(gè)節(jié)點(diǎn)會(huì)保存它們的partition。
- 如果有數(shù)據(jù)持久化的節(jié)點(diǎn)fail掉了,Spark會(huì)在需要的時(shí)候重新計(jì)算丟失的partitons。當(dāng)然我們也可以通過在多個(gè)節(jié)點(diǎn)保存副本的方式來避免節(jié)點(diǎn)故障時(shí)的slowdown。
- Spark有很多l(xiāng)evels of persistence供選擇。
-
Level Space Used CPU time In Memory On Disk Comments MEMORY_ONLY?
High Low Y N ? MEMORY_ONLY_SER?
Low High Y N ? MEMORY_AND_DISK?
High Medium Some Some Spils to disk if there is too much data to fit in memory. MEMORY_AND_DISK_SER?
Low High Some Some Spills to disk if there is too much data to fit in memory. Stores serialized representation in memory.?
DISK_ONLY?
Low High N Y ? - 在Java和scala中,缺省的情況下persist()回將未序列化的對(duì)象數(shù)據(jù)保存在JVM的堆中。
- 如果你試圖在內(nèi)存中cache過多的數(shù)據(jù),Spark將會(huì)自動(dòng)驅(qū)逐舊的partitions,使用最少最近使用(Least Recently Used, LRU)緩存策略。對(duì)于MEMORY_ONLY level,下次訪問的時(shí)候會(huì)重新計(jì)算這些被驅(qū)逐的分片。
- 由于Spark'的各種機(jī)制,無論使用哪種level,你都可以不用擔(dān)心job breaking。但是緩存不必要的數(shù)據(jù)將會(huì)導(dǎo)致有用數(shù)據(jù)被驅(qū)逐,從而增加重計(jì)算的時(shí)間。
- Spark提供了unpersist()方法可以讓你手工地將RDD移除緩存。
-
Off-heap caching is experimental and uses Tachyon. If you are interested in off-heap caching with Spark, take a look at the Running Spark on Tachyon guide.?
?
轉(zhuǎn)載于:https://www.cnblogs.com/wttttt/p/6826719.html
總結(jié)
以上是生活随笔為你收集整理的SparkProgrammingRDDs的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 体验套餐管理系统
- 下一篇: 字符输出流写文本文件【Writer、Fi