日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

SparkProgrammingRDDs

發(fā)布時(shí)間:2024/6/21 编程问答 30 豆豆
生活随笔 收集整理的這篇文章主要介紹了 SparkProgrammingRDDs 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

Introduction to Core Spark Concepts

  • driver program:
  • 在集群上啟動(dòng)一系列的并行操作
  • 包含應(yīng)用的main函數(shù),定義集群上的分布式數(shù)據(jù)集,操作數(shù)據(jù)集
  • 通過SparkContext對(duì)象訪問spark,這表示了與計(jì)算集群的連接
    • executors
    • the place to run the operations
    • Spark automatically takes ur function and?ships it to executor nodes.

    Programming with RDDs

    • RDD: spark's core abstraction for working with data.
    • RDD簡(jiǎn)單來說就是元素的分布式集合
    • 在Spark中所有的工作都可以表示成創(chuàng)建一個(gè)新的RDDs,轉(zhuǎn)換已有的RDDs,或者是在RDDs上運(yùn)行operations

    RDD Basics

    • An?immutable?distributed collection of objects.
    • 每個(gè)RDD被split成多個(gè)partitions,每個(gè)partition可能在cluster的不同節(jié)點(diǎn)上被計(jì)算
    • RDD的創(chuàng)建:
    • loading一個(gè)外部數(shù)據(jù)集
    • distributing對(duì)象集合(eg: a list or set)
    • RDD操作:(區(qū)分這兩種操作的原因是Spark的計(jì)算是lazy fashion的
  • transformations:從原RDD創(chuàng)建一個(gè)新的RDD
  • actions:基于RDD計(jì)算一個(gè)result,這個(gè)結(jié)果要么返回給driver program,要么存儲(chǔ)到外部存儲(chǔ)系統(tǒng)(eg: HDFS)
  • RDD.persist():由于缺省情況下,每次運(yùn)行action的時(shí)候RDDs是重新計(jì)算的。如果對(duì)RDD進(jìn)行persist,那么該RDD會(huì)persist到內(nèi)存(或disk),下次action的時(shí)候可以reuse。

    Creating RDDs

    • parallelize()
    val lines = sc.parallelize(List("pandas", "I like pandas"))
    • textFile()
    val lines = sc.textFile("/path/to/README.md")

    RDD Operations

    • Transformation & Action

    Transformations

    • Compute lazily
    • 沒有改變?cè)璕DD(immutable),而是生成了新的RDD,spark會(huì)保存這一系列依賴關(guān)系(lineage)

    Actions

    • Actually do something with our dataset

    Passing Functions to Spark

    • Scala: we can pass in functions defined inline, references to methods, or static functions
    • Scala: 我們所傳送的函數(shù)和其中的數(shù)據(jù)引用需要被序列化(實(shí)現(xiàn)Java的Serializable接口)
    • 如果我們pass一個(gè)對(duì)象中的函數(shù),或者包含了對(duì)象中的字段的引用(eg: self.field),spark會(huì)把整個(gè)對(duì)象發(fā)送給worker nodes,這會(huì)遠(yuǎn)大于你所需要的信息。并且如果你的對(duì)象不能持久化(pickle in python)的話,會(huì)導(dǎo)致是你的程序失敗。舉一個(gè)python的例子:

      錯(cuò)誤示范如下:

    View Code

      正確示范:(提取對(duì)象中你所需的字段為局部變量,然后傳進(jìn)去)

    View Code
    • 同樣的,對(duì)scala我們也要盡量避免上述情況,而且要注意的是在scala中不需要顯示的self.或者this.,所以這種情況顯得很不明顯,但仍然要注意。舉個(gè)栗子
    View Code

      如果在scala中出現(xiàn)了NotSerializableException,那么多半是因?yàn)橐昧艘粋€(gè)不可序列化的類中的變量或字段。所以,傳送一個(gè)局部的可序列化的變量或函數(shù)才是安全的。

    • Any code that is shared by RDD transformations must always be serializable.

    Common Transformations and Actions

    Basic RDDs

    • 我們首先介紹基本的RDD操作,它們可以執(zhí)行在所有RDDs上而不用管數(shù)據(jù)

    Element-wise transformations

    • map() and filter()
    • flatMap(): 為每一個(gè)輸入元素產(chǎn)生多個(gè)輸出元素。返回的是一個(gè)迭代器iterator
    val lines = sc.parallelize(List("hello world", "hi")) val words = lines.flatMap(line => line.split(" "))

    Psedudo set operations

    • 一些簡(jiǎn)單的集合操作:(需要RDDs是同一類型的)
      • RDD1.distinct() ?--> 十分昂貴的操作,需要shuffle all data over the network
      • RDD1.union(RDD2) ?--> 最簡(jiǎn)單的集合操作,會(huì)保留原RDD中的重復(fù)值
      • RDD1.intersection(RDD2) --> 需要去重(來識(shí)別共同元素),因而也需要shuffle
      • RDD1.substract(RDD2) ?--> perform shuffle
      • RDD1.cartesian(RDD2) ?-->?returns all possible pairs of (a, b) where a is in the source RDD and b is in the other RDD .十分昂貴
    • 為什么叫psedudo即假的集合操作呢,因?yàn)檫@里的集合丟失了一個(gè)重要特性:uniqueness即元素的唯一性。因?yàn)槲覀兘?jīng)常有duplicates

    Actions

    • reduce() &?fold() :都需要返回值和RDD中的元素保持同一類型。

      fold()接收與reduce接收的函數(shù)簽名相同的函數(shù),另外再加上一個(gè)初始值作為第一次調(diào)用的結(jié)果.

    val sum = rdd.reduce((x, y) => x + y)
    • aggregate(): frees us from the constraint of having the return be the same types as the RDD we are working on.

      aggregate的函數(shù)原型:

    def aggregate [U: ClassTag] (zeroValue: U) (seqOp: (U,T)=>U,combOp: (U,U)=>U):U

      可以看到,(zeroValue: U)是給定的一個(gè)初值,后半部分有兩個(gè)函數(shù),seqOp相當(dāng)于是在各個(gè)分區(qū)里進(jìn)行的聚合操作,它支持(U, T) => U,也就是支持不同類型的聚合。comOp是將sepOp后的結(jié)果聚合,此時(shí)的結(jié)果全部是U類,也就是只能進(jìn)行同構(gòu)聚合。

      一個(gè)經(jīng)典的例子是求平均值。即先用seqOp求出各個(gè)分區(qū)中的sum和個(gè)數(shù),再將seqOp后的結(jié)果聚合得到總的sum和總的個(gè)數(shù)。

    View Code
    • collect(): 返回整個(gè)RDD中的內(nèi)容,常用于單元測(cè)試,因?yàn)樗枰愕恼麄€(gè)數(shù)據(jù)集能夠fit on a single machine.
    • take(n): 返回RDD中的n個(gè)元素,并且試圖最小化所訪問的partition數(shù),所以它可能會(huì)返回一個(gè)biased?collection。
    • takeSample(withReplacement, num, seed): allows us to take a sample of our data either with or without replacement.
    • foreach(): 可以允許我們?cè)诿總€(gè)元素上執(zhí)行操作or計(jì)算,而不需要把元素送回driver

    ?Converting Between RDD Types

    • 一些functions只在某些特定類型RDD上可用。比如mean(), variance()只用于numericRDDs, join()只用于key/value pair RDDs.
    • 在scala和Java中,這些方法未在標(biāo)準(zhǔn)RDD類中定義,因此為了訪問這些附加的功能,我們需要確保我們得到了正確的specialized class。

    Scala

    • 在scala中。RDDs的轉(zhuǎn)換可以通過使用隱式轉(zhuǎn)換(using implicit conversions)來自動(dòng)進(jìn)行。
    • 看一段RDD.scala源碼中的介紹
    View Code
    • 關(guān)于scala隱式轉(zhuǎn)換:?當(dāng)對(duì)象調(diào)用類中不存在的方法或成員時(shí),編譯器會(huì)自動(dòng)將對(duì)象進(jìn)行隱式轉(zhuǎn)換
    • 隱式轉(zhuǎn)換帶來的confusion:當(dāng)你在RDD上調(diào)用mean()這樣的方法時(shí),你會(huì)發(fā)現(xiàn)在RDDclass 的Scaladocs中找不到mean()方法,但是該方法能成功調(diào)用是由于實(shí)現(xiàn)了RDD[Double]到DoubleRDDFunctions的隱式轉(zhuǎn)換。

    Persistence(Caching)

    • As discussed earlier, Spark RDDs是惰性求值的,如果我們想要多次使用同一個(gè)RDD的話,Spark通常會(huì)每次都重新計(jì)算該RDD和它所有的依賴。這對(duì)于迭代算法是十分昂貴的。
    • 一個(gè)比較直觀的例子如下,每次action的時(shí)候都會(huì)重新計(jì)算:
    View Code
    • 為了避免多次重復(fù)計(jì)算同一個(gè)RDD,我們可以讓Spark來persist數(shù)據(jù)。這樣的話,計(jì)算該RDD的那個(gè)節(jié)點(diǎn)會(huì)保存它們的partition。
    • 如果有數(shù)據(jù)持久化的節(jié)點(diǎn)fail掉了,Spark會(huì)在需要的時(shí)候重新計(jì)算丟失的partitons。當(dāng)然我們也可以通過在多個(gè)節(jié)點(diǎn)保存副本的方式來避免節(jié)點(diǎn)故障時(shí)的slowdown。
    • Spark有很多l(xiāng)evels of persistence供選擇。
    • LevelSpace UsedCPU timeIn MemoryOn DiskComments

      MEMORY_ONLY?

      HighLowYN?

      MEMORY_ONLY_SER?

      LowHighYN?

      MEMORY_AND_DISK?

      HighMediumSomeSomeSpils to disk if there is too much data to fit in memory.

      MEMORY_AND_DISK_SER?

      LowHighSomeSome

      Spills to disk if there is too much data to fit in memory. Stores serialized representation in memory.?

      DISK_ONLY?

      LowHighNY?
    • 在Java和scala中,缺省的情況下persist()回將未序列化的對(duì)象數(shù)據(jù)保存在JVM的堆中。
    • 如果你試圖在內(nèi)存中cache過多的數(shù)據(jù),Spark將會(huì)自動(dòng)驅(qū)逐舊的partitions,使用最少最近使用(Least Recently Used, LRU)緩存策略。對(duì)于MEMORY_ONLY level,下次訪問的時(shí)候會(huì)重新計(jì)算這些被驅(qū)逐的分片。
    • 由于Spark'的各種機(jī)制,無論使用哪種level,你都可以不用擔(dān)心job breaking。但是緩存不必要的數(shù)據(jù)將會(huì)導(dǎo)致有用數(shù)據(jù)被驅(qū)逐,從而增加重計(jì)算的時(shí)間。
    • Spark提供了unpersist()方法可以讓你手工地將RDD移除緩存。
    • Off-heap caching is experimental and uses Tachyon. If you are interested in off-heap caching with Spark, take a look at the Running Spark on Tachyon guide.?

      ?

    轉(zhuǎn)載于:https://www.cnblogs.com/wttttt/p/6826719.html

    總結(jié)

    以上是生活随笔為你收集整理的SparkProgrammingRDDs的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。