日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

分布式实时计算—Spark—Spark Core

發(fā)布時(shí)間:2024/4/15 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 分布式实时计算—Spark—Spark Core 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

原文作者:bingoabin

原文地址:Spark Core

目錄

一、Spark Core

1. 主要功能

2. Spark Core子框架

3. Spark架構(gòu)

4. Spark計(jì)算模型

二、組件

1. 介紹

2. RDD

3. DataFrame

4. DataSet

6. RDD和DataSet比較

7. DataFrame和DataSet比較

8. 應(yīng)用場景


一、Spark Core

Apache Spark 是加州大學(xué)伯克利分校的 AMP Labs 開發(fā)的開源分布式輕量級通用計(jì)算框架。由于 Spark 基于內(nèi)存設(shè)計(jì),使得它擁有比 Hadoop 更高的性能(極端情況下可以達(dá)到 100x),并且對多語言(Scala、Java、Python)提供支持。其一棧式的設(shè)計(jì)特點(diǎn)使得我們的學(xué)習(xí)和維護(hù)成本大大地減少,而且其提供了很好的容錯(cuò)解決方案。

1. 主要功能

Spark Core提供Spark最基礎(chǔ)與最核心的功能,主要包括以下功能:

  • SparkContext:通常而言,Driver Application的執(zhí)行與輸出都是通過SparkContext來完成的。在正式提交Application之前,首先需要初始化SparkContext。SparkContext隱藏了網(wǎng)絡(luò)通信、分布式部署、消息通信、存儲(chǔ)能力、計(jì)算能力、緩存、測量系統(tǒng)、文件服務(wù)、Web服務(wù)等內(nèi)容,應(yīng)用程序開發(fā)者只需要使用SparkContext提供的API完成功能開發(fā)。SparkContext內(nèi)置的DAGScheduler負(fù)責(zé)創(chuàng)建Job,將DAG中的RDD劃分到不同的Stage,提交Stage等功能。內(nèi)置的TaskScheduler負(fù)責(zé)資源的申請,任務(wù)的提交及請求集群對任務(wù)的調(diào)度等工作。?
  • 存儲(chǔ)體系:Spark優(yōu)先考慮使用各節(jié)點(diǎn)的內(nèi)存作為存儲(chǔ),當(dāng)內(nèi)存不足時(shí)才會(huì)考慮使用磁盤,這極大地減少了磁盤IO,提升了任務(wù)執(zhí)行的效率,使得Spark適用于實(shí)時(shí)計(jì)算、流式計(jì)算等場景。此外,Spark還提供了以內(nèi)存為中心的高容錯(cuò)的分布式文件系統(tǒng)Tachyon供用戶進(jìn)行選擇。Tachyon能夠?yàn)镾park提供可靠的內(nèi)存級的文件共享服務(wù)。?
  • 計(jì)算引擎:計(jì)算引擎由SparkContext中的DAGScheduler、RDD以及具體節(jié)點(diǎn)上的Executor負(fù)責(zé)執(zhí)行的Map和Reduce任務(wù)組成。DAGScheduler和RDD雖然位于SparkContext內(nèi)部,但是在任務(wù)正式提交與執(zhí)行之前會(huì)將Job中的RDD組織成有向無環(huán)圖(DAG),并對Stage進(jìn)行劃分,決定了任務(wù)執(zhí)行階段任務(wù)的數(shù)量、迭代計(jì)算、shuffle等過程。?
  • 部署模式:由于單節(jié)點(diǎn)不足以提供足夠的存儲(chǔ)和計(jì)算能力,所以作為大數(shù)據(jù)處理的Spark在SparkContext的TaskScheduler組件中提供了對Standalone部署模式的實(shí)現(xiàn)和Yarn、Mesos等分布式資源管理系統(tǒng)的支持。通過使用Standalone、Yarn、Mesos等部署模式為Task分配計(jì)算資源,提高任務(wù)的并發(fā)執(zhí)行效率。
  • 2. Spark Core子框架

    (1)、Spark SQL:首先使用SQL語句解析器(SqlParser)將SQL轉(zhuǎn)換為語法樹(Tree),并且使用規(guī)則執(zhí)行器(RuleExecutor)將一系列規(guī)則(Rule)應(yīng)用到語法樹,最終生成物理執(zhí)行計(jì)劃并執(zhí)行。其中,規(guī)則執(zhí)行器包括語法分析器(Analyzer)和優(yōu)化器(Optimizer)。?

    (2)、Spark Streaming:用于流式計(jì)算。Spark Streaming支持Kafka、Flume、Twitter、MQTT、ZeroMQ、Kinesis和簡單的TCP套接字等多種數(shù)據(jù)輸入源。輸入流接收器(Receiver)負(fù)責(zé)接入數(shù)據(jù),是接入數(shù)據(jù)流的接口規(guī)范。Dstream是Spark Streaming中所有數(shù)據(jù)流的抽象,Dstream可以被組織為Dstream Graph。Dstream本質(zhì)上由一系列連續(xù)的RDD組成。?

    (3)、GraphX:Spark提供的分布式圖計(jì)算框架。GraphX主要遵循整體同步并行(bulk Synchronous parallel,BSP)計(jì)算模式下的Pregel模型實(shí)現(xiàn)。GraphX提供了對圖的抽象Graph,Graph由頂點(diǎn)(Vertex),邊(Edge)及繼承了Edge的EdgeTriplet三種結(jié)構(gòu)組成。GraphX目前已經(jīng)封裝了最短路徑,網(wǎng)頁排名,連接組件,三角關(guān)系統(tǒng)計(jì)等算法的實(shí)現(xiàn),用戶可以選擇使用。?

    (4)、MLlib:Spark提供的機(jī)器學(xué)習(xí)框架。機(jī)器學(xué)習(xí)是一門設(shè)計(jì)概率論、統(tǒng)計(jì)學(xué)、逼近論、凸分析、算法復(fù)雜度理論等多領(lǐng)域的交叉學(xué)科。MLlib目前已經(jīng)提供了基礎(chǔ)統(tǒng)計(jì)、分析、回歸、決策樹、隨機(jī)森林、樸素貝葉斯、保序回歸、協(xié)同過濾、聚類、維數(shù)縮減、特征提取與轉(zhuǎn)型、頻繁模式挖掘、預(yù)言模型標(biāo)記語言、管道等多種數(shù)理統(tǒng)計(jì)、概率論、數(shù)據(jù)挖掘方面的數(shù)學(xué)算法。

    3. Spark架構(gòu)

    Spark采用了分布式計(jì)算中的Master-Slave模型。Master作為整個(gè)集群的控制器,負(fù)責(zé)整個(gè)集群的正常運(yùn)行;Worker是計(jì)算節(jié)點(diǎn),接受主節(jié)點(diǎn)命令以及進(jìn)行狀態(tài)匯報(bào);Executor負(fù)責(zé)任務(wù)(Tast)的調(diào)度和執(zhí)行;Client作為用戶的客戶端負(fù)責(zé)提交應(yīng)用;Driver負(fù)責(zé)控制一個(gè)應(yīng)用的執(zhí)行。

    ?

    Spark集群啟動(dòng)時(shí),需要從主節(jié)點(diǎn)和從節(jié)點(diǎn)分別啟動(dòng)Master進(jìn)程和Worker進(jìn)程,對整個(gè)集群進(jìn)行控制。在一個(gè)Spark應(yīng)用的執(zhí)行過程中,Driver是應(yīng)用的邏輯執(zhí)行起點(diǎn),運(yùn)行Application的main函數(shù)并創(chuàng)建SparkContext,DAGScheduler把對Job中的RDD有向無環(huán)圖根據(jù)依賴關(guān)系劃分為多個(gè)Stage,每一個(gè)Stage是一個(gè)TaskSet, TaskScheduler把Task分發(fā)給Worker中的Executor;Worker啟動(dòng)Executor,Executor啟動(dòng)線程池用于執(zhí)行Task。

    4. Spark計(jì)算模型

    RDD:彈性分布式數(shù)據(jù)集,是一種內(nèi)存抽象,可以理解為一個(gè)大數(shù)組,數(shù)組的元素是RDD的分區(qū)Partition,分布在集群上;在物理數(shù)據(jù)存儲(chǔ)上,RDD的每一個(gè)Partition對應(yīng)的就是一個(gè)數(shù)據(jù)塊Block,Block可以存儲(chǔ)在內(nèi)存中,當(dāng)內(nèi)存不夠時(shí)可以存儲(chǔ)在磁盤上。


    RDD邏輯物理結(jié)構(gòu)

    Hadoop將Mapreduce計(jì)算的結(jié)果寫入磁盤,在機(jī)器學(xué)習(xí)、圖計(jì)算、PageRank等迭代計(jì)算下,重用中間結(jié)果導(dǎo)致的反復(fù)I/O耗時(shí)過長,成為了計(jì)算性能的瓶頸。為了提高迭代計(jì)算的性能和分布式并行計(jì)算下共享數(shù)據(jù)的容錯(cuò)性,伯克利的設(shè)計(jì)者依據(jù)兩個(gè)特性而設(shè)計(jì)了RDD:

  • 數(shù)據(jù)集分區(qū)存儲(chǔ)在節(jié)點(diǎn)的內(nèi)存中,減少迭代過程(如機(jī)器學(xué)習(xí)算法)反復(fù)的I/O操作從而提高性能。?
  • 數(shù)據(jù)集不可變,并記錄其轉(zhuǎn)換過程,從而實(shí)現(xiàn)無共享數(shù)據(jù)讀寫同步問題、以及出錯(cuò)的可重算性。
  • Operations:算子

    算子是RDD中定義的函數(shù),可以對RDD中的數(shù)據(jù)進(jìn)行轉(zhuǎn)換和操作。如下圖,Spark從外部空間(HDFS)讀取數(shù)據(jù)形成RDD_0,Tranformation算子對數(shù)據(jù)進(jìn)行操作(如fliter)并轉(zhuǎn)化為新的RDD_1、RDD_2,通過Action算子(如collect/count)觸發(fā)Spark提交作業(yè)。如上的分析過程可以看出,Tranformation算子并不會(huì)觸發(fā)Spark提交作業(yè),直至Action算子才提交作業(yè),這是一個(gè)延遲計(jì)算的設(shè)計(jì)技巧,可以避免內(nèi)存過快被中間計(jì)算占滿,從而提高內(nèi)存的利用率。

    下圖是算子的列表,分三大類:Value數(shù)據(jù)類型的Tranformation算子;Key-Value數(shù)據(jù)類型的Tranformation算子;Action算子。

    Lineage Graph:血統(tǒng)關(guān)系圖

    下圖的第一階段生成RDD的有向無環(huán)圖,即是血統(tǒng)關(guān)系圖,記錄了RDD的更新過程,當(dāng)這個(gè)RDD的部分分區(qū)數(shù)據(jù)丟失時(shí),它可以通過Lineage獲取足夠的信息來重新運(yùn)算和恢復(fù)丟失的數(shù)據(jù)分區(qū)。DAGScheduler依據(jù)RDD的依賴關(guān)系將有向無環(huán)圖劃分為多個(gè)Stage,一個(gè)Stage對應(yīng)著一系列的Task,由TashScheduler分發(fā)給Worker計(jì)算。

    二、組件

    1. 介紹

    spark生態(tài)系統(tǒng)中,Spark Core,包括各種Spark的各種核心組件,它們能夠?qū)?nèi)存和硬盤進(jìn)行操作,或者調(diào)用CPU進(jìn)行計(jì)算。spark core定義了RDD、DataFrame和DataSet

    spark最初只有RDD,DataFrame在Spark 1.3中被首次發(fā)布,DataSet在Spark1.6版本中被加入。

    2. RDD

    RDD:Spark的核心概念是RDD (resilientdistributed dataset),指的是一個(gè)只讀的,可分區(qū)的分布式數(shù)據(jù)集,這個(gè)數(shù)據(jù)集的全部或部分可以緩存在內(nèi)存中,在多次計(jì)算間重用。

    優(yōu)點(diǎn):

    • 編譯時(shí)類型安全?
    • 編譯時(shí)就能檢查出類型錯(cuò)誤?
    • 面向?qū)ο蟮木幊田L(fēng)格?
    • 直接通過類名點(diǎn)的方式來操作數(shù)據(jù)

    缺點(diǎn):

    • 序列化和反序列化的性能開銷?
    • 無論是集群間的通信, 還是IO操作都需要對對象的結(jié)構(gòu)和數(shù)據(jù)進(jìn)行序列化和反序列化.?
    • GC的性能開銷?
    • 頻繁的創(chuàng)建和銷毀對象, 勢必會(huì)增加GC
    <span style="color:#000000"><code>import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkConf, SparkContext}object Run {def main(args: Array[String]) {val conf = new SparkConf().setAppName("test").setMaster("local")val sc = new SparkContext(conf)sc.setLogLevel("WARN")val sqlContext = new SQLContext(sc)/*** id age* 1 30* 2 29* 3 21*/case class Person(id: Int, age: Int)val idAgeRDDPerson = sc.parallelize(Array(Person(1, 30), Person(2, 29), Person(3, 21)))// 優(yōu)點(diǎn)1// idAge.filter(_.age > "") // 編譯時(shí)報(bào)錯(cuò), int不能跟String比// 優(yōu)點(diǎn)2idAgeRDDPerson.filter(_.age > 25) // 直接操作一個(gè)個(gè)的person對象} } </code></span>

    3. DataFrame

    在Spark中,DataFrame是一種以RDD為基礎(chǔ)的分布式數(shù)據(jù)集,類似于傳統(tǒng)數(shù)據(jù)庫中的二維表格。DataFrame與RDD的主要區(qū)別在于,前者帶有schema元信息,即DataFrame所表示的二維表數(shù)據(jù)集的每一列都帶有名稱和類型。這使得Spark SQL得以洞察更多的結(jié)構(gòu)信息,從而對藏于DataFrame背后的數(shù)據(jù)源以及作用于DataFrame之上的變換進(jìn)行了針對性的優(yōu)化,最終達(dá)到大幅提升運(yùn)行時(shí)效率的目標(biāo)。反觀RDD,由于無從得知所存數(shù)據(jù)元素的具體內(nèi)部結(jié)構(gòu),Spark Core只能在stage層面進(jìn)行簡單、通用的流水線優(yōu)化。

    DataFrame引入了schema和off-heap

    schema : RDD每一行的數(shù)據(jù), 結(jié)構(gòu)都是一樣的.
    這個(gè)結(jié)構(gòu)就存儲(chǔ)在schema中。 Spark通過schame就能夠讀懂?dāng)?shù)據(jù), 因此在通信和IO時(shí)就只需要序列化和反序列化數(shù)據(jù),而結(jié)構(gòu)的部分就可以省略了。 off-heap : 意味著JVM堆以外的內(nèi)存,這些內(nèi)存直接受操作系統(tǒng)管理(而不是JVM)。Spark能夠以二進(jìn)制的形式序列化數(shù)據(jù)(不包括結(jié)構(gòu))到off-heap中,當(dāng)要操作數(shù)據(jù)時(shí),就直接操作off-heap內(nèi)存。由于Spark理解schema,所以知道該如何操作。

    off-heap就像地盤,schema就像地圖, Spark有地圖又有自己地盤了, 就可以自己說了算了, 不再受JVM的限制,也就不再收GC的困擾了。通過schema和off-heap,DataFrame解決了RDD的缺點(diǎn),但是卻丟了RDD的優(yōu)點(diǎn)。 DataFrame不是類型安全的, API也不是面向?qū)ο箫L(fēng)格的。

    <span style="color:#000000"><code>import org.apache.spark.sql.types.{DataTypes, StructField, StructType} import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.{SparkConf, SparkContext}object Run {def main(args: Array[String]) {val conf = new SparkConf().setAppName("test").setMaster("local")val sc = new SparkContext(conf)sc.setLogLevel("WARN")val sqlContext = new SQLContext(sc)/*** id age* 1 30* 2 29* 3 21*/val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21)))val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType)))val idAgeDF = sqlContext.createDataFrame(idAgeRDDRow, schema)// API不是面向?qū)ο蟮膇dAgeDF.filter(idAgeDF.col("age") > 25) // 不會(huì)報(bào)錯(cuò), DataFrame不是編譯時(shí)類型安全的idAgeDF.filter(idAgeDF.col("age") > "") } } </code></span>

    4. DataSet

    Dataset是一個(gè)強(qiáng)類型的特定領(lǐng)域的對象,這種對象可以函數(shù)式或者關(guān)系操作并行地轉(zhuǎn)換。每個(gè)Dataset也有一個(gè)被稱為一個(gè)DataFrame的類型化視圖,這種DataFrame是Row類型的Dataset,即Dataset[Row]Dataset是“懶惰”的,只在執(zhí)行行動(dòng)操作時(shí)觸發(fā)計(jì)算。本質(zhì)上,數(shù)據(jù)集表示一個(gè)邏輯計(jì)劃,該計(jì)劃描述了產(chǎn)生數(shù)據(jù)所需的計(jì)算。當(dāng)執(zhí)行行動(dòng)操作時(shí),Spark的查詢優(yōu)化程序優(yōu)化邏輯計(jì)劃,并生成一個(gè)高效的并行和分布式物理計(jì)劃。DataSet結(jié)合了RDD和DataFrame的優(yōu)點(diǎn),,并帶來的一個(gè)新的概念Encoder 當(dāng)序列化數(shù)據(jù)時(shí),Encoder產(chǎn)生字節(jié)碼與off-heap進(jìn)行交互,能夠達(dá)到按需訪問數(shù)據(jù)的效果, 而不用反序列化整個(gè)對象。 Spark還沒有提供自定義Encoder的API,但是未來會(huì)加入。下面看DataFrame和DataSet在2.0.0-preview中的實(shí)現(xiàn)

    <span style="color:#000000"><code>下面這段代碼, 在1.6.x中創(chuàng)建的是DataFrame // 上文DataFrame示例中提取出來的 val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21)))val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType)))val idAgeDF = sqlContext.createDataFrame(idAgeRDDRow, schema) </code></span> <span style="color:#000000"><code>但是同樣的代碼在2.0.0-preview中, 創(chuàng)建的雖然還叫DataFrame// sqlContext.createDataFrame(idAgeRDDRow, schema) 方法的實(shí)現(xiàn), 返回值依然是DataFrame def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = { sparkSession.createDataFrame(rowRDD, schema) } </code></span> <span style="color:#000000"><code>但是其實(shí)卻是DataSet, 因?yàn)镈ataFrame被聲明為Dataset[Row]package object sql {// ...省略了不相關(guān)的代碼type DataFrame = Dataset[Row] } </code></span> <span style="color:#000000"><code>因此當(dāng)我們從1.6.x遷移到2.0.0的時(shí)候, 無需任何修改就直接用上了DataSet.下面是一段DataSet的示例代碼import org.apache.spark.sql.types.{DataTypes, StructField, StructType} import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.{SparkConf, SparkContext}object Test {def main(args: Array[String]) {val conf = new SparkConf().setAppName("test").setMaster("local") // 調(diào)試的時(shí)候一定不要用local[*]val sc = new SparkContext(conf)val sqlContext = new SQLContext(sc)import sqlContext.implicits._val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21)))val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType)))// 在2.0.0-preview中這行代碼創(chuàng)建出的DataFrame, 其實(shí)是DataSet[Row]val idAgeDS = sqlContext.createDataFrame(idAgeRDDRow, schema)// 在2.0.0-preview中, 還不支持自定的Encoder, Row類型不行, 自定義的bean也不行// 官方文檔也有寫通過bean創(chuàng)建Dataset的例子,但是我運(yùn)行時(shí)并不能成功// 所以目前需要用創(chuàng)建DataFrame的方法, 來創(chuàng)建DataSet[Row]// sqlContext.createDataset(idAgeRDDRow)// 目前支持String, Integer, Long等類型直接創(chuàng)建DatasetSeq(1, 2, 3).toDS().show()sqlContext.createDataset(sc.parallelize(Array(1, 2, 3))).show()} } </code></span>

    5. RDD和DataFrame比較

    DataFrame與RDD相同之處,都是不可變分布式彈性數(shù)據(jù)集。不同之處在于,DataFrame的數(shù)據(jù)集都是按指定列存儲(chǔ),即結(jié)構(gòu)化數(shù)據(jù)。類似于傳統(tǒng)數(shù)據(jù)庫中的表。DataFrame的設(shè)計(jì)是為了讓大數(shù)據(jù)處理起來更容易。DataFrame允許開發(fā)者把結(jié)構(gòu)化數(shù)據(jù)集導(dǎo)入DataFrame,并做了higher-level的抽象; DataFrame提供特定領(lǐng)域的語言(DSL)API來操作你的數(shù)據(jù)集。上圖直觀地體現(xiàn)了DataFrame和RDD的區(qū)別。左側(cè)的RDD[Person]雖然以Person為類型參數(shù),但Spark框架本身不了解Person類的內(nèi)部結(jié)構(gòu)。而右側(cè)的DataFrame卻提供了詳細(xì)的結(jié)構(gòu)信息,使得Spark SQL可以清楚地知道該數(shù)據(jù)集中包含哪些列,每列的名稱和類型各是什么。DataFrame多了數(shù)據(jù)的結(jié)構(gòu)信息,即schema。RDD是分布式的Java對象的集合。DataFrame是分布式的Row對象的集合。DataFrame除了提供了比RDD更豐富的算子以外,更重要的特點(diǎn)是提升執(zhí)行效率、減少數(shù)據(jù)讀取以及執(zhí)行計(jì)劃的優(yōu)化,比如filter下推、裁剪等。

    6. RDD和DataSet比較

    DataSet以Catalyst邏輯執(zhí)行計(jì)劃表示,并且數(shù)據(jù)以編碼的二進(jìn)制形式被存儲(chǔ),不需要反序列化就可以執(zhí)行sorting、shuffle等操作。

    DataSet創(chuàng)立需要一個(gè)顯式的Encoder,把對象序列化為二進(jìn)制,可以把對象的scheme映射為Spark SQl類型,然而RDD依賴于運(yùn)行時(shí)反射機(jī)制。

    通過上面兩點(diǎn),DataSet的性能比RDD的要好很多

    7. DataFrame和DataSet比較

    Dataset可以認(rèn)為是DataFrame的一個(gè)特例,主要區(qū)別是Dataset每一個(gè)record存儲(chǔ)的是一個(gè)強(qiáng)類型值而不是一個(gè)Row。因此具有如下三個(gè)特點(diǎn):

  • DataSet可以在編譯時(shí)檢查類型?
  • 是面向?qū)ο蟮木幊探涌凇S脀ordcount舉例:?
  • 后面版本DataFrame會(huì)繼承DataSet,DataFrame是面向Spark SQL的接口。?
  • DataFrame和DataSet可以相互轉(zhuǎn)化,?df.as[ElementType]這樣可以把DataFrame轉(zhuǎn)化為DataSet,ds.toDF()這樣可以把DataSet轉(zhuǎn)化為DataFrame。

    <span style="color:#000000"><code>//DataFrame// Load a text file and interpret each line as a java.lang.String val ds = sqlContext.read.text("/home/spark/1.6/lines").as[String] val result = ds.flatMap(_.split(" ")) // Split on whitespace.filter(_ != "") // Filter empty words.toDF() // Convert to DataFrame to perform aggregation / sorting.groupBy($"value") // Count number of occurences of each word.agg(count("*") as "numOccurances").orderBy($"numOccurances" desc) // Show most common words first//DataSet,完全使用scala編程,不要切換到DataFrameval wordCount =ds.flatMap(_.split(" ")).filter(_ != "").groupBy(_.toLowerCase()) // Instead of grouping on a column expression (i.e. $"value") we pass a lambda function.count()</code></span>

    8. 應(yīng)用場景

    什么時(shí)候用RDD?使用RDD的一般場景:

    • 你需要使用low-level的transformation和action來控制你的數(shù)據(jù)集;?
    • 你得數(shù)據(jù)集非結(jié)構(gòu)化,比如,流媒體或者文本流;?
    • 你想使用函數(shù)式編程來操作你得數(shù)據(jù),而不是用特定領(lǐng)域語言(DSL)表達(dá);?
    • 你不在乎schema,比如,當(dāng)通過名字或者列處理(或訪問)數(shù)據(jù)屬性不在意列式存儲(chǔ)格式;?
    • 你放棄使用DataFrame和Dataset來優(yōu)化結(jié)構(gòu)化和半結(jié)構(gòu)化數(shù)據(jù)集?

    RDD在Apache Spark 2.0中慘遭拋棄??答案當(dāng)然是 NO !?通過后面的描述你會(huì)得知:Spark用戶可以在RDD,DataFrame和Dataset三種數(shù)據(jù)集之間無縫轉(zhuǎn)換,而是只需使用超級簡單的API方法。

    什么時(shí)候使用DataFrame或者Dataset?

    • 你想使用豐富的語義,high-level抽象,和特定領(lǐng)域語言API,那你可DataFrame或者Dataset;?
    • 你處理的半結(jié)構(gòu)化數(shù)據(jù)集需要high-level表達(dá), filter,map,aggregation,average,sum ,SQL 查詢,列式訪問和使用lambda函數(shù),那你可DataFrame或者Dataset;?
    • 你想利用編譯時(shí)高度的type-safety,Catalyst優(yōu)化和Tungsten的code生成,那你可DataFrame或者Dataset;?
    • 你想統(tǒng)一和簡化API使用跨Spark的Library,那你可DataFrame或者Dataset;?

    如果你是一個(gè)R使用者,那你可DataFrame或者Dataset;?如果你是一個(gè)Python使用者,那你可DataFrame或者Dataset;

    <span style="color:#000000"><code>你可以無縫的把DataFrame或者Dataset轉(zhuǎn)化成一個(gè)RDD,只需簡單的調(diào)用 .rdd:// select specific fields from the Dataset, apply a predicate // using the where() method, convert to an RDD, and show first 10 // RDD rowsval deviceEventsDS = ds.select($"device_name", $"cca3", $"c02_level").where($"c02_level" > 1300) // convert to RDDs and take the first 10 rowsval eventsRDD = deviceEventsDS.rdd.take(10)</code></span>

    總結(jié)

    以上是生活随笔為你收集整理的分布式实时计算—Spark—Spark Core的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。