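I. Introduction to Flink Stream Processing
Flink's stream-processing API is called DataStream. It provides high-throughput, low-latency real-time stream processing while guaranteeing exactly-once semantics.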
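II. The Time Model in Flink
Flink supports three notions of time: Event Time, Processing Time and Ingestion Time. Under the hood there are only two, Processing Time and Event Time; Ingestion Time is essentially a kind of Event Time. The official documentation illustrates the difference between the three with a figure. In short:
Event Time: the time at which the event was produced. The data carries its own timestamp, e.g. '2016/06/17 11:04:00.960'.
Ingestion Time: the time at which the data enters Flink. The timestamp is taken when the record enters the source operator.
Processing Time: the system clock of the machine running the operator, unrelated to any timestamp in the data. For a window, it is the time at which the window is computed. This is the default time characteristic.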
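About Event Time: because it is the time at which the data was produced, the first thing a program must do is tell Flink which field holds the event time, then assign timestamps and emit watermarks that track the progress of event time. Put simply, the event-time field is used as the clock. Watermarks mark how far event time has progressed, but events can arrive out of order because of network delays or program errors, which produces late elements. Timestamp and watermark assignment therefore distinguishes two cases: ascending (in-order) timestamps and out-of-order timestamps.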
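To make the two cases concrete, here is a minimal plain-Scala model of watermark generation; no Flink classes are involved and `WatermarkTracker` is a hypothetical name. It assumes a watermark W means "no more elements with timestamp <= W are expected" and that the watermark trails the largest timestamp seen so far by a fixed bound: a bound of 0 models ascending timestamps, a positive bound models out-of-order input. Flink's own timestamp extractors follow a similar idea, but this is a sketch, not their implementation.

```scala
// Plain-Scala model of watermark generation (not Flink's API).
// Assumption: watermark W means "no more elements with timestamp <= W are expected".
final class WatermarkTracker(maxOutOfOrdernessMs: Long) {
  // Largest event timestamp observed so far; Long.MinValue until the first element.
  private var maxTimestamp: Long = Long.MinValue

  // Record one element's event timestamp and return the resulting watermark.
  def observe(eventTimeMs: Long): Long = {
    maxTimestamp = math.max(maxTimestamp, eventTimeMs)
    currentWatermark
  }

  // Trail the maximum by the allowed out-of-orderness; the extra -1 leaves room
  // for further elements carrying exactly the current maximum timestamp.
  def currentWatermark: Long =
    if (maxTimestamp == Long.MinValue) Long.MinValue
    else maxTimestamp - maxOutOfOrdernessMs - 1
}
```

With a bound of 500 ms, an element stamped 1800 that arrives after one stamped 2000 leaves the watermark at 1499: out-of-order elements never move the watermark backwards, which is why out-of-order input needs a bound greater than 0.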
An example of a late element: data is produced as time passes, so it is generated in ascending time order, and when Flink uses Event Time it should, in theory, keep computing over data in ascending event-time order as well. Suppose, however, that a "delayed" record suddenly arrives after its time window has already passed. That window has already been computed, so the delayed record is not counted correctly.
Stream processing may be unable to compute such records correctly in real time, because the watermark cannot wait indefinitely for late elements. A subsequent batch job can then correct the results that were already computed.
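The late-element situation can be sketched the same way. The model below is plain Scala with hypothetical names, not Flink's API. It assumes tumbling windows `[start, start + size)` aligned to multiples of the size, non-negative millisecond timestamps, that a window is evaluated once the watermark reaches its last timestamp `end - 1`, and that an element whose window has already been evaluated is treated as late.

```scala
// Plain-Scala model of late-element detection for tumbling event-time windows
// (not Flink's API). Assumes non-negative timestamps in milliseconds.
object LateElementModel {
  // Windows are [start, start + sizeMs), aligned to multiples of sizeMs.
  def windowStart(ts: Long, sizeMs: Long): Long = ts - (ts % sizeMs)

  // A window is evaluated once the watermark reaches its last timestamp (end - 1);
  // an element whose window has already been evaluated is late.
  def isLate(ts: Long, sizeMs: Long, watermark: Long): Boolean = {
    val windowEnd = windowStart(ts, sizeMs) + sizeMs
    watermark >= windowEnd - 1
  }
}
```

For example, with one-minute windows and a watermark of 60999 ms, an element stamped 59000 ms belongs to the already evaluated window [0, 60000) and is late, while an element stamped 61000 ms is not.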
III. Steps of a Flink Stream-Processing Program
There are 5 steps:
1. Obtain the DataStream execution environment
2. Create a DataStream from a source
3. Apply transformations to the DataStream
4. Output the results
5. Execute the streaming program
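The five steps can be mirrored with ordinary Scala collections to show how the pieces fit together. This is only an analogue under stated assumptions: the input format `code,volume` and the name `FiveStepsAnalogue` are hypothetical, and nothing here is Flink's API. As in Flink, steps 2 to 4 only describe the pipeline, and nothing is read or computed until step 5.

```scala
// Plain-Scala analogue of the five steps of a Flink streaming program (not Flink's API).
object FiveStepsAnalogue {
  def run(lines: Seq[String]): Map[Long, Long] = {
    // Step 1. Environment: no object is needed here; in Flink this is
    // StreamExecutionEnvironment.getExecutionEnvironment.
    // Step 2. Source: a lazily evaluated stream of text lines.
    val source: () => Iterator[String] = () => lines.iterator
    // Step 3. Transformation: parse "code,volume" and sum the volume per code.
    val transform: Iterator[String] => Map[Long, Long] = it =>
      it.map(_.split(","))
        .map(cols => (cols(0).trim.toLong, cols(1).trim.toLong))
        .foldLeft(Map.empty[Long, Long]) { case (acc, (code, volume)) =>
          acc.updated(code, acc.getOrElse(code, 0L) + volume)
        }
    // Step 4. Sink: print each result.
    val sink: Map[Long, Long] => Unit = totals => totals.foreach(println)
    // Step 5. Execute: only now is the source read and the pipeline run.
    val totals = transform(source())
    sink(totals)
    totals
  }
}
```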
IV. About the Programs
Environment:
IDE: IntelliJ IDEA Community Edition (from JetBrains)
Language: Scala 2.10
Runtime: Flink 1.0.3 cluster (1 JobManager + 2 TaskManagers)
Job submission: client CLI
Build tool: Maven 3.3.9
V. Demo: Experiencing Event Time
Key points:
Set the event-time characteristic:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
Override the map method to implement a slightly more complex map transformation:
map(new EventTimeFunction)
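The part of `EventTimeFunction` that matters for event time is turning the date field (`nAction`, yyyyMMdd) and the time field (`nTime`) into epoch milliseconds. Times before 10:00 arrive with only 8 digits (93000960 means 09:30:00.960), so a leading '0' is added. The sketch below isolates that logic without Flink; `EventTimeParsing` is a hypothetical name, and it pins the time zone to UTC so the numbers are reproducible, whereas the original program uses the JVM's default time zone.

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone

// Timestamp logic of EventTimeFunction.map, extracted for illustration (no Flink).
object EventTimeParsing {
  // nAction holds the date (yyyyMMdd); nTime holds HHmmssSSS, except that
  // times before 10:00 have only 8 digits and need a leading '0'.
  def eventTimeString(nAction: String, nTime: String): String =
    if (nTime.length == 8) nAction + "0" + nTime else nAction + nTime

  def eventTimeMillis(nAction: String, nTime: String): Long = {
    // SimpleDateFormat is not thread-safe, so a new instance is created per call.
    val format = new SimpleDateFormat("yyyyMMddHHmmssSSS")
    // Assumption for reproducibility: UTC instead of the JVM default time zone.
    format.setTimeZone(TimeZone.getTimeZone("UTC"))
    format.parse(eventTimeString(nAction, nTime)).getTime
  }
}
```

For the first two test records below, the parsed timestamps are 58040 ms apart (09:30:00.960 and 09:30:59.000), independent of the time zone chosen.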
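Assign timestamps and watermarks (the records arrive in ascending event-time order, so assignAscendingTimestamps is used on the second tuple element, the Long event time):
val timeValue = parsedStream.assignAscendingTimestamps(_._2)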
Running keyBy on the DataStream produces a KeyedStream, and running window on the KeyedStream produces a WindowedStream. What a WindowedStream hands to its aggregation has three parts: K -> the key, W -> the window, and T -> Iterable[(…)], i.e. the collection of elements for one key within one window.
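The (K, W, Iterable[T]) structure can be modelled without Flink by grouping elements by key and by the tumbling window their timestamp falls into. The sketch assumes non-negative millisecond timestamps and windows aligned to multiples of the window size with no offset; `TumblingWindowModel` and its methods are hypothetical names. The sample timestamps in the check are the three test records below, expressed as milliseconds since midnight.

```scala
// Plain-Scala model of keyBy + tumbling event-time window assignment (not Flink's API).
object TumblingWindowModel {
  final case class Window(start: Long, end: Long)

  // Window [start, start + sizeMs) containing the timestamp ts (ts >= 0 assumed).
  def assign(ts: Long, sizeMs: Long): Window = {
    val start = ts - (ts % sizeMs)
    Window(start, start + sizeMs)
  }

  // elements are (key, eventTimeMs, payload); the result maps each (K, W) pane
  // to the payloads that fell into it, in arrival order.
  def group[K, T](elements: Seq[(K, Long, T)], sizeMs: Long): Map[(K, Window), Seq[T]] =
    elements
      .groupBy { case (k, ts, _) => (k, assign(ts, sizeMs)) }
      .map { case (pane, items) => pane -> items.map(_._3) }
}
```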
Aggregate the WindowedStream with sum (position 3 of the tuple, i.e. the volume):
val sumVolumePerMinute = timeValue
  .keyBy(_._1)
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .sum(3)
  .name("sum volume per minute")
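`sum(3)` adds up position 3 of the tuple (the volume, counting from 0) within each key and window. The model below assumes that the other fields of the result are taken from the first element of the pane. This is consistent with the output shown below, where the event-time string of the first record (20160520093000960) is kept, but it is an assumption rather than a reading of Flink's source; `SumFieldModel` is a hypothetical name.

```scala
// Plain-Scala model of .sum(3) over one key/window pane (not Flink's API).
object SumFieldModel {
  // (szCode, eventTime, eventTimeString, volume), as produced by EventTimeFunction.
  type Rec = (Long, Long, String, Long)

  // Keep the first record's fields and accumulate the volume in position 3.
  def sumVolume(pane: Seq[Rec]): Rec =
    pane.reduce((acc, next) => acc.copy(_4 = acc._4 + next._4))
}
```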
Run the program and test it:
Input:
600000.SH,600000,20160520,93000960,1,39,173200,400,800,66,0,0,62420,76334,93002085
600000.SH,600000,20160520,93059000,1,39,173200,200,1000,66,0,0,62420,76334,93002085
600000.SH,600000,20160520,93101000,1,39,173200,300,1200,66,0,0,62420,76334,93002085
......
The output is:
(60000,20160520093000960,600)
(60000,20160520093101000,300)
...
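As the output shows, the results are computed over event-time windows and do not depend on the system clock (including how fast the input is entered). The first two records (event times 09:30:00.960 and 09:30:59.000) fall into the window [09:30:00, 09:31:00), so their volumes add up to 400 + 200 = 600; the third record (09:31:01.000) belongs to the next window, which sums to 300.
The complete code of the Event Time test: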
import java.text.SimpleDateFormat

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * A simple Flink DataStream program that computes the accumulated trading volume per minute.
 * source: a socket stream that simulates consuming data from Kafka
 * sink: print to local stdout; sinking to HDFS and writing to Redis are left for later
 * Key techniques:
 * 1. Use Event Time, not the system clock (Processing Time), to compute the volume per minute.
 * 2. Merge the input date and time into a Long millisecond value and use it as the
 *    timestamp from which timestamps and watermarks are generated.
 * 3. Use TumblingEventTimeWindows (tumbling, non-overlapping windows) for the aggregation.
 */
object TransactionSumVolume1 {

  case class Transaction(szWindCode: String, szCode: Long, nAction: String, nTime: String,
                         seq: Long, nIndex: Long, nPrice: Long, nVolume: Long, nTurnover: Long,
                         nBSFlag: Int, chOrderKind: String, chFunctionCode: String,
                         nAskOrder: Long, nBidOrder: Long, localTime: Long)

  def main(args: Array[String]): Unit = {
    // When running the program, pass 2 parameters: hostname and port of the socket.
    if (args.length != 2) {
      System.err.println("USAGE:\nTransactionSumVolume1 <hostname> <port>")
      return
    }
    val hostName = args(0)
    val port = args(1).toInt

    // Step 1. Obtain an execution environment for DataStream operations
    // and use Event Time instead of Processing Time.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Step 2. Create a DataStream from the socket.
    val input = env.socketTextStream(hostName, port)

    // Step 3. Implement the "volume per minute" logic.
    // Parse each input line with EventTimeFunction, a MapFunction implementation.
    val parsedStream = input.map(new EventTimeFunction)

    // Assign timestamps and watermarks for event time; the timestamp must be a Long.
    val timeValue = parsedStream.assignAscendingTimestamps(_._2)

    val sumVolumePerMinute = timeValue
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.minutes(1)))
      .sum(3)
      .name("sum volume per minute")

    // Step 4. Sink the final result to standard output (the .out file):
    // (szCode, eventTimeString, volume).
    sumVolumePerMinute.map(value => (value._1, value._3, value._4)).print()

    // Step 5. Execute the program.
    env.execute("SocketTextStream for sum of volume Example")
  }

  class EventTimeFunction extends MapFunction[String, (Long, Long, String, Long)] {
    def map(s: String): (Long, Long, String, Long) = {
      val columns = s.split(",")
      // The input has 15 comma-separated fields, mapped to the case class in order (indices 0-14).
      val transaction: Transaction = Transaction(
        columns(0), columns(1).toLong, columns(2), columns(3), columns(4).toLong,
        columns(5).toLong, columns(6).toLong, columns(7).toLong, columns(8).toLong,
        columns(9).toInt, columns(10), columns(11), columns(12).toLong,
        columns(13).toLong, columns(14).toLong)
      val format = new SimpleDateFormat("yyyyMMddHHmmssSSS")
      val volume: Long = transaction.nVolume
      val szCode: Long = transaction.szCode
      // nAction holds the date (yyyyMMdd); times before 10:00 have only 8 digits,
      // so a leading '0' is added to obtain HHmmssSSS.
      val eventTimeString =
        if (transaction.nTime.length == 8) transaction.nAction + "0" + transaction.nTime
        else transaction.nAction + transaction.nTime
      val eventTime: Long = format.parse(eventTimeString).getTime
      (szCode, eventTime, eventTimeString, volume)
    }
  }
}
VI. Demo: Experiencing Processing Time
Key points:
Use a tumbling processing-time window. Because the default time characteristic is Processing Time, nothing needs to be set explicitly; when assigning windows, just use the built-in timeWindow:
timeWindow(Time.seconds(15))
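With Processing Time, the window an element lands in depends only on the wall-clock time at which it is processed. The plain-Scala sketch below makes that explicit by taking the processing time as a parameter instead of reading the system clock. It assumes 15-second windows aligned to multiples of 15 s; `ProcessingTimeWindowModel` and the arrival times in the check are hypothetical.

```scala
// Plain-Scala model of a 15-second tumbling processing-time window (not Flink's API).
object ProcessingTimeWindowModel {
  val SizeMs: Long = 15000L

  // Window [start, end) chosen purely from the processing (arrival) time.
  def windowFor(processingTimeMs: Long): (Long, Long) = {
    val start = processingTimeMs - (processingTimeMs % SizeMs)
    (start, start + SizeMs)
  }

  // Group (arrivalMs, record) pairs by processing-time window; the records'
  // own event times are never looked at.
  def group[T](arrivals: Seq[(Long, T)]): Map[(Long, Long), Seq[T]] =
    arrivals.groupBy(a => windowFor(a._1)).map { case (w, items) => w -> items.map(_._2) }
}
```

Two records entered one second apart share a window, while one entered two minutes later lands in a different window, regardless of the event times they carry.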
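After windowing, an aggregation has to be applied to turn the WindowedStream back into a new DataStream. Flink provides sum, reduce, fold and similar operations, but when the computation inside a window is complex, the apply{...} method is needed. The apply methods are defined in the source file org.apache.flink.streaming.api.scala.WindowedStream.scala, which offers 6 different variants.
This test calls apply on the WindowedStream to implement a slightly more complex computation (VWAP = total turnover / total volume):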
.apply { (k: Long, w: TimeWindow, elements: Iterable[(Long, Long, Long)],
          out: Collector[(Long, String, String, Double)]) =>
  var sumVolume: Long = 0
  var sumTurnover: Long = 0
  for (elem <- elements) {
    sumVolume = sumVolume + elem._2
    sumTurnover = sumTurnover + elem._3
  }
  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
  val vwap: Double = (BigDecimal(String.valueOf(sumTurnover)) / BigDecimal(String.valueOf(sumVolume)))
    .setScale(2, BigDecimal.RoundingMode.HALF_UP)
    .toDouble
  out.collect((k, format.format(w.getStart), format.format(w.getEnd), vwap))
}
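The arithmetic in the apply body can be checked in isolation. This sketch (plain Scala, hypothetical name `VwapModel`) computes total turnover divided by total volume, rounded to 2 decimal places with HALF_UP, over `(szCode, volume, turnover)` tuples as produced by `VwapField`. It assumes the window holds at least one element and a non-zero total volume; otherwise the division throws an ArithmeticException.

```scala
// VWAP arithmetic from the apply body, without Flink.
object VwapModel {
  def vwap(elements: Iterable[(Long, Long, Long)]): Double = {
    // Sum volume (field 2) and turnover (field 3) directly with foldLeft.
    val sumVolume = elements.foldLeft(0L)((acc, e) => acc + e._2)
    val sumTurnover = elements.foldLeft(0L)((acc, e) => acc + e._3)
    (BigDecimal(sumTurnover) / BigDecimal(sumVolume))
      .setScale(2, BigDecimal.RoundingMode.HALF_UP)
      .toDouble
  }
}
```

With the test input, the first window gives (800 + 1000) / (400 + 200) = 3.0 and the window with the third record gives 1200 / 300 = 4.0.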
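Run the program and test it:
Input (because this uses Processing Time, pay attention to the intervals between inputs: records entered more than 15 seconds apart always fall into different windows, and since windows are aligned to 15-second boundaries of the clock, records entered closer together can still be split if they straddle a boundary. I entered the first 2 records at the same time and the 3rd one some time later):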
600000.SH,600000,20160520,93000960,1,39,173200,400,800,66,0,0,62420,76334,93002085
600000.SH,600000,20160520,93059000,1,39,173200,200,1000,66,0,0,62420,76334,93002085
600000.SH,600000,20160520,93101000,1,39,173200,300,1200,66,0,0,62420,76334,93002085
......
The result:
(60000,2016-06-16 17:56:00.000,2016-06-16 17:56:15.000,3.0)
(60000,2016-06-16 17:58:15.000,2016-06-16 17:58:30.000,4.0)
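As you can see, the result has nothing to do with the event times in the data (2016-05-20 09:30); it depends only on the system time at which the records were processed. The window boundaries come from the clock at run time (2016-06-16 17:56 and 17:58), and each VWAP is simply total turnover / total volume of whatever arrived in that window: (800 + 1000) / (400 + 200) = 3.0 for the first two records and 1200 / 300 = 4.0 for the third.
The complete code: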
import java.text.SimpleDateFormat

import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

/**
 * This Flink DataStream program computes the volume-weighted average price (VWAP) every 15 seconds.
 * source: a socket stream that simulates consuming data from Kafka
 * sink: print to local stdout; sinking to HDFS and writing to Redis are left for later
 * Key techniques:
 * 1. Use the default Processing Time to compute the VWAP every 15 seconds.
 * 2. Use a tumbling processing-time window (system clock, non-overlapping ranges).
 * 3. Implement a custom apply function (the VWAP) on the WindowedStream instead of a simple aggregation.
 */
object TransactionVWap {

  case class Transaction(szWindCode: String, szCode: Long, nAction: String, nTime: String,
                         seq: Long, nIndex: Long, nPrice: Long, nVolume: Long, nTurnover: Long,
                         nBSFlag: Int, chOrderKind: String, chFunctionCode: String,
                         nAskOrder: Long, nBidOrder: Long, localTime: Long)

  def main(args: Array[String]): Unit = {
    // When running the program, pass 2 parameters: hostname and port of the socket.
    if (args.length != 2) {
      System.err.println("USAGE:\nTransactionVWap <hostname> <port>")
      return
    }
    val hostName = args(0)
    val port = args(1).toInt

    // Step 1. Obtain an execution environment for DataStream operations.
    // Processing Time is the default TimeCharacteristic; it is set explicitly here.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    // Step 2. Create a DataStream from the socket.
    val input = env.socketTextStream(hostName, port)

    // Step 3. Implement the "VWAP every 15 seconds" logic.
    // Note: a WindowedStream involves 3 types: T => elements, K => key, W => window.
    val vwapPer15Seconds = input
      // transform each Transaction line into a tuple (szCode, volume, turnover)
      .map(new VwapField)
      // partition by szCode
      .keyBy(_._1)
      // build a tumbling window of 15 seconds
      .timeWindow(Time.seconds(15))
      // compute the VWAP in each window
      .apply { (k: Long, w: TimeWindow, elements: Iterable[(Long, Long, Long)],
                out: Collector[(Long, String, String, Double)]) =>
        var sumVolume: Long = 0
        var sumTurnover: Long = 0
        for (elem <- elements) {
          sumVolume = sumVolume + elem._2
          sumTurnover = sumTurnover + elem._3
        }
        val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
        val vwap: Double = (BigDecimal(String.valueOf(sumTurnover)) / BigDecimal(String.valueOf(sumVolume)))
          .setScale(2, BigDecimal.RoundingMode.HALF_UP)
          .toDouble
        out.collect((k, format.format(w.getStart), format.format(w.getEnd), vwap))
      }
      .name("VWAP per 15 seconds")

    // Step 4. Sink the final result to standard output (the .out file).
    vwapPer15Seconds.print()

    // Step 5. Execute the program.
    env.execute("SocketTextStream for VWAP Example")
  }

  class VwapField extends MapFunction[String, (Long, Long, Long)] {
    def map(s: String): (Long, Long, Long) = {
      val columns = s.split(",")
      // The input has 15 comma-separated fields, mapped to the case class in order (indices 0-14).
      val transaction: Transaction = Transaction(
        columns(0), columns(1).toLong, columns(2), columns(3), columns(4).toLong,
        columns(5).toLong, columns(6).toLong, columns(7).toLong, columns(8).toLong,
        columns(9).toInt, columns(10), columns(11), columns(12).toLong,
        columns(13).toLong, columns(14).toLong)
      // Keep only what the VWAP needs: (szCode, volume, turnover).
      (transaction.szCode, transaction.nVolume, transaction.nTurnover)
    }
  }
}
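VII. Summary
Whether to use Event Time or Processing Time depends on the business scenario. Event Time produces results that reflect when events actually happened and that do not depend on how fast data arrives, at the cost of waiting for watermarks. Processing Time has the lowest latency, but its results depend on when the data happens to be processed.
For late elements under Event Time, try simulating the input yourself and see what the results look like.
Custom functions and operators should keep their state in Flink-managed state so that it can be restored after a failure; for simplicity, these examples do not define any state.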
References
1. https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101
2. https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102
3. https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html
4. https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_time.html
5. https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html
6. http://blog.madhukaraphatak.com/introduction-to-flink-streaming-part-9/
7. http://www.cnblogs.com/fxjwind/p/5434572.html
8. https://github.com/apache/flink/
9. http://wuchong.me/blog/2016/05/20/flink-internals-streams-and-operations-on-streams/
10. http://data-artisans.com/blog/
11. http://dataartisans.github.io/flink-training/