Apache Spark学习:利用Scala语言开发Spark应用程序
本文將介紹3個(gè)Scala Spark編程實(shí)例,分別是WordCount、TopK和SparkJoin,分別代表了Spark的三種典型應(yīng)用。
1. WordCount編程實(shí)例
WordCount是一個(gè)最簡(jiǎn)單的分布式應(yīng)用實(shí)例,主要功能是統(tǒng)計(jì)輸入目錄中所有單詞出現(xiàn)的總次數(shù),編寫步驟如下:
步驟1:創(chuàng)建一個(gè)SparkContext對(duì)象,該對(duì)象有四個(gè)參數(shù):Spark master位置、應(yīng)用程序名稱,Spark安裝目錄和jar存放位置,對(duì)于Spark On YARN而言,最重要的是前兩個(gè)參數(shù),第一個(gè)參數(shù)指定為“yarn-standalone”,第二個(gè)參數(shù)是自定義的字符串,舉例如下:
| 1 2 | val sc = new SparkContext(args(0), "WordCount", ????System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR"))) |
步驟2:讀取輸入數(shù)據(jù)。我們要從HDFS上讀取文本數(shù)據(jù),可以使用SparkContext中的textFile函數(shù)將輸入文件轉(zhuǎn)換為一個(gè)RDD,該函數(shù)采用的是Hadoop中的TextInputFormat解析輸入數(shù)據(jù),舉例如下:
| 1 | val textFile = sc.textFile(args(1)) |
當(dāng)然,Spark允許你采用任何Hadoop InputFormat,比如二進(jìn)制輸入格式SequenceFileInputFormat,此時(shí)你可以使用SparkContext中的hadoopRDD函數(shù),舉例如下:
| 1 2 | val inputFormatClass = classOf[SequenceFileInputFormat[Text,Text]] var hadoopRdd = sc.hadoopRDD(conf, inputFormatClass, classOf[Text], classOf[Text]) |
或者直接創(chuàng)建一個(gè)HadoopRDD對(duì)象:
| 1 2 | var hadoopRdd = new HadoopRDD(sc, conf, ?????classOf[SequenceFileInputFormat[Text,Text, classOf[Text], classOf[Text]) |
步驟3:通過RDD轉(zhuǎn)換算子操作和轉(zhuǎn)換RDD,對(duì)于WordCount而言,首先需要從輸入數(shù)據(jù)中每行字符串中解析出單詞,然后將相同單詞放到一個(gè)桶中,最后統(tǒng)計(jì)每個(gè)桶中每個(gè)單詞出現(xiàn)的頻率,舉例如下:
| 1 2 3 | ????val result = hadoopRdd.flatMap{ ????????case(key, value)? => value.toString().split("\\s+"); }.map(word => (word, 1)). reduceByKey (_ + _) |
其中,flatMap函數(shù)可以將一條記錄轉(zhuǎn)換成多條記錄(一對(duì)多關(guān)系),map函數(shù)將一條記錄轉(zhuǎn)換為另一條記錄(一對(duì)一關(guān)系),reduceByKey函數(shù)將key相同的數(shù)據(jù)劃分到一個(gè)桶中,并以key為單位分組進(jìn)行計(jì)算,這些函數(shù)的具體含義可參考:Spark Transformation。
步驟4:將產(chǎn)生的RDD數(shù)據(jù)集保存到HDFS上。可以使用SparkContext中的saveAsTextFile哈數(shù)將數(shù)據(jù)集保存到HDFS目錄下,默認(rèn)采用Hadoop提供的TextOutputFormat,每條記錄以“(key,value)”的形式打印輸出,你也可以采用saveAsSequenceFile函數(shù)將數(shù)據(jù)保存為SequenceFile格式等,舉例如下:
| 1 | result.saveAsSequenceFile(args(2)) |
當(dāng)然,一般我們寫Spark程序時(shí),需要包含以下兩個(gè)頭文件:
| 1 2 | import org.apache.spark._ import SparkContext._ |
WordCount完整程序已在“Apache Spark學(xué)習(xí):利用Eclipse構(gòu)建Spark集成開發(fā)環(huán)境”一文中進(jìn)行了介紹,在次不贅述。
需要注意的是,指定輸入輸出文件時(shí),需要指定hdfs的URI,比如輸入目錄是hdfs://hadoop-test/tmp/input,輸出目錄是hdfs://hadoop-test/tmp/output,其中,“hdfs://hadoop-test”是由Hadoop配置文件core-site.xml中參數(shù)fs.default.name指定的,具體替換成你的配置即可。
2. TopK編程實(shí)例
TopK程序的任務(wù)是對(duì)一堆文本進(jìn)行詞頻統(tǒng)計(jì),并返回出現(xiàn)頻率最高的K個(gè)詞。如果采用MapReduce實(shí)現(xiàn),則需要編寫兩個(gè)作業(yè):WordCount和TopK,而使用Spark則只需一個(gè)作業(yè),其中WordCount部分已由前面實(shí)現(xiàn)了,接下來順著前面的實(shí)現(xiàn),找到Top K個(gè)詞。注意,本文的實(shí)現(xiàn)并不是最優(yōu)的,有很大改進(jìn)空間。
步驟1:首先需要對(duì)所有詞按照詞頻排序,如下:
| 1 2 3 | val sorted = result.map { ??case(key, value) => (value, key); //exchange key and value }.sortByKey(true, 1) |
步驟2:返回前K個(gè):
| 1 | val topK = sorted.top(args(3).toInt) |
步驟3:將K各詞打印出來:
| 1 | topK.foreach(println) |
注意,對(duì)于應(yīng)用程序標(biāo)準(zhǔn)輸出的內(nèi)容,YARN將保存到Container的stdout日志中。在YARN中,每個(gè)Container存在三個(gè)日志文件,分別是stdout、stderr和syslog,前兩個(gè)保存的是標(biāo)準(zhǔn)輸出產(chǎn)生的內(nèi)容,第三個(gè)保存的是log4j打印的日志,通常只有第三個(gè)日志中有內(nèi)容。
本程序完整代碼、編譯好的jar包和運(yùn)行腳本可以從這里下載。下載之后,按照“Apache Spark學(xué)習(xí):利用Eclipse構(gòu)建Spark集成開發(fā)環(huán)境”一文操作流程運(yùn)行即可。
3. SparkJoin編程實(shí)例
在推薦領(lǐng)域有一個(gè)著名的開放測(cè)試集是movielens給的,下載鏈接是:http://grouplens.org/datasets/movielens/,該測(cè)試集包含三個(gè)文件,分別是ratings.dat、sers.dat、movies.dat,具體介紹可閱讀:README.txt,本節(jié)給出的SparkJoin實(shí)例則通過連接ratings.dat和movies.dat兩個(gè)文件得到平均得分超過4.0的電影列表,采用的數(shù)據(jù)集是:ml-1m。程序代碼如下:
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 | import org.apache.spark._ import SparkContext._ object SparkJoin { ??def main(args: Array[String]) { ????if (args.length != 4 ){ ??????println("usage is org.test.WordCount <master> <rating> <movie> <output>") ??????return ????} ????val sc = new SparkContext(args(0), "WordCount", ????System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR"))) ????// Read rating from HDFS file ????val textFile = sc.textFile(args(1)) ????//extract (movieid, rating) ????val rating = textFile.map(line => { ????????val fileds = line.split("::") ????????(fileds(1).toInt, fileds(2).toDouble) ???????}) ????val movieScores = rating ???????.groupByKey() ???????.map(data => { ?????????val avg = data._2.sum / data._2.size ?????????(data._1, avg) ???????}) ?????// Read movie from HDFS file ?????val movies = sc.textFile(args(2)) ?????val movieskey = movies.map(line => { ???????val fileds = line.split("::") ????????(fileds(0).toInt, fileds(1)) ?????}).keyBy(tup => tup._1) ?????// by join, we get <movie, averageRating, movieName> ?????val result = movieScores ???????.keyBy(tup => tup._1) ???????.join(movieskey) ???????.filter(f => f._2._1._2 > 4.0) ???????.map(f => (f._1, f._2._1._2, f._2._2._2)) ????result.saveAsTextFile(args(3)) ??} } |
你可以從這里下載代碼、編譯好的jar包和運(yùn)行腳本。
這個(gè)程序直接使用Spark編寫有些麻煩,可以直接在Shark上編寫HQL實(shí)現(xiàn),Shark是基于Spark的類似Hive的交互式查詢引擎,具體可參考:Shark。
4. 總結(jié)
Spark 程序設(shè)計(jì)對(duì)Scala語言的要求不高,正如Hadoop程序設(shè)計(jì)對(duì)Java語言要求不高一樣,只要掌握了最基本的語法就能編寫程序,且常見的語法和表達(dá)方式是很少的。通常,剛開始仿照官方實(shí)例編寫程序,包括Scala、Java和Python三種語言實(shí)例。
原創(chuàng)文章,轉(zhuǎn)載請(qǐng)注明:?轉(zhuǎn)載自董的博客
本文鏈接地址:?http://dongxicheng.org/framework-on-yarn/spark-scala-writing-application/
總結(jié)
以上是生活随笔為你收集整理的Apache Spark学习:利用Scala语言开发Spark应用程序的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: YARN编程实例—Unmanaged A
- 下一篇: 利用Hadoop Streaming处理