日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程语言 > java >内容正文

java

Spark Java API:broadcast、accumulator

發布時間:2024/1/17 java 28 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark Java API:broadcast、accumulator 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

broadcast


官方文檔描述:

Broadcast?a?read-only?variable?to?the?cluster,?returning?a [[org.apache.spark.broadcast.Broadcast]]?object?for?reading?it?in?distributed?functions. The?variable?will?be?sent?to?each?cluster?only?once.
  • 1
  • 2
  • 3
  • 4

函數原型:

def?broadcast[T](value:?T):?Broadcast[T]
  • 1

廣播變量允許程序員將一個只讀的變量緩存在每臺機器上,而不用在任務之間傳遞變量。廣播變量可被用于有效地給每個節點一個大輸入數據集的副本。Spark還嘗試使用高效地廣播算法來分發變量,進而減少通信的開銷。?Spark的動作通過一系列的步驟執行,這些步驟由分布式的洗牌操作分開。Spark自動地廣播每個步驟每個任務需要的通用數據。這些廣播數據被序列化地緩存,在運行任務之前被反序列化出來。這意味著當我們需要在多個階段的任務之間使用相同的數據,或者以反序列化形式緩存數據是十分重要的時候,顯式地創建廣播變量才有用。

源碼分析:

def?broadcast[T:?ClassTag](value:?T):?Broadcast[T]?=?{? assertNotStopped()? if?(classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass))?{? ? //?This?is?a?warning?instead?of?an?exception?in?order?to?avoid?breaking?user?programs?that? ? //?might?have?created?RDD?broadcast?variables?but?not?used?them:? ? logWarning("Can?not?directly?broadcast?RDDs;?instead,?call?collect()?and?"? ? ? +?"broadcast?the?result?(see?SPARK-5063)")? }? val?bc?=?env.broadcastManager.newBroadcast[T](value,?isLocal)? val?callSite?=?getCallSite? logInfo("Created?broadcast?"?+?bc.id?+?"?from?"?+?callSite.shortForm)? cleaner.foreach(_.registerBroadcastForCleanup(bc))? bc }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

實例:

List<Integer>?data?=?Arrays.asList(5,?1,?1,?4,?4,?2,?2); JavaRDD<Integer>?javaRDD?=?javaSparkContext.parallelize(data,5); final?Broadcast<List<Integer>>?broadcast?=?javaSparkContext.broadcast(data); JavaRDD<Integer>?result?=?javaRDD.map(new?Function<Integer,?Integer>()?{? ? List<Integer>?iList?=?broadcast.value();? ? @Override? ? public?Integer?call(Integer?v1)?throws?Exception?{? ? ? ? Integer?isum?=?0;? ? ? ? for(Integer?i?:?iList)? ? ? ? ? ? isum?+=?i;? ? ? ? return?v1?+?isum;? ? } }); System.out.println(result.collect());
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

accumulator


官方文檔描述:

? Create?an?[[org.apache.spark.Accumulator]]?variable?of?a?given?type,?which?tasks?can?“add”?
? values?to?using?the?add?method.?Only?the?master?can?access?the?accumulator’s?value.

函數原型:

def?accumulator[T](initialValue:?T,?accumulatorParam:?AccumulatorParam[T]):?Accumulator[T] def?accumulator[T](initialValue:?T,?name:?String,?accumulatorParam:?AccumulatorParam[T])? ?:?Accumulator[T]
  • 1
  • 2
  • 3

累加器是僅僅被相關操作累加的變量,因此可以在并行中被有效地支持。它可以被用來實現計數器和sum。Spark原生地只支持數字類型的累加器,開發者可以添加新類型的支持。如果創建累加器時指定了名字,可以在Spark的UI界面看到。這有利于理解每個執行階段的進程(對于Python還不支持)?。?
累加器通過對一個初始化了的變量v調用SparkContext.accumulator(v)來創建。在集群上運行的任務可以通過add或者”+=”方法在累加器上進行累加操作。但是,它們不能讀取它的值。只有驅動程序能夠讀取它的值,通過累加器的value方法。

源碼分析:

def?accumulator[T](initialValue:?T,?name:?String)(implicit?param:?AccumulatorParam[T])? :?Accumulator[T]?=?{? val?acc?=?new?Accumulator(initialValue,?param,?Some(name))? cleaner.foreach(_.registerAccumulatorForCleanup(acc))? acc }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

實例:

class VectorAccumulatorParam implements AccumulatorParam<Vector> { @Override //合并兩個累加器的值。//參數r1是一個累加數據集合//參數r2是另一個累加數據集合public Vector addInPlace(Vector r1, Vector r2) {r1.addAll(r2);return r1; } @Override //初始值 public Vector zero(Vector initialValue) { return initialValue; } @Override//添加額外的數據到累加值中//參數t1是當前累加器的值//參數t2是被添加到累加器的值 public Vector addAccumulator(Vector t1, Vector t2) { t1.addAll(t2); return t1; } } List<Integer>?data?=?Arrays.asList(5,?1,?1,?4,?4,?2,?2); JavaRDD<Integer>?javaRDD?=?javaSparkContext.parallelize(data,5);final?Accumulator<Integer>?accumulator?=?javaSparkContext.accumulator(0); Vector?initialValue?=?new?Vector(); for(int?i=6;i<9;i++)? ? initialValue.add(i); //自定義累加器 final?Accumulator?accumulator1?=?javaSparkContext.accumulator(initialValue,new?VectorAccumulatorParam()); JavaRDD<Integer>?result?=?javaRDD.map(new?Function<Integer,?Integer>()?{? ? @Override? ? public?Integer?call(Integer?v1)?throws?Exception?{? ? ? ? accumulator.add(1);? ? ? ? Vector?term?=?new?Vector();? ? ? ? term.add(v1);? ? ? ? accumulator1.add(term);? ? ? ? return?v1;? ? } }); System.out.println(result.collect()); System.out.println("~~~~~~~~~~~~~~~~~~~~~"?+?accumulator.value()); System.out.println("~~~~~~~~~~~~~~~~~~~~~"?+?accumulator1.value());

總結

以上是生活随笔為你收集整理的Spark Java API:broadcast、accumulator的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。