
Flink Stream Processing: Getting a Feel for EventTime vs. ProcessingTime in WindowedStream

Published: 2024/9/27 by 豆豆

I. A Brief Introduction to Flink Stream Processing

Flink's stream-processing API is called DataStream. It provides high-throughput, low-latency real-time stream processing while guaranteeing Exactly-Once semantics.

II. Time Models in Flink

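Flink provides three time models: Event Time, Processing Time, and Ingestion Time. Under the hood there are only two implementations, Processing Time and Event Time; Ingestion Time is essentially a kind of Event Time. A diagram in the official documentation illustrates the differences among the three. In short: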

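Event Time: the time at which the event occurred, i.e. a timestamp carried by the data itself when it is produced, e.g. '2016/06/17 11:04:00.960'
Ingestion Time: the time at which the data enters Flink, i.e. the timestamp taken when the data enters the source operator
Processing Time: the system (wall-clock) time of the machine running the operation, independent of the data's own timestamp; for a window, this is the time at which the window computation runs (the default time characteristic in Flink 1.0.3)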

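A note on Event Time: because it is the time at which the data was produced, the first thing the program must do is tell Flink which field holds the Event Time. It then assigns timestamps and emits watermarks so that Flink can track the progress of Event Time. Put simply, the Event Time field is used as the clock. Watermarks mark the progress of Event Time, but events can arrive out of order because of network delays or program errors, which produces so-called Late Elements. Watermark generation therefore comes in two kinds: one for in-order (ascending) timestamps and one for out-of-order timestamps.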
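To make the two kinds of watermark concrete, here is a framework-free Scala model (not Flink code). For each arriving event timestamp it computes the watermark under an ascending strategy and under a bounded-out-of-orderness strategy. The `- 1` in the ascending case is assumed to mirror Flink's ascending timestamp extractor, and `maxDelayMs` is a hypothetical tuning parameter. Treat this as a sketch of the idea only.

```scala
// Framework-free model of two watermark strategies; NOT the Flink API.
// Input: event timestamps (ms) in arrival order. Output: watermark after each element.
object WatermarkSketch {

  // Running maximum of the timestamps seen so far.
  private def runningMax(ts: Seq[Long]): Seq[Long] =
    ts.scanLeft(Long.MinValue)((acc, t) => math.max(acc, t)).tail

  // In-order (ascending) data: watermark = latest timestamp - 1,
  // i.e. "every element with a smaller timestamp has arrived".
  def ascending(ts: Seq[Long]): Seq[Long] = runningMax(ts).map(_ - 1)

  // Out-of-order data: hold the watermark back by a fixed bound, so that
  // elements up to `maxDelayMs` behind the newest one still count as on time.
  def boundedOutOfOrder(ts: Seq[Long], maxDelayMs: Long): Seq[Long] =
    runningMax(ts).map(_ - maxDelayMs)
}
```

Consider the input `Seq(1000L, 3000L, 2000L, 5000L)` under the ascending strategy. The element 2000 arrives after the watermark has already reached 2999, so it is late. With a 1500 ms bound the watermark is only 1500 at that point, so the same element is still on time.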

An example of a Late Element: data is produced as time passes, so its timestamps are naturally ascending. When Flink uses Event Time, it should in theory keep computing over data that arrives in ascending order. Now suppose a "delayed" record suddenly reaches Flink after its time window has already closed. That record will not be included correctly in the result.
Stream processing may be unable to compute such records correctly in real time, because the watermark cannot wait indefinitely for Late Elements to arrive. A later batch job can therefore be used to correct the results that have already been computed.
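The late-element scenario can be simulated in plain Scala. This is a simplified single-key model, not Flink's window operator. The model makes these assumptions:

- Windows are one-minute tumbling windows.
- The watermark follows the ascending strategy (max timestamp seen - 1).
- A window fires once the watermark reaches its last millisecond.
- An element whose window has already fired is counted as dropped. This matches the default (zero allowed lateness) behavior of later Flink versions; it is an assumption, not a description of Flink 1.0.3 internals.

Timestamps are milliseconds since midnight. They match the sample trades used later (09:30:00.960, 09:30:59.000, 09:31:01.000), plus one hypothetical late trade at 09:30:30.000.

```scala
import scala.collection.mutable

// Simplified single-key model of event-time tumbling windows with late-element dropping.
object LateElementSketch {
  final case class Result(fired: Map[Long, Long], dropped: Seq[Long])

  // elements: (eventTimestampMs, volume) in arrival order.
  def run(elements: Seq[(Long, Long)], windowMs: Long): Result = {
    var watermark = Long.MinValue
    val open = mutable.Map.empty[Long, Long]  // window start -> running volume sum
    val fired = mutable.Map.empty[Long, Long] // window start -> emitted volume sum
    val dropped = mutable.ArrayBuffer.empty[Long]
    for ((ts, volume) <- elements) {
      val start = ts - ts % windowMs
      val maxTs = start + windowMs - 1 // last timestamp belonging to this window
      if (maxTs <= watermark) dropped += ts // window already fired: late element
      else open(start) = open.getOrElse(start, 0L) + volume
      // Ascending watermark strategy: latest timestamp seen - 1.
      watermark = math.max(watermark, ts - 1)
      // Fire (emit and close) every window whose last timestamp the watermark has reached.
      for (s <- open.keys.toList.sorted if s + windowMs - 1 <= watermark) {
        fired(s) = open(s)
        open -= s
      }
    }
    Result(fired.toMap, dropped.toList)
  }
}
```

With the trades `(34200960, 400), (34259000, 200), (34261000, 300), (34230000, 50)`, the 09:30 window fires with a volume of 600 when the 09:31:01 trade pushes the watermark past 09:30:59.999. The 09:30:30 trade that arrives afterwards is then dropped.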

III. Steps in Writing a Flink Streaming Program

There are five steps:
1. Obtain the DataStream execution environment
2. Create a DataStream from a source
3. Apply transformations to the DataStream
4. Output the results
5. Execute the streaming program

IV. Program Notes

Environment:
IDE: IntelliJ IDEA Community Edition (from JetBrains)
Language: Scala 2.10
Runtime: Flink 1.0.3 cluster (1 JobManager + 2 TaskManagers)
Job submission: client CLI
Build tool: Maven 3.3.9

V. Demo: Getting a Feel for Event Time

Key points:
Set the Event Time characteristic:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

Override the map method in a MapFunction to implement a somewhat more complex map transformation:

map(new EventTimeFunction)

Assign timestamps and watermarks:

val timeValue = parsedStream.assignAscendingTimestamps(_._2)

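Calling keyBy on the DataStream produces a KeyedStream, and calling window on the KeyedStream produces a WindowedStream. A WindowedStream consists of three parts: K -> the key, W -> the window, and T -> Iterable[(…)], i.e. the collection of elements for each key within a particular window. An aggregation (or apply) on the WindowedStream turns it back into a DataStream.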

Aggregate with sum on the windowedStream:

val sumVolumePerMinute = timeValue.keyBy(_._1).window(TumblingEventTimeWindows.of(Time.minutes(1))).sum(3).name("sum volume per minute")
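The K / W / Iterable[T] view of a WindowedStream can be modeled with a plain Scala `groupBy` on (key, window start). The sketch below is not Flink code. It assumes epoch-aligned tumbling windows (start = ts - ts % size, the alignment Flink's tumbling assigners use when no offset is involved). For readability, the three sample trades use milliseconds since midnight.

```scala
// Framework-free model of keyBy + tumbling event-time window + sum; NOT Flink code.
object TumblingWindowSketch {
  // Epoch-aligned tumbling window containing timestamp `ts`.
  def windowStart(ts: Long, sizeMs: Long): Long = ts - ts % sizeMs

  // records: (key, eventTs, volume). Grouping by (key, window start) mirrors the
  // K / W / Iterable[T] view of a WindowedStream; the volumes are then summed.
  def sumPerKeyAndWindow(records: Seq[(Long, Long, Long)], sizeMs: Long): Map[(Long, Long), Long] =
    records
      .groupBy { case (key, ts, _) => (key, windowStart(ts, sizeMs)) }
      .map { case (keyAndWindow, elems) => keyAndWindow -> elems.map(_._3).sum }
}
```

For the sample input this gives 600 for the 09:30 window and 300 for the 09:31 window, matching the volumes in the program's output.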

Run the program and check the results.
Input:

600000.SH,600000,20160520,93000960,1,39,173200,400,800,66,0,0,62420,76334,93002085
600000.SH,600000,20160520,93059000,1,39,173200,200,1000,66,0,0,62420,76334,93002085
600000.SH,600000,20160520,93101000,1,39,173200,300,1200,66,0,0,62420,76334,93002085
......

The output is:

(600000,20160520093000960,600)
(600000,20160520093101000,300)
...

As you can see, the results are computed over Event Time windows and do not depend on the system time (including how fast or slowly the input is typed).
The complete code for the Event Time test:

import java.text.SimpleDateFormat

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * A simple Flink DataStream program that computes the cumulative trading volume per minute.
 * source: a socket stream simulating data consumed from Kafka
 * sink: print to local stdout (sinking to HDFS and writing to Redis are left for later)
 * Techniques:
 * 1. Use EventTime, not the system clock (Processing Time), to compute the per-minute volume
 * 2. Combine the input date and time fields into a Long millisecond time and use it
 *    to assign timestamps and watermarks
 * 3. Use TumblingEventTimeWindows (tumbling, non-overlapping windows) for the aggregation
 */
object TransactionSumVolume1 {

  case class Transaction(szWindCode: String, szCode: Long, nAction: String, nTime: String,
                         seq: Long, nIndex: Long, nPrice: Long, nVolume: Long, nTurnover: Long,
                         nBSFlag: Int, chOrderKind: String, chFunctionCode: String,
                         nAskOrder: Long, nBidOrder: Long, localTime: Long)

  def main(args: Array[String]): Unit = {
    // Two arguments are required: the socket hostname and port
    if (args.length != 2) {
      System.err.println("USAGE:\nTransactionSumVolume1 <hostname> <port>")
      return
    }
    val hostName = args(0)
    val port = args(1).toInt

    // Step 1. Obtain the execution environment and use EventTime instead of Processing Time
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Step 2. Create a DataStream from the socket
    val input = env.socketTextStream(hostName, port)

    // Step 3. Per-minute volume logic
    // Parse each line into (szCode, eventTimeMillis, eventTimeString, volume)
    val parsedStream = input.map(new EventTimeFunction)
    // Assign timestamps and watermarks for event time (the timestamp must be a Long)
    val timeValue = parsedStream.assignAscendingTimestamps(_._2)
    val sumVolumePerMinute = timeValue
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.minutes(1)))
      .sum(3)
      .name("sum volume per minute")

    // Step 4. Sink the result to standard output (the TaskManager .out file)
    sumVolumePerMinute.map(value => (value._1, value._3, value._4)).print()

    // Step 5. Execute the program
    env.execute("SocketTextStream for sum of volume Example")
  }

  class EventTimeFunction extends MapFunction[String, (Long, Long, String, Long)] {
    def map(s: String): (Long, Long, String, Long) = {
      val columns = s.split(",")
      // Fields map one-to-one onto columns 0..14 (the original reused columns(9) and shifted the rest)
      val transaction = Transaction(columns(0), columns(1).toLong, columns(2), columns(3),
        columns(4).toLong, columns(5).toLong, columns(6).toLong, columns(7).toLong,
        columns(8).toLong, columns(9).toInt, columns(10), columns(11),
        columns(12).toLong, columns(13).toLong, columns(14).toLong)
      val format = new SimpleDateFormat("yyyyMMddHHmmssSSS")
      val volume: Long = transaction.nVolume
      val szCode: Long = transaction.szCode
      // nTime has no leading zero before 10:00 (e.g. 93000960), so pad it to HHmmssSSS
      val eventTimeString =
        if (transaction.nTime.length == 8) transaction.nAction + '0' + transaction.nTime
        else transaction.nAction + transaction.nTime
      val eventTime: Long = format.parse(eventTimeString).getTime
      (szCode, eventTime, eventTimeString, volume)
    }
  }
}
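The timestamp handling in EventTimeFunction can be checked in isolation, without a cluster. This plain-Scala sketch uses a hypothetical `TickParsing` object that is not part of the original program. It differs from the original in two ways:

- It pads `nTime` to nine digits generically instead of special-casing length 8.
- It takes the time zone as a parameter. The original relies on the JVM default time zone, which shifts the absolute millisecond values.

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone

// Framework-free sketch of the EventTimeFunction parsing step, using the column layout
// of the sample input: 1 = szCode, 2 = date, 3 = time, 7 = volume.
object TickParsing {
  // nTime has no leading zero before 10:00 (e.g. "93000960" = 09:30:00.960),
  // so left-pad it to the 9 digits that HHmmssSSS expects.
  def eventTimeString(date: String, nTime: String): String =
    date + ("0" * (9 - nTime.length)) + nTime

  // Parse in an explicit time zone (assumption: the original uses the JVM default).
  def eventTimeMillis(s: String, zone: TimeZone): Long = {
    val format = new SimpleDateFormat("yyyyMMddHHmmssSSS")
    format.setTimeZone(zone)
    format.parse(s).getTime
  }

  // Returns (szCode, eventTimeMillis, eventTimeString, volume), like EventTimeFunction.
  def parse(line: String, zone: TimeZone): (Long, Long, String, Long) = {
    val c = line.split(",")
    val ts = eventTimeString(c(2), c(3))
    (c(1).toLong, eventTimeMillis(ts, zone), ts, c(7).toLong)
  }
}
```

For the first sample line, this yields szCode 600000, the string "20160520093000960", and volume 400.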

VI. Demo: Getting a Feel for Processing Time

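Key points:
Use a TumblingProcessingTimeWindow. Because the default Time Characteristic is Processing Time, nothing needs to be set explicitly. When assigning windows, just call the built-in timeWindow, which creates a tumbling window on whatever time characteristic the environment uses: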

timeWindow(Time.seconds(15))

在windowedStream之后,需要進(jìn)行聚合操作,產(chǎn)生新的DataStream。系統(tǒng)提供了sum、reduce、fold等操作,但是如果遇到窗口內(nèi)的計(jì)算非常復(fù)雜的情況,則需要采用apply{…}方法。windowedStream.apply{}的方法可參考源碼:org.apache.flink.streaming.api.scala.WindowedStream.scala
提供了6種不同的方法,詳情見(jiàn):
WindowedStream.scala

This test calls apply on the windowedStream to implement a somewhat more complex computation:

.apply { (k: Long, w: TimeWindow, elems: Iterable[(Long, Long, Long)],
          out: Collector[(Long, String, String, Double)]) =>
  var sumVolume: Long = 0
  var sumTurnover: Long = 0
  for (elem <- elems) {
    sumVolume = sumVolume + elem._2
    sumTurnover = sumTurnover + elem._3
  }
  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
  // VWAP = total turnover / total volume, rounded half-up to 2 decimals
  val vwap: Double = (BigDecimal(sumTurnover) / BigDecimal(sumVolume))
    .setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble
  out.collect((k, format.format(w.getStart), format.format(w.getEnd), vwap))
}
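The arithmetic inside the apply function can be pulled out into a pure function and tested without a cluster. This is a sketch, not the original code. The hypothetical `VwapSketch.vwap` sums volume and turnover with a fold and rounds half-up to two decimals as the original does. For a zero-volume window it returns `None`, where the original would throw an ArithmeticException on division by zero.

```scala
// Framework-free version of the VWAP computed in the apply{...} window function.
object VwapSketch {
  // elements: (szCode, volume, turnover), as produced by VwapField.
  def vwap(elements: Iterable[(Long, Long, Long)]): Option[Double] = {
    // Sum with a fold so duplicate tuples are never collapsed (e.g. if a Set is passed).
    val (sumVolume, sumTurnover) = elements.foldLeft((0L, 0L)) {
      case ((v, t), (_, volume, turnover)) => (v + volume, t + turnover)
    }
    if (sumVolume == 0L) None
    else Some((BigDecimal(sumTurnover) / BigDecimal(sumVolume))
      .setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble)
  }
}
```

With the sample input, the first two trades give 1800 / 600 = 3.0 and the third gives 1200 / 300 = 4.0, the same values as in the program output.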

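Run the program and check the results.
Input (because this uses Processing Time, pay attention to timing as you type: records that arrive in different 15-second windows of the system clock end up in different windows. I entered the first two records at the same time and the third one after a pause):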

600000.SH,600000,20160520,93000960,1,39,173200,400,800,66,0,0,62420,76334,93002085
600000.SH,600000,20160520,93059000,1,39,173200,200,1000,66,0,0,62420,76334,93002085
600000.SH,600000,20160520,93101000,1,39,173200,300,1200,66,0,0,62420,76334,93002085
......

The result:

(600000,2016-06-16 17:56:00.000,2016-06-16 17:56:15.000,3.0)
(600000,2016-06-16 17:58:15.000,2016-06-16 17:58:30.000,4.0)

As you can see, this result has nothing to do with the event time; it depends only on the system time at which the data was processed.
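Why the event time plays no role here can be shown with a small plain-Scala model (not Flink code). Records are assigned to 15-second tumbling windows by their arrival time alone. The arrival offsets (in milliseconds) are made up to mimic the manual input described above: the first two trades are typed together and the third a little over two minutes later.

```scala
// Framework-free model of a processing-time tumbling window: the window is chosen
// from the arrival (wall-clock) time, and the record's own event time is ignored.
object ProcessingTimeSketch {
  final case class Tick(eventTime: String, volume: Long, turnover: Long)

  // arrivals: (arrivalMillis, tick). Returns window start -> ticks in that window.
  def assign(arrivals: Seq[(Long, Tick)], sizeMs: Long): Map[Long, Seq[Tick]] =
    arrivals
      .groupBy { case (arrival, _) => arrival - arrival % sizeMs }
      .map { case (start, xs) => start -> xs.map(_._2) }
}
```

The 09:30:00.960 and 09:30:59.000 trades are almost a minute apart in event time, yet they share one processing-time window because they arrived together.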
The complete code:

import java.text.SimpleDateFormat

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
 * A Flink DataStream program that computes the volume-weighted average price (VWAP) every 15 seconds.
 * source: a socket stream simulating data consumed from Kafka
 * sink: print to local stdout (sinking to HDFS and writing to Redis are left for later)
 * Techniques:
 * 1. Use the default Processing Time to compute the VWAP every 15 seconds
 * 2. Use a TumblingProcessingTimeWindow (tumbling, non-overlapping windows on the system clock)
 * 3. Implement a custom apply function (the VWAP) on the WindowedStream instead of a simple aggregation
 */
object TransactionVWap {

  case class Transaction(szWindCode: String, szCode: Long, nAction: String, nTime: String,
                         seq: Long, nIndex: Long, nPrice: Long, nVolume: Long, nTurnover: Long,
                         nBSFlag: Int, chOrderKind: String, chFunctionCode: String,
                         nAskOrder: Long, nBidOrder: Long, localTime: Long)

  def main(args: Array[String]): Unit = {
    // Two arguments are required: the socket hostname and port
    if (args.length != 2) {
      System.err.println("USAGE:\nTransactionVWap <hostname> <port>")
      return
    }
    val hostName = args(0)
    val port = args(1).toInt

    // Step 1. Obtain the execution environment and use Processing Time (also the default)
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    // Step 2. Create a DataStream from the socket
    val input = env.socketTextStream(hostName, port)

    // Step 3. VWAP-every-15-seconds logic
    // Note: a windowedStream has 3 attributes: T => elements, K => key, W => window
    val vwapPer15Seconds = input
      .map(new VwapField)           // line -> (szCode, volume, turnover)
      .keyBy(_._1)                  // partition by szCode
      .timeWindow(Time.seconds(15)) // 15-second tumbling processing-time window
      .apply { (k: Long, w: TimeWindow, elems: Iterable[(Long, Long, Long)],
                out: Collector[(Long, String, String, Double)]) =>
        // VWAP in the window = total turnover / total volume
        var sumVolume: Long = 0
        var sumTurnover: Long = 0
        for (elem <- elems) {
          sumVolume = sumVolume + elem._2
          sumTurnover = sumTurnover + elem._3
        }
        val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
        val vwap: Double = (BigDecimal(sumTurnover) / BigDecimal(sumVolume))
          .setScale(2, BigDecimal.RoundingMode.HALF_UP).toDouble
        out.collect((k, format.format(w.getStart), format.format(w.getEnd), vwap))
      }
      .name("VWAP per 15 seconds")

    // Step 4. Sink the result to standard output (the TaskManager .out file)
    vwapPer15Seconds.print()

    // Step 5. Execute the program
    env.execute("SocketTextStream for VWAP Example")
  }

  class VwapField extends MapFunction[String, (Long, Long, Long)] {
    def map(s: String): (Long, Long, Long) = {
      val columns = s.split(",")
      // Fields map one-to-one onto columns 0..14 (the original reused columns(9) and shifted the rest)
      val transaction = Transaction(columns(0), columns(1).toLong, columns(2), columns(3),
        columns(4).toLong, columns(5).toLong, columns(6).toLong, columns(7).toLong,
        columns(8).toLong, columns(9).toInt, columns(10), columns(11),
        columns(12).toLong, columns(13).toLong, columns(14).toLong)
      (transaction.szCode, transaction.nVolume, transaction.nTurnover)
    }
  }
}

VII. Summary

Whether to use Event Time or Processing Time depends on the specific business scenario.
For Late Elements under Event Time, try simulating the input yourself and see what happens.
Custom functions and operators should all be stateful so that they can recover after a failure; for simplicity, no state is set up here.

References
1. https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101
2. https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102
3. https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html
4. https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_time.html
5. https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html
6. http://blog.madhukaraphatak.com/introduction-to-flink-streaming-part-9/
7. http://www.cnblogs.com/fxjwind/p/5434572.html
8. https://github.com/apache/flink/
9. http://wuchong.me/blog/2016/05/20/flink-internals-streams-and-operations-on-streams/
10. http://data-artisans.com/blog/
11. http://dataartisans.github.io/flink-training/
