spark 常用函数介绍(python)
在開始之前,我先介紹一下,RDD是什么?
????? RDD是Spark中的抽象數(shù)據(jù)結(jié)構(gòu)類型,任何數(shù)據(jù)在Spark中都被表示為RDD。從編程的角度來看,RDD可以簡(jiǎn)單看成是一個(gè)數(shù)組。和普通數(shù)組的區(qū)別是,RDD中的數(shù)據(jù)是分區(qū)存儲(chǔ)的,這樣不同分區(qū)的數(shù)據(jù)就可以分布在不同的機(jī)器上,同時(shí)可以被并行處理。因此,Spark應(yīng)用程序所做的無非是把需要處理的數(shù)據(jù)轉(zhuǎn)換為RDD,然后對(duì)RDD進(jìn)行一系列的變換和操作從而得到結(jié)果。
創(chuàng)建RDD:
| 1 | >>> sc.parallelize([1,2,3,4,5],?3)??#意思是將數(shù)組中的元素轉(zhuǎn)換為RDD,并且存儲(chǔ)在3個(gè)分區(qū)上[1]、[2,3]、[4,5]。如果是4個(gè)分區(qū):[1]、[2]、[3]、[4,5] |
? 上面這種是數(shù)組創(chuàng)建,也可以從文件系統(tǒng)或者HDFS中的文件創(chuàng)建出來,后面會(huì)講到。
只要搞懂了spark的函數(shù)們,你就成功了一大半。
spark的函數(shù)主要分兩類,Transformations和Actions。Transformations為一些數(shù)據(jù)轉(zhuǎn)換類函數(shù),actions為一些行動(dòng)類函數(shù):
轉(zhuǎn)換:轉(zhuǎn)換的返回值是一個(gè)新的RDD集合,而不是單個(gè)值。調(diào)用一個(gè)變換方法,不會(huì)有任何求值計(jì)算,它只獲取一個(gè)RDD作為參數(shù),然后返回一個(gè)新的RDD。
行動(dòng):行動(dòng)操作計(jì)算并返回一個(gè)新的值。當(dāng)在一個(gè)RDD對(duì)象上調(diào)用行動(dòng)函數(shù)時(shí),會(huì)在這一時(shí)刻計(jì)算全部的數(shù)據(jù)處理查詢并返回結(jié)果值。
下面介紹spark常用的Transformations, Actions函數(shù):
Transformations
map(func [,?preservesPartitioning=False])? --- 返回一個(gè)新的分布式數(shù)據(jù)集,這個(gè)數(shù)據(jù)集中的每個(gè)元素都是經(jīng)過func函數(shù)處理過的。
| 123 | >>> data?=?[1,2,3,4,5]>>> distData?=?sc.parallelize(data).map(lambda?x: x+1).collect()#結(jié)果:[2,3,4,5,6] |
filter(func)? --- 返回一個(gè)新的數(shù)據(jù)集,這個(gè)數(shù)據(jù)集中的元素是通過func函數(shù)篩選后返回為true的元素(簡(jiǎn)單的說就是,對(duì)數(shù)據(jù)集中的每個(gè)元素進(jìn)行篩選,如果符合條件則返回true,不符合返回false,最后將返回為true的元素組成新的數(shù)據(jù)集返回)。
| 12 | >>> rdd?=?sc.parallelize(data).filter(lambda?x:x%2==0).collect()#結(jié)果:[2, 4] |
flatMap(func [,?preservesPartitioning=False])? --- 類似于map(func), 但是不同的是map對(duì)每個(gè)元素處理完后返回與原數(shù)據(jù)集相同元素?cái)?shù)量的數(shù)據(jù)集,而flatMap返回的元素?cái)?shù)不一定和原數(shù)據(jù)集相同。each input item can be mapped to 0 or more output items (so?funcshould return a Seq rather than a single item)
#### for flatMap() >>> rdd = sc.parallelize([2,3,4]) >>> sorted(rdd.flatMap(lambda x: range(1,x)).collect()) #結(jié)果:[1, 1, 1, 2, 2, 3] func=lambda x:range(1,x) print"上面的這個(gè)輸出結(jié)果可能不太好理解,可以根據(jù)下面的代碼的輸出結(jié)果來理解,其實(shí)就是所有的結(jié)果被打散后拼接起來了" print(func(1)) print(func(2)) print(func(3)) print(func(4)) print"-"*50 >>> sorted(rdd.flatMap(lambda x:[(x,x), (x,x)]).collect()) #結(jié)果:[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]#### for map() >>> rdd = sc.parallelize([2,3,4]) >>> sorted(rdd.flatMap(lambda x: range(1,x)).collect()) #結(jié)果:[[1], [1, 2], [1, 2, 3]] >>> sorted(rdd.flatMap(lambda x:[(x,x), (x,x)]).collect()) #結(jié)果:[[(2, 2), (2, 2)], [(3, 3), (3, 3)], [(4, 4), (4, 4)]]mapPartitions(func [,?preservesPartitioning=False])? ---mapPartitions是map的一個(gè)變種。map的輸入函數(shù)是應(yīng)用于RDD中每個(gè)元素,而mapPartitions的輸入函數(shù)是應(yīng)用于每個(gè)分區(qū),也就是把每個(gè)分區(qū)中的內(nèi)容作為整體來處理的。
?
| 1234 | >>> rdd?=?sc.parallelize([1,2,3,4,5],?3)>>>?def?f(iterator):?yield?sum(iterator)>>> rdd.mapPartitions(f).collect()#結(jié)果:[1,5,9] |
mapPartitionsWithIndex(func [,?preservesPartitioning=False])? ---Similar to?mapPartitions, but takes two parameters. The first parameter is the index of the partition and the second is an iterator through all the items within this partition. The output is an iterator containing the list of items after applying whatever transformation the function encodes.
| 1234 | >>> rdd?=?sc.parallelize([1,2,3,4,5],?3)>>>?def?f(splitIndex, iterator):?yield?splitIndex>>> rdd.mapPartitionsWithIndex(f).collect()#結(jié)果:[0,1,2]?? #三個(gè)分區(qū)的索引 |
reduceByKey(func [,?numPartitions=None,?partitionFunc=<function portable_hash at 0x7fa664f3cb90>])? --- reduceByKey就是對(duì)元素為kv對(duì)的RDD中Key相同的元素的value進(jìn)行reduce,因此,key相同的多個(gè)元素的值被reduce為一個(gè)值,然后與原RDD中的key組成一個(gè)新的kv對(duì)。
| 12345 | >>>?from?operator?import?add>>> rdd?=?sc.parallelize([("a",?1), ("b",?1), ("a",?1)])>>>?sorted(rdd.reduceByKey(add).collect())>>>?#或者 sorted(rdd.reduceByKey(lambda a,b:a+b).collect())#結(jié)果:[('a', 2), ('b', 1)] |
aggregateByKey(zeroValue)(seqOp, combOp [,?numPartitions=None])? ---
sortByKey([ascending=True,?numPartitions=None,?keyfunc=<function <lambda> at 0x7fa665048c80>])? --- 返回排序后的數(shù)據(jù)集。該函數(shù)就是隊(duì)kv對(duì)的RDD數(shù)據(jù)進(jìn)行排序,keyfunc是對(duì)key進(jìn)行處理的函數(shù),如非需要,不用管。
| 123456 | >>> tmp?=?[('a',?1), ('b',?2), ('1',?3), ('D',?4)]>>> sc.parallelize(tmp).sortByKey(True,?1).collect()#結(jié)果: [('1', 3), ('D', 4), ('a', 1), ('b', 2)]>>> sc.parallelize(tmp).sortByKey(True,?2, keyfunc=lambda?k:k.lower()).collect()#上面的這個(gè)k.lower()的意思是會(huì)返回大寫字母的小寫形式,所以最終變成了a,b,d一起比較大小 #其中的2的意思是分成幾個(gè)區(qū)的意思 #結(jié)果:[('1', 3), ('a', 1), ('b', 2), ('D', 4)]#注意,比較兩個(gè)結(jié)果可看出,keyfunc對(duì)鍵的處理只是在數(shù)據(jù)處理的過程中起作用,不能真正的去改變鍵名 |
oin(otherDataset [,?numPartitions=None])? --- join就是對(duì)元素為kv對(duì)的RDD中key相同的value收集到一起組成(v1,v2),然后與原RDD中的key組合成一個(gè)新的kv對(duì),返回。
| 1234 | >>> x?=?sc.parallelize([("a",?1), ("b",?4)])>>> y?=?sc.parallelize([("a",?2), ("a",?5)])>>>?sorted(x.join(y).collect())#結(jié)果:[('a', (1, 2)), ('a', (1, 5))] |
cartesian(otherDataset)? --- 返回一個(gè)笛卡爾積的數(shù)據(jù)集,這個(gè)數(shù)據(jù)集是通過計(jì)算兩個(gè)RDDs得到的。
| 1234 | >>> x?=?sc.parallelize([1,2,3])>>> y?=?sc.parallelize([4,5])>>> x.cartesian(y).collect()#結(jié)果:[(1, 4), (1, 5), (2, 4), (2, 5), (3, 4), (3, 5)] |
Action?(這里只講支持python的,java和scala的后面用到了在做詳解,當(dāng)然支持python就一定支持java和scala)
reduce(func)? --- reduce將RDD中元素兩兩傳遞給輸入函數(shù),同時(shí)產(chǎn)生一個(gè)新的值,新產(chǎn)生的值與RDD中下一個(gè)元素再被傳遞給輸入函數(shù)直到最后只有一個(gè)值為止。
| 123 | >>>?from?operator?import?add>>> sc.parallelize([1,2,3,4,5]).reduce(add)# 結(jié)果:15 |
collect()? --- 返回RDD中的數(shù)據(jù),以list形式。
| 12 | >>> sc.parallelize([1,2,3,4,5]).collect()#結(jié)果:[1,2,3,4,5] |
count()? --- 返回RDD中的元素個(gè)數(shù)。
| 12 | >>> sc.parallelize([1,2,3,4,5]).count#結(jié)果:5 |
first()? --- 返回RDD中的第一個(gè)元素。
| 12 | >>> sc.parallelize([1,2,3,4,5]).first()#結(jié)果:1 |
take(n)? --- 返回RDD中前n個(gè)元素。
| 12 | >>> sc.parallelize([1,2,3,4,5]).take(2)#結(jié)果:[1,2] |
takeOrdered(n [,?key=None])? --- 返回RDD中前n個(gè)元素,但是是升序(默認(rèn))排列后的前n個(gè)元素,或者是通過key函數(shù)指定后的RDD(這個(gè)key我也沒理解透,后面在做詳解)
| 1234 | >>> sc.parallelize([9,7,3,2,6,4]).takeOrdered(3)#結(jié)果:[2,3,4]>>> sc.parallelize([9,7,3,2,6,4]).takeOrdered(3, key=lambda?x:-x)#結(jié)果:[9,7,6] |
saveAsTextFile(path [,?compressionCodecClass=None])? --- 該函數(shù)將RDD保存到文件系統(tǒng)里面,并且將其轉(zhuǎn)換為文本行的文件中的每個(gè)元素調(diào)用 tostring 方法。
parameters:? path - 保存于文件系統(tǒng)的路徑
compressionCodecClass - (None by default) string i.e. “org.apache.hadoop.io.compress.GzipCodec”
import tempfile from tempfile import TemporaryFile,NamedTemporaryFiletempFile = NamedTemporaryFile(delete=True) tempFile.close() print"tempFile.name=",tempFile.namesc.parallelize(range(10)).saveAsTextFile(tempFile.name) from fileinput import input from glob import glob#這是用來查找文件路徑名的 print ''.join(sorted(input(glob(tempFile.name + "/part-0000*")))) #以上代碼的最終結(jié)果就是把0-9保存在文件:/tmp/tmpwocaZg/part-00000中?Empty lines are tolerated when saving to text files:
| 12345 | >>> tempFile2?=?NamedTemporaryFile(delete=True)>>> tempFile2.close()>>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name)>>> ''.join(sorted(input(glob(tempFile2.name?+?"/part-0000*"))))'\n\n\nbar\nfoo\n' |
?Using compressionCodecClass:
| 12345678 | >>> tempFile3?=?NamedTemporaryFile(delete=True)>>> tempFile3.close()>>> codec?=?"org.apache.hadoop.io.compress.GzipCodec">>> sc.parallelize(['foo',?'bar']).saveAsTextFile(tempFile3.name, codec)>>>?from?fileinput?import?input, hook_compressed>>> result?=?sorted(input(glob(tempFile3.name?+?"/part*.gz"), openhook=hook_compressed))>>> b''.join(result).decode('utf-8')u'bar\nfoo\n' |
countByKey()? --- 返回一個(gè)字典(key,count),該函數(shù)操作數(shù)據(jù)集為kv形式的數(shù)據(jù),用于統(tǒng)計(jì)RDD中擁有相同key的元素個(gè)數(shù)。
| 12345 | >>> defdict?=?sc.parallelize([("a",1), ("b",1), ("a",?1)]).countByKey()>>> defdict#結(jié)果:defaultdict(<type 'int'>, {'a': 2, 'b': 1})>>> defdict.items()#結(jié)果:[('a', 2), ('b', 1)] |
countByValue()? --- 返回一個(gè)字典(value,count),該函數(shù)操作一個(gè)list數(shù)據(jù)集,用于統(tǒng)計(jì)RDD中擁有相同value的元素個(gè)數(shù)。
| 12 | >>> sc.parallelize([1,2,3,1,2,5,3,2,3,2]).countByValue().items()#結(jié)果:[(1, 2), (2, 4), (3, 3), (5, 1)] |
foreach(func)? --- 運(yùn)行函數(shù)func來處理RDD中的每個(gè)元素,這個(gè)函數(shù)常被用來updating an Accumulator或者與外部存儲(chǔ)系統(tǒng)的交互。
| 123 | >>>?def?f(x):?print(x)>>> sc.parallelize([1,?2,?3,?4,?5]).foreach(f)#note: 打印是隨機(jī)的,并不是一定按1,2,3,4,5的順序打印 |
總結(jié)
以上是生活随笔為你收集整理的spark 常用函数介绍(python)的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: sublime+virtualenv+p
- 下一篇: python2.x和python3.x-