日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 人文社科 > 生活经验 >内容正文

生活经验

Spark 分布式计算原理

發布時間:2023/11/27 生活经验 34 豆豆
生活随笔 收集整理的這篇文章主要介紹了 Spark 分布式计算原理 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Spark 分布式計算原理

Spark Shuffle

1)在數據之間重新分配數據
2)(將父RDD重新定義進入子RDD)每一個分區里面的數據要重新進入新的分區
3)每一個shuffle階段盡量保存在內存里面,如果保存不下到磁盤
4)在每個shuffle階段不會改變分區的數量

RDD的依賴關系-1(lineage)

1) 寬依賴:一個夫RDD的分區被子RDD的多個分區使用
發生寬依賴一定shuffle()
(相當于超生)
2) 窄依賴:一個父RDD的分區被子RDD的一個分區使用

?

RDD的依賴關系-2(lineage)?? 寬依賴對比窄依賴

?

DAG工作原理

  • 根據RDD之間的依賴關系,形成一個DAG(有向無環)
1)從后往前,遇到寬依賴切割為新的Stage
2)每個Stage由分區一組并行的Task組成
每個Task共享歸類內存,堆外內存Task數據在進行交換
提前聚合,避免shuffle,將數據先進行去重

RDD持久化-1

cache:

  • 間數據寫入緩存
  • cache()不能再有其他的算子
val rdd=sc.makeRDD(1 to 10)
val rdd2=rdd.map(x=>{println(x);x}
rdd2.cache
rdd2.collect

RDD共享變量-1

  • 廣播變量(要定義的是Array)
val rdd=sc.makeRDD(1 to 10)
val j=sc.broadcast(Array(0))
rdd.map(x=>{j.value(0)=j.value(0)+1;println(j.value(0));x}).collect

RDD共享變量-2

  • 累加器:只允許added操作,常用于實現計數
val accum = sc.accumulator(0,"My Accumulator")
sc.parallelize(Array(1,2,3,4)).foreach(x=>accum+=x)
accum.value

RDD分區設計

  • 分區大小限制為2GB
分區太小
1)分區承擔的責任越大,內存壓力越大
分區過多
1)shuffle開銷越大
2)創建任務開銷越大

裝載CSV數據源

方法一:使用SparkContext
System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.3")//防止hadoop報錯val conf=new SparkConf().setMaster("local[2]").setAppName("hello")val sc=SparkContext.getOrCreate(conf)
val lines = sc.textFile("file:///d:/users.csv")
val fields = lines.mapPartitionsWithIndex((idx, iter) => if (idx == 0) iter.drop(1) else iter).map(l => l.split(",")).foreach(x=>println(x.toList))
————————————————————————————————————————————————————————————————————————————————————————————————
方法二:使用SparkSession
val df = spark.read.format("csv").option("header", "true").load("file:///home/kgc/data/users.csv")

裝載JSON數據源

方法一:使用SparkContext
val lines = sc.textFile("file:///home/kgc/data/users.json")
//scala內置的JSON庫
import scala.util.parsing.json.JSON
val result=lines.map(l=>JSON.parseFull(l))
————————————————————————————————————————————————————————————————————————————————————————————————
還有一種使用SparkSession方法:API
var spark=SparkSession.builder().master("local[2]").appName("hello").getOrCreate();val rdd=spark.read.json("file:///d:/date.json")print(rdd)
————————————————————————————————————————————————————————————————————————————————————————————————
方法二:使用SparkSession
val df = spark.read.format("json").option("header", "true").load("file:///home/kgc/data/users.json")

RDD數據傾斜*

  • 數據分配的不均勻
  • 通常發生在groupBy,join等之后
1)在執行shuffle操作的時候,是按照key,來進行values的輸出、拉取和聚合2)同一個key的values,一定是分配到一個reduce task進行處理的
3)如果是很多相同的key對應的values被分配到了一個task上面去執行,而另外的task,可能只分配了一些
4)這樣就會出現數據傾斜問題

解決方法:

方案一:聚合源數據
通過一些聚合的操作,比如grouByKey、reduceByKey就是拿到每個key對應的value,對每個key對應的values執行一定的計算
方案二:過濾導致傾斜的key
在sql中用where條件,過濾某幾個會導致數據傾斜的key

——————————————————————————————————————————————————

為什么要劃分Stage

spark劃分stage思路:
1)從后往前推,一個job拆分為多組task,每組的任務被稱為一個stage
2)stage里面的Task的數量對應一個partition,而stage又分為兩類,
一類是shuffleMapTask,一類是resltTask,DAG的最后一個階段為每個partition生成一個resultask,
其余階段都會生成ShuffleMapTask,他將自己的計算結果通過shuffle傳到下一個stage中。

——————————————————————————————————————————————————

?

轉載于:https://www.cnblogs.com/tudousiya/p/11285866.html

總結

以上是生活随笔為你收集整理的Spark 分布式计算原理的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。