日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問(wèn) 生活随笔!

生活随笔

當(dāng)前位置: 首頁(yè) > 人文社科 > 生活经验 >内容正文

生活经验

2021年大数据Spark(四十五):Structured Streaming Sources 输入源

發(fā)布時(shí)間:2023/11/28 生活经验 51 豆豆
生活随笔 收集整理的這篇文章主要介紹了 2021年大数据Spark(四十五):Structured Streaming Sources 输入源 小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.

目錄

Sources 輸入源

Socket數(shù)據(jù)源-入門案例

需求

編程實(shí)現(xiàn)

???????文件數(shù)據(jù)源-了解

???????需求

???????代碼實(shí)現(xiàn)

???????Rate source-了解


Sources 輸入源

從Spark 2.0至Spark 2.4版本,目前支持?jǐn)?shù)據(jù)源有4種,其中Kafka 數(shù)據(jù)源使用作為廣泛,其他數(shù)據(jù)源主要用于開發(fā)測(cè)試程序。

文檔http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#input-sources

?

?

?????可以認(rèn)為Structured Streaming = SparkStreaming + SparkSQL,對(duì)流式數(shù)據(jù)處理使用SparkSQL數(shù)據(jù)結(jié)構(gòu),應(yīng)用入口為SparkSession,對(duì)比SparkSQL與SparkStreaming編程:

?Spark Streaming:將流式數(shù)據(jù)按照時(shí)間間隔(BatchInterval)劃分為很多Batch,每批次數(shù)據(jù)封裝在RDD中,底層RDD數(shù)據(jù),構(gòu)建StreamingContext實(shí)時(shí)消費(fèi)數(shù)據(jù);

?Structured Streaming屬于SparkSQL模塊中一部分,對(duì)流式數(shù)據(jù)處理,構(gòu)建SparkSession對(duì)象,指定讀取Stream數(shù)據(jù)和保存Streamn數(shù)據(jù),具體語(yǔ)法格式:

靜態(tài)數(shù)據(jù)

讀取spark.read

保存ds/df.write

?

流式數(shù)據(jù)

讀取spark.readStream

保存ds/df.writeStrem

?

Socket數(shù)據(jù)源-入門案例

需求

http://spark.apache.org/docs/2.4.5/structured-streaming-programming-guide.html#quick-example

實(shí)時(shí)從TCP Socket讀取數(shù)據(jù)(采用nc)實(shí)時(shí)進(jìn)行詞頻統(tǒng)計(jì)WordCount,并將結(jié)果輸出到控制臺(tái)Console。

?

  • Socket 數(shù)據(jù)源

從Socket中讀取UTF8文本數(shù)據(jù)。一般用于測(cè)試,使用nc -lk?端口號(hào)向Socket監(jiān)聽的端口發(fā)送數(shù)據(jù),用于測(cè)試使用,有兩個(gè)參數(shù)必須指定:

1.host

2.port

?

  • Console 接收器

?????將結(jié)果數(shù)據(jù)打印到控制臺(tái)或者標(biāo)準(zhǔn)輸出,通常用于測(cè)試或Bedug使用,三種輸出模式OutputMode(Append、Update、Complete)都支持,兩個(gè)參數(shù)可設(shè)置:

1.numRows,打印多少條數(shù)據(jù),默認(rèn)為20條;

2.truncate,如果某列值字符串太長(zhǎng)是否截取,默認(rèn)為true,截取字符串;

?

編程實(shí)現(xiàn)

?

完整案例代碼如下:

package cn.itcast.structedstreamingimport org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.{DataFrame, SparkSession}/*** 使用Structured Streaming從TCP Socket實(shí)時(shí)讀取數(shù)據(jù),進(jìn)行詞頻統(tǒng)計(jì),將結(jié)果打印到控制臺(tái)。*/
object StructuredWordCount {def main(args: Array[String]): Unit = {//TODO: 0. 環(huán)境val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]").config("spark.sql.shuffle.partitions", "2") // 設(shè)置Shuffle分區(qū)數(shù)目.getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._import org.apache.spark.sql.functions._// TODO: 1. 從TCP Socket 讀取數(shù)據(jù)val inputStreamDF: DataFrame = spark.readStream.format("socket").option("host", "node1").option("port", 9999).load()//注意:返回的df不是普通的分布式表,而是實(shí)時(shí)流數(shù)據(jù)對(duì)應(yīng)的分布式的無(wú)界表!//df.show()//注意:該寫法是離線的寫法,會(huì)報(bào)錯(cuò),所以應(yīng)使用實(shí)時(shí)的寫法:Queries with streaming sources must be executed with writeStream.start();inputStreamDF.printSchema()// TODO: 2. 業(yè)務(wù)分析:詞頻統(tǒng)計(jì)WordCountval resultStreamDF: DataFrame = inputStreamDF.as[String].filter(StringUtils.isNotBlank(_)).flatMap(_.trim.split("\\s+")).groupBy($"value").count()//.orderBy($"count".desc)resultStreamDF.printSchema()// TODO: 3. 設(shè)置Streaming應(yīng)用輸出及啟動(dòng)val query: StreamingQuery = resultStreamDF.writeStream//- append:默認(rèn)的追加模式,將新的數(shù)據(jù)輸出!只支持簡(jiǎn)單查詢,如果涉及的聚合就不支持了//- complete:完整模式,將完整的數(shù)據(jù)輸出,支持聚合和排序//- update:更新模式,將有變化的數(shù)據(jù)輸出,支持聚合但不支持排序,如果沒(méi)有聚合就和append一樣//.outputMode(OutputMode.Append())//.outputMode(OutputMode.Complete()).outputMode(OutputMode.Update()).format("console").option("numRows", "10").option("truncate", "false")// 流式應(yīng)用,需要啟動(dòng)start.start()// 流式查詢等待流式應(yīng)用終止query.awaitTermination()// 等待所有任務(wù)運(yùn)行完成才停止運(yùn)行query.stop()}
}

?

???????文件數(shù)據(jù)源-了解

將目錄中寫入的文件作為數(shù)據(jù)流讀取,支持的文件格式為:text、csv、json、orc、parquet

?

???????需求

監(jiān)聽某一個(gè)目錄,讀取csv格式數(shù)據(jù),統(tǒng)計(jì)年齡小于25歲的人群的愛好排行榜。

測(cè)試數(shù)據(jù)

jack1;23;runningjack2;23;runningjack3;23;runningbob1;20;swimmingbob2;20;swimmingtom1;28;footballtom2;28;footballtom3;28;footballtom4;28;football

?

???????代碼實(shí)現(xiàn)

package cn.itcast.structedstreamingimport org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}/*** 使用Structured Streaming從目錄中讀取文件數(shù)據(jù):統(tǒng)計(jì)年齡小于25歲的人群的愛好排行榜*/
object StructuredFileSource {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]").config("spark.sql.shuffle.partitions", "2").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._import org.apache.spark.sql.functions._// TODO: 從文件系統(tǒng),監(jiān)控目錄,讀取CSV格式數(shù)據(jù)// 數(shù)據(jù)格式:// jack;23;runningval csvSchema: StructType = new StructType().add("name", StringType, nullable = true).add("age", IntegerType, nullable = true).add("hobby", StringType, nullable = true)val inputStreamDF: DataFrame = spark.readStream.option("sep", ";").option("header", "false")// 指定schema信息.schema(csvSchema).csv("data/input/persons")// 依據(jù)業(yè)務(wù)需求,分析數(shù)據(jù):統(tǒng)計(jì)年齡小于25歲的人群的愛好排行榜val resultStreamDF: Dataset[Row] = inputStreamDF.filter($"age" < 25).groupBy($"hobby").count().orderBy($"count".desc)// 設(shè)置Streaming應(yīng)用輸出及啟動(dòng)val query: StreamingQuery = resultStreamDF.writeStream//- append:默認(rèn)的追加模式,將新的數(shù)據(jù)輸出!只支持簡(jiǎn)單查詢,如果涉及的聚合就不支持了//- complete:完整模式,將完整的數(shù)據(jù)輸出,支持聚合和排序//- update:更新模式,將有變化的數(shù)據(jù)輸出,支持聚合但不支持排序,如果沒(méi)有聚合就和append一樣.outputMode(OutputMode.Complete()).format("console").option("numRows", "10").option("truncate", "false").start()query.awaitTermination()query.stop()}
}

???????Rate source-了解

以每秒指定的行數(shù)生成數(shù)據(jù),每個(gè)輸出行包含2個(gè)字段:timestampvalue

其中timestamp是一個(gè)Timestamp含有信息分配的時(shí)間類型,并且value是Long(包含消息的計(jì)數(shù)從0開始作為第一行)類型。此源用于測(cè)試和基準(zhǔn)測(cè)試,可選參數(shù)如下:

?

演示范例代碼如下:

package cn.itcast.structedstreamingimport org.apache.spark.SparkContext
import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, Trigger}
import org.apache.spark.sql.{DataFrame, SparkSession}/*** 數(shù)據(jù)源:Rate Source,以每秒指定的行數(shù)生成數(shù)據(jù),每個(gè)輸出行包含一個(gè)timestamp和value。*/
object StructuredRateSource {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.stripSuffix("$")).master("local[*]").config("spark.sql.shuffle.partitions", "2").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._import org.apache.spark.sql.functions._// TODO:從Rate數(shù)據(jù)源實(shí)時(shí)消費(fèi)數(shù)據(jù)val rateStreamDF: DataFrame = spark.readStream.format("rate").option("rowsPerSecond", "10") // 每秒生成數(shù)據(jù)條數(shù).option("rampUpTime", "0s") // 每條數(shù)據(jù)生成間隔時(shí)間.option("numPartitions", "2") // 分區(qū)數(shù)目.load()rateStreamDF.printSchema()//root// |-- timestamp: timestamp (nullable = true)// |-- value: long (nullable = true)// 3. 設(shè)置Streaming應(yīng)用輸出及啟動(dòng)val query: StreamingQuery = rateStreamDF.writeStream//- append:默認(rèn)的追加模式,將新的數(shù)據(jù)輸出!只支持簡(jiǎn)單查詢,如果涉及的聚合就不支持了//- complete:完整模式,將完整的數(shù)據(jù)輸出,支持聚合和排序//- update:更新模式,將有變化的數(shù)據(jù)輸出,支持聚合但不支持排序,如果沒(méi)有聚合就和append一樣.outputMode(OutputMode.Append()).format("console").option("numRows", "10").option("truncate", "false").start()query.awaitTermination()query.stop()}
}

?

總結(jié)

以上是生活随笔為你收集整理的2021年大数据Spark(四十五):Structured Streaming Sources 输入源的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。

如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。