日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

理解spark闭包以及broadcast(转载)

發(fā)布時間:2023/12/20 编程问答 31 豆豆
生活随笔 收集整理的這篇文章主要介紹了 理解spark闭包以及broadcast(转载) 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

什么叫閉包:

跨作用域訪問函數變量。

又指的一個擁有許多變量和綁定了這些變量的環(huán)境的表達式(通常是一個函數),因而這些變量也是該表達式的一部分。

Spark閉包的問題引出:?
在spark中實現統(tǒng)計List(1,2,3)的和。如果使用下面的代碼,程序打印的結果不是6,而是0。這個和我們編寫單機程序的認識有很大不同。為什么呢?

test.scala代碼如下:

import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.log4j.Logger import org.apache.log4j.Level object Test {def main(args:Array[String]):Unit = {Logger.getLogger("org").setLevel(Level.OFF)Logger.getLogger("akka").setLevel(Level.OFF)Logger.getRootLogger().setLevel(Level.ERROR)val spark = SparkSession.builder.appName("Intro").config("spark.master", "local").getOrCreate();val rdd = spark.sparkContext.parallelize(List(1,2,3))var counter = 0//warn: don't do thisrdd.foreach(x => counter += x)println("Counter value: "+counter)spark.sparkContext.stop() } }

運行方法:

scala -classpath $(echo *.jar ~/bigdata/spark-2.3.1-bin-hadoop2.7/jars/*.jar| tr ' ' ':') test.scala

?

問題分析:?
counter是在foreach函數外部定義的,也就是在driver程序中定義,而foreach函數是屬于rdd對象的,rdd函數的執(zhí)行位置是各個worker節(jié)點(或者說worker進程),main函數是在driver節(jié)點上(或者說driver進程上)執(zhí)行的,所以當counter變量在driver中定義,被在rdd中使用的時候,出現了變量的“跨域”問題,也就是閉包問題。

問題解釋:?
對于上面程序中的counter變量,由于在main函數和在rdd對象的foreach函數是屬于不同“閉包”的,所以,傳進foreach中的counter是一個副本,初始值都為0。foreach中疊加的是counter的副本,不管副本如何變化,都不會影響到main函數中的counter,所以最終打印出來的counter為0.

當用戶提交了一個用scala語言寫的Spark程序,Spark框架會調用哪些組件呢?首先,這個Spark程序就是一個“Application”,程序里面的mian函數就是“Driver Program”, 前面已經講到它的作用,只是,dirver程序的可能運行在客戶端,也有可有可能運行在spark集群中,這取決于spark作業(yè)提交時參數的選定,比如,yarn-client和yarn-cluster就是分別運行在客戶端和spark集群中。在driver程序中會有RDD對象的相關代碼操作,比如下面代碼的newRDD.map()

class Test{def main(args: Array[String]) {val sc = new SparkContext(new SparkConf())val newRDD = sc.textFile("")newRDD.map(data => {//do somethingprintln(data.toString)})} }

涉及到RDD的代碼,比如上面RDD的map操作,它們是在Worker節(jié)點上面運行的,所以spark會透明地幫用戶把這些涉及到RDD操作的代碼傳給相應的worker節(jié)點。

如果在RDD map函數中調用了在函數外部定義的對象,因為這些對象需要通過網絡從driver所在節(jié)點傳給其他的worker節(jié)點,所以要求這些類是可序列化的,比如在Java或者scala中實現Serializable類,除了java這種序列化機制,還可以選擇其他方式,使得序列化工作更加高效。

worker節(jié)點接收到程序之后,在spark資源管理器的指揮下運行RDD程序。

不同worker節(jié)點之間的運行操作是并行的。

? 在worker節(jié)點上所運行的RDD中代碼的變量是保存在worker節(jié)點上面的,在spark編程中,很多時候用戶需要在driver程序中進行相關數據操作之后把該數據傳給RDD對象的方法以做進一步處理,這時候,spark框架會自動幫用戶把這些數據通過網絡傳給相應的worker節(jié)點。

除了這種以變量的形式定義傳輸數據到worker節(jié)點之外,spark還另外提供了兩種機制,分別是broadcast和accumulator。

相比于變量的方式,在一定場景下使用broadcast比較有優(yōu)勢,因為所廣播的數據在每一個worker節(jié)點上面只存一個副本,而在spark算子中使用到的外部變量會在每一個用到它的task中保存一個副本,即使這些task在同一個節(jié)點上面。

所以當數據量比較大的時候,建議使用廣播而不是外部變量。
#####################以上是轉載的內容###########################

好了,這里加點東西,

如果是broadcast方式如何使用呢?代碼如下:

import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.log4j.Logger import org.apache.log4j.Level object Test {def main(args:Array[String]):Unit = {Logger.getLogger("org").setLevel(Level.OFF)Logger.getLogger("akka").setLevel(Level.OFF)Logger.getRootLogger().setLevel(Level.ERROR)val spark = SparkSession.builder.appName("Intro").config("spark.master", "local").getOrCreate();val broadcastVar = spark.sparkContext.broadcast(Array("orange","apple","pear","orange"))val dictionary = Map(("man"-> "noun"), ("is"->"verb"),("mortal"->"adjective"))def getElementsCount(word :String, dictionary:Map[String,String]):(String,Int) = {dictionary.filter{ case (wording,wordType) => wording.equals((word))}.map(x => (x._2,1)).headOption.getOrElse(("unknown" -> 1))//some dummy logic}val words = spark.sparkContext.parallelize(Array("man","is","mortal","mortal","1234","789","456","is","man"))val grammarElementCounts = words.map( word =>getElementsCount(word,dictionary)).reduceByKey((x,y) => x+y)grammarElementCounts.collect().foreach(println)spark.sparkContext.stop() } }

運行方式是:

scala -classpath $(echo *.jar ~/bigdata/spark-2.3.
1-bin-hadoop2.7/jars/*.jar| tr ' ' ':') broadcast_test.scala

運行結果:

(adjective,2)
(noun,2)
(verb,2)
(unknown,3)
?

?

如果是accumulate的方式如何計數呢?accumulate_test.scala代碼如下:

import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.log4j.Logger import org.apache.log4j.Level object Test {def main(args:Array[String]):Unit = {Logger.getLogger("org").setLevel(Level.OFF)Logger.getLogger("akka").setLevel(Level.OFF)Logger.getRootLogger().setLevel(Level.ERROR)val spark = SparkSession.builder.appName("Intro").config("spark.master", "local").getOrCreate();val rdd = spark.sparkContext.parallelize(List(1,2,3))var counter = spark.sparkContext.accumulator(0)//warn: don't do thisrdd.foreach(x => counter += x)println("Counter value: "+counter)spark.sparkContext.stop() } }

運行方法:

scala -classpath $(echo *.jar ~/bigdata/spark-2.3.
1-bin-hadoop2.7/jars/*.jar| tr ' ' ':') accumulate_test.scala

運行結果:

6

#######################################

另外關于Node數量和Executor數量放個圖

?

參考文獻:

https://blog.csdn.net/liangyihuai/article/details/56840473
https://www.cnblogs.com/sunshisonghit/p/6063296.html?utm_source=itdadao&utm_medium=referral

http://www.huaxiaozhuan.com/%E5%B7%A5%E5%85%B7/spark/chapters/04_acc_broadcast.html

https://blog.knoldus.com/broadcast-variables-in-spark-how-and-when-to-use-them/

?

?

總結

以上是生活随笔為你收集整理的理解spark闭包以及broadcast(转载)的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。