理解spark闭包以及broadcast(转载)
什么叫閉包:
跨作用域訪問函數變量。
又指的一個擁有許多變量和綁定了這些變量的環(huán)境的表達式(通常是一個函數),因而這些變量也是該表達式的一部分。
Spark閉包的問題引出:?
在spark中實現統(tǒng)計List(1,2,3)的和。如果使用下面的代碼,程序打印的結果不是6,而是0。這個和我們編寫單機程序的認識有很大不同。為什么呢?
test.scala代碼如下:
import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.log4j.Logger import org.apache.log4j.Level object Test {def main(args:Array[String]):Unit = {Logger.getLogger("org").setLevel(Level.OFF)Logger.getLogger("akka").setLevel(Level.OFF)Logger.getRootLogger().setLevel(Level.ERROR)val spark = SparkSession.builder.appName("Intro").config("spark.master", "local").getOrCreate();val rdd = spark.sparkContext.parallelize(List(1,2,3))var counter = 0//warn: don't do thisrdd.foreach(x => counter += x)println("Counter value: "+counter)spark.sparkContext.stop() } }運行方法:
scala -classpath $(echo *.jar ~/bigdata/spark-2.3.1-bin-hadoop2.7/jars/*.jar| tr ' ' ':') test.scala
?
問題分析:?
counter是在foreach函數外部定義的,也就是在driver程序中定義,而foreach函數是屬于rdd對象的,rdd函數的執(zhí)行位置是各個worker節(jié)點(或者說worker進程),main函數是在driver節(jié)點上(或者說driver進程上)執(zhí)行的,所以當counter變量在driver中定義,被在rdd中使用的時候,出現了變量的“跨域”問題,也就是閉包問題。
問題解釋:?
對于上面程序中的counter變量,由于在main函數和在rdd對象的foreach函數是屬于不同“閉包”的,所以,傳進foreach中的counter是一個副本,初始值都為0。foreach中疊加的是counter的副本,不管副本如何變化,都不會影響到main函數中的counter,所以最終打印出來的counter為0.
當用戶提交了一個用scala語言寫的Spark程序,Spark框架會調用哪些組件呢?首先,這個Spark程序就是一個“Application”,程序里面的mian函數就是“Driver Program”, 前面已經講到它的作用,只是,dirver程序的可能運行在客戶端,也有可有可能運行在spark集群中,這取決于spark作業(yè)提交時參數的選定,比如,yarn-client和yarn-cluster就是分別運行在客戶端和spark集群中。在driver程序中會有RDD對象的相關代碼操作,比如下面代碼的newRDD.map()
class Test{def main(args: Array[String]) {val sc = new SparkContext(new SparkConf())val newRDD = sc.textFile("")newRDD.map(data => {//do somethingprintln(data.toString)})} }涉及到RDD的代碼,比如上面RDD的map操作,它們是在Worker節(jié)點上面運行的,所以spark會透明地幫用戶把這些涉及到RDD操作的代碼傳給相應的worker節(jié)點。
如果在RDD map函數中調用了在函數外部定義的對象,因為這些對象需要通過網絡從driver所在節(jié)點傳給其他的worker節(jié)點,所以要求這些類是可序列化的,比如在Java或者scala中實現Serializable類,除了java這種序列化機制,還可以選擇其他方式,使得序列化工作更加高效。
worker節(jié)點接收到程序之后,在spark資源管理器的指揮下運行RDD程序。
不同worker節(jié)點之間的運行操作是并行的。
? 在worker節(jié)點上所運行的RDD中代碼的變量是保存在worker節(jié)點上面的,在spark編程中,很多時候用戶需要在driver程序中進行相關數據操作之后把該數據傳給RDD對象的方法以做進一步處理,這時候,spark框架會自動幫用戶把這些數據通過網絡傳給相應的worker節(jié)點。
除了這種以變量的形式定義傳輸數據到worker節(jié)點之外,spark還另外提供了兩種機制,分別是broadcast和accumulator。
相比于變量的方式,在一定場景下使用broadcast比較有優(yōu)勢,因為所廣播的數據在每一個worker節(jié)點上面只存一個副本,而在spark算子中使用到的外部變量會在每一個用到它的task中保存一個副本,即使這些task在同一個節(jié)點上面。
所以當數據量比較大的時候,建議使用廣播而不是外部變量。
#####################以上是轉載的內容###########################
好了,這里加點東西,
如果是broadcast方式如何使用呢?代碼如下:
import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.log4j.Logger import org.apache.log4j.Level object Test {def main(args:Array[String]):Unit = {Logger.getLogger("org").setLevel(Level.OFF)Logger.getLogger("akka").setLevel(Level.OFF)Logger.getRootLogger().setLevel(Level.ERROR)val spark = SparkSession.builder.appName("Intro").config("spark.master", "local").getOrCreate();val broadcastVar = spark.sparkContext.broadcast(Array("orange","apple","pear","orange"))val dictionary = Map(("man"-> "noun"), ("is"->"verb"),("mortal"->"adjective"))def getElementsCount(word :String, dictionary:Map[String,String]):(String,Int) = {dictionary.filter{ case (wording,wordType) => wording.equals((word))}.map(x => (x._2,1)).headOption.getOrElse(("unknown" -> 1))//some dummy logic}val words = spark.sparkContext.parallelize(Array("man","is","mortal","mortal","1234","789","456","is","man"))val grammarElementCounts = words.map( word =>getElementsCount(word,dictionary)).reduceByKey((x,y) => x+y)grammarElementCounts.collect().foreach(println)spark.sparkContext.stop() } }運行方式是:
scala -classpath $(echo *.jar ~/bigdata/spark-2.3.
1-bin-hadoop2.7/jars/*.jar| tr ' ' ':') broadcast_test.scala
運行結果:
(adjective,2)
(noun,2)
(verb,2)
(unknown,3)
?
?
如果是accumulate的方式如何計數呢?accumulate_test.scala代碼如下:
import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.log4j.Logger import org.apache.log4j.Level object Test {def main(args:Array[String]):Unit = {Logger.getLogger("org").setLevel(Level.OFF)Logger.getLogger("akka").setLevel(Level.OFF)Logger.getRootLogger().setLevel(Level.ERROR)val spark = SparkSession.builder.appName("Intro").config("spark.master", "local").getOrCreate();val rdd = spark.sparkContext.parallelize(List(1,2,3))var counter = spark.sparkContext.accumulator(0)//warn: don't do thisrdd.foreach(x => counter += x)println("Counter value: "+counter)spark.sparkContext.stop() } }運行方法:
scala -classpath $(echo *.jar ~/bigdata/spark-2.3.
1-bin-hadoop2.7/jars/*.jar| tr ' ' ':') accumulate_test.scala
運行結果:
6
#######################################
另外關于Node數量和Executor數量放個圖
?
參考文獻:
https://blog.csdn.net/liangyihuai/article/details/56840473
https://www.cnblogs.com/sunshisonghit/p/6063296.html?utm_source=itdadao&utm_medium=referral
http://www.huaxiaozhuan.com/%E5%B7%A5%E5%85%B7/spark/chapters/04_acc_broadcast.html
https://blog.knoldus.com/broadcast-variables-in-spark-how-and-when-to-use-them/
?
?
總結
以上是生活随笔為你收集整理的理解spark闭包以及broadcast(转载)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ubuntu下面的背光键盘的使用
- 下一篇: 一句话讲清楚RDD是什么?