flink的CEP调研与使用
CEP定義
Flink的Complex Event Processing庫
允許我們在事件流中檢測事件的模式
CEP調研
Reference調研了網上所有講解CEP的例子,只有4篇文章是有相對完整的代碼的。
[4]中的代碼已經得到驗證,分為這么幾個步驟:
?
先理一下flink cep的代碼流程[4]
1.先定義Pattern
?Pattern.begin[X]("start").where(...).next("middle").where(...)?
2.通過CEP.pattern()方法將DataStream轉化為PatternStream
? val cepResult: PatternStream[Event] = CEP.pattern(inputDataStream, pattern) ?
3.將符合pattern的數據調用select方法對數據進行處理
cepResult.select(new PatternSelectFunction[X, String] {
? ? ? override def select(pattern: util.Map[String, util.List[X]]): String = {
? ? ? ?? ?logic code
? ? ? ?? ?""
? ? ? }
}
?
?
最終我完成了調研文獻中的[3][4][5],完整代碼鏈接是:
https://gitee.com/appleyuchi/Flink_Code/tree/master/flink_cep
Reference:
[1]Apache Flink CEP學習總結(評論區中說代碼存在一些問題)
[2]Apache FlinkCEP實現超時狀態監控(沒有意義,超時的情況可以由waterMark進行sideOutput處理)
[3]Flink - CEP分析攻擊行為(自己補充完整了)
[4]【Flink】flink cep對于復雜事件的處理(完成)
[5]Flink cep的初步使用(完成)
?
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的flink的CEP调研与使用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: hbase协处理器与二级索引的设计(还没
- 下一篇: 螺丝孔槽中的螺丝拧花了的物理原理分析