日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

Spark RDD编程模型及算子介绍(二)

發布時間:2024/3/13 编程问答 43 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark RDD编程模型及算子介绍(二) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

文章目錄

    • 常見的Action算子
    • 常見分區操作算子

常見的Action算子

  • countByKey算子:統計Key出現的次數,部分代碼如下:
rdd_file = sc.textFile("../Data/input/words.txt") rdd_map = rdd_file.flatMap(lambda line: line.split(" ")).map(lambda x:(x, 1)) rdd_count = rdd_map.countByKey() print(rdd_count) print(type(rdd_count)) # 返回結果為字典 # defaultdict(<class 'int'>, {'Apple': 4, 'Banana': 5, 'Orange': 4, 'Peach': 2}) # <class 'collections.defaultdict'>
  • collect算子:將RDD各個分區內的數據,統一收集到Driver中,形成一個List對象。RDD是分布式對象,數據量可以很大,所以用這個算子之前需要知道如果數據集結果很大,就會把driver內存撐爆,出現oom。

  • reduce算子:對RDD數據集按照傳入的邏輯進行聚合操作,部分代碼如下:

rdd = sc.parallelize(range(1,10)) rdd_reduce = rdd.reduce(lambda a,b : a+b) print(rdd_reduce) # 45
  • fold算子:和reduce一樣接收傳入邏輯進行聚合,聚合是帶有初始值的。這個初始值既要作用在分區內,也要作用在分區間,部分代碼如下:
rdd = sc.parallelize(range(1,10),3) rdd_reduce = rdd.fold(10,lambda a,b : a+b) print(rdd_reduce) # 1 分為[1,2,3] [4,5,6] [7,8,9] # 2 每個分區+10 # 3 最后匯總再+10 得到結果85
  • first算子:取出RDD第一個元素
sc.parallelize([1,2,3,4]).first() # 1
  • take算子:取出RDD的前N個元素
sc.parallelize([1,2,3,4],3).take(2) # [1,2]
  • top算子:對RDD元素進行降序排序,取前N個
sc.parallelize([1,2,3,4],3).top(2) # [4, 3]
  • count算子:計算RDD有多少條數據,返回值為一個數字
sc.parallelize([1,2,3,4],3).count() # 4
  • takeSample算子:隨機抽樣RDD的數據,部分代碼如下:
rdd = sc.parallelize([1,2,3,4,5,6,7,6,5,4,3,2,1],1) rdd_takeSample1 = rdd.takeSample(True, 18) print(rdd_takeSample1) rdd_takeSample2 = rdd.takeSample(False, 18) print(rdd_takeSample2)# [1, 1, 1, 4, 6, 4, 1, 1, 5, 4, 6, 7, 5, 1, 6, 6, 6, 2] # [2, 4, 2, 5, 5, 6, 3, 7, 4, 1, 6, 3, 1] # 參數一:bool型,True表示運行取同一個數據,False表示不允許取同一個數據,與數據內容無關,是否重復表示的是同一個位置的數據。 # 參數二:抽樣的數目(設置為false則無法超越RDD總數) # 參數三:隨機種子(一般不需要傳參)
  • takeOrdered算子:對RDD排序取前N個,部分代碼如下:
rdd = sc.parallelize([1,2,3,4,5,6,7]) #升序 rdd_takeOrdered1 = rdd.takeOrdered(4) #降序 rdd_takeOrdered2 = rdd.takeOrdered(4,lambda x : -x)print(rdd_takeOrdered1) print(rdd_takeOrdered2) # [1, 2, 3, 4] # [7, 6, 5, 4]
  • foreach算子:對RDD的每個元素,執行邏輯操作與map類似,但是這個方法沒有返回值。如果想顯示值,只能在里面自行打印(無需經過Driver,直接在Executor打印效率更高)。
rdd = sc.parallelize([1,2,3,4,5,6,7],1) rdd1 = rdd.foreach(lambda x : 2*x +1) rdd2 = rdd.foreach(lambda x : print(2*x +1)) print(rdd1) 3 5 7 9 11 13 15 None
  • saveAsTextFile算子:保存文件API,分布式執行,不經過Driver,每個分區所在的Executor直接控制數據寫出到目標文件系統中,每個分區產生1個結果文件。
#設置為三個分區 rdd_file = sc.textFile("hdfs://node1:8020/Test/WordCount.txt",3) rdd_words = rdd_file.flatMap(lambda line: line.split(" ")) rdd_map = rdd_words.map(lambda x:(x, 1)) rdd_total = rdd_map.reduceByKey(lambda a,b: a + b) rdd_rs = rdd_total.saveAsTextFile("hdfs://node1:8020/Test/word_rs1")

結果如下圖所示在HDFS WebUI上查看

常見分區操作算子

  • mapPartitions算子:與map相似,只是一次被傳遞的是一整個分區的數據,雖然在執行次數上與map相同,但是可以因為減少了網絡io的傳輸次數,效率會大大的提高。部分代碼如下:
rdd = sc.parallelize([1,2,3,4,5,6],3) def func(iter):rs = list()for it in iter:rs.append(2 * it + 1)return rs rdd_part = rdd.mapPartitions(func) rdd_rs = rdd_part.collect() print(rdd_rs)# [3, 5, 7, 9, 11, 13]
  • foreachPartition算子:與普通foreach一樣,只是一次被傳遞的是一整個分區的數據,部分代碼如下:
rdd = sc.parallelize([1,2,3,4,5,6],3) # 因為沒有返回值所以不需要return def func(iter):rs = list()for it in iter:rs.append(2 * it + 1)print(rs)rdd_part = rdd.foreachPartition(func)# [3, 5] # [7, 9] # [11, 13]
  • partitionBy算子:對RDD進行自定義分區操作,部分代碼如下
# 參數1 重新分區后有幾個分區 # 參數2 自定義分區規則,函數傳入(返回編號為int類型,分區編號從0開始,不要超過分區數) rdd = sc.parallelize([('a',1),('b',2),('c',3),('d',4),('e',5),('f',6)])def func(key):if key == 'a' or key == 'b' : return 0if key == 'c' or key == 'd' : return 1return 2rdd_part = rdd.partitionBy(3,func) rdd_rs = rdd_part.glom().collect() print(rdd_rs)# [[('a', 1), ('b', 2)], [('c', 3), ('d', 4)], [('e', 5), ('f', 6)]]
  • repartition算子:對RDD的分區執行重新分區。不建議使用此算子,除非做全局排序的時候,將其設置為1。如果修改盡量減少,不要增加,增加會導致shuffle。不管是增加還是減少都會影響并行計算(內存迭代并行的管道數量),部分代碼如下:
rdd = sc.parallelize([1,2,3,4,5,6],3) rdd_re1 = rdd.getNumPartitions() print(rdd_re1) rdd_re2 = rdd.repartition(1).getNumPartitions() print(rdd_re2) rdd_re3 = rdd.repartition(5).getNumPartitions() print(rdd_re3) # 3 # 1 # 5
  • coalesce算子:對分區數量進行增減,部分代碼如下:
# 參數1:分區數 # 參數2:Bool True表示允許shuffle,False表示不允許(默認)。 rdd_re4 = rdd.coalesce(1).getNumPartitions() print(rdd_re4) rdd_re5 = rdd.coalesce(5).getNumPartitions() print(rdd_re5) rdd_re6 = rdd.coalesce(5,shuffle=True).getNumPartitions() print(rdd_re6) # 1 # 3 沒有加shuffle=True這里有個API安全機制,分區不會增加 # 5
  • 在源碼中我們可以發現reparation算子底層調用的就是coalesce算子,只不過shuffle定義為true。源碼如下:
def repartition(self, numPartitions):return self.coalesce(numPartitions, shuffle=True)

總結

以上是生活随笔為你收集整理的Spark RDD编程模型及算子介绍(二)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。