Flink-CEP快速入门
Flink-CEP快速入門
更新時(shí)間:2022-09-12 10:58:28發(fā)布時(shí)間:2小時(shí)前朗讀
文章目錄
0. 簡(jiǎn)介 & 使用步驟
簡(jiǎn)介
使用步驟
模式API(Pattern API:匹配規(guī)則)
單個(gè)模式
量詞
條件
限定子類型
簡(jiǎn)單條件(SimpleCondition)
迭代條件(IterativeCondition)
組合條件
終止條件
模式操作列舉
組合模式
連續(xù)性
循環(huán)模式中的近鄰條件
模式組
匹配后跳過(guò)策略
檢測(cè)模式(檢測(cè)滿足規(guī)則的復(fù)雜事件)
將模式應(yīng)用到流上
處理匹配事件
匹配事件的選擇提取(select)
PatternSelectFunction
PatternFlatSelectFunction
匹配事件的通用處理(process)
處理超時(shí)事件
Maven
簡(jiǎn)介 & 使用步驟 簡(jiǎn)介
所謂 CEP,其實(shí)就是“復(fù)雜事件處理(Complex Event Processing)”的縮寫;而 Flink CEP,就是 Flink 實(shí)現(xiàn)的一個(gè)用于復(fù)雜事件處理的庫(kù)(library)
把事件流中的一個(gè)個(gè)簡(jiǎn)單事件,通過(guò)一定的規(guī)則匹配組合起來(lái),這就是“復(fù)雜事件”;然后基于這些滿足規(guī)則的一組組復(fù)雜事件進(jìn)行轉(zhuǎn)換處理,得到想要的結(jié)果進(jìn)行輸出
使用步驟
復(fù)雜事件處理(CEP)的流程可以分成三個(gè)步驟:
定義一個(gè)匹配規(guī)則
將匹配規(guī)則應(yīng)用到事件流上,檢測(cè)滿足規(guī)則的復(fù)雜事件
對(duì)檢測(cè)到的復(fù)雜事件進(jìn)行處理,得到結(jié)果進(jìn)行輸出
// 實(shí)體類
public class LoginEvent {
public String userId;
public String ipAddress;
public String eventType;
public Long timestamp;
public LoginEvent(String userId, String ipAddress, String eventType, Long timestamp) {
this.userId = userId;
this.ipAddress = ipAddress;
this.eventType = eventType;
this.timestamp = timestamp;
}
}
// CEP Demo
public class Demo003 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
}
</string,></loginevent,></loginevent,>
一個(gè)模式可以是一個(gè)單例或者循環(huán)模式。單例模式只接受一個(gè)事件,循環(huán)模式可以接受多個(gè)事件。 在模式匹配表達(dá)式中,模式"a b+ c? d"(或者"a",后面跟著一個(gè)或者多個(gè)"b",再往后可選擇的跟著一個(gè)"c",最后跟著一個(gè)"d"), a,c?,和 d都是單例模式,b+是一個(gè)循環(huán)模式
量詞
單個(gè)模式后面可以跟一個(gè)“量詞”,用來(lái)指定循環(huán)的次數(shù),單個(gè)模式可以包括“單例(singleton)模式”和“循環(huán)(looping)模式”,默認(rèn)是“單例(singleton)模式”,當(dāng)定義了量詞之后,就變成了“循環(huán)模式”,可以匹配接收多個(gè)事件
循環(huán)模式的方法:
.oneOrMore()
匹配事件出現(xiàn)一次或多次,假設(shè) a 是一個(gè)個(gè)體模式,a.oneOrMore()表示可以匹配 1 個(gè)或多個(gè) a 的事件組合。我們有時(shí)會(huì)用 a+來(lái)簡(jiǎn)單表示
.times(times)
匹配事件發(fā)生特定次數(shù)(times),例如 a.times(3)表示 aaa
.times(fromTimes,toTimes)
指定匹配事件出現(xiàn)的次數(shù)范圍,最小次數(shù)為fromTimes,最大次數(shù)為toTimes。例如a.times(2, 4)可以匹配 aa,aaa 和 aaaa
.greedy()
只能用在循環(huán)模式后,使當(dāng)前循環(huán)模式變得“貪心”(greedy),也就是總是盡可能多地去匹配。例如 a.times(2, 4).greedy(),如果出現(xiàn)了連續(xù) 4 個(gè) a,那么會(huì)直接把 aaaa 檢測(cè)出來(lái)進(jìn)行處理,其他任意 2 個(gè) a 是不算匹配事件的
.optional()
使當(dāng)前模式成為可選的,也就是說(shuō)可以滿足這個(gè)匹配條件,也可以不滿足
// 期望出現(xiàn)4次
start.times(4);
// 期望出現(xiàn)0或者4次
start.times(4).optional();
// 期望出現(xiàn)2、3或者4次
start.times(2, 4);
// 期望出現(xiàn)2、3或者4次,并且盡可能的重復(fù)次數(shù)多
start.times(2, 4).greedy();
// 期望出現(xiàn)0、2、3或者4次
start.times(2, 4).optional();
// 期望出現(xiàn)0、2、3或者4次,并且盡可能的重復(fù)次數(shù)多
start.times(2, 4).optional().greedy();
// 期望出現(xiàn)1到多次
start.oneOrMore();
// 期望出現(xiàn)1到多次,并且盡可能的重復(fù)次數(shù)多
start.oneOrMore().greedy();
// 期望出現(xiàn)0到多次
start.oneOrMore().optional();
// 期望出現(xiàn)0到多次,并且盡可能的重復(fù)次數(shù)多
start.oneOrMore().optional().greedy();
// 期望出現(xiàn)2到多次
start.timesOrMore(2);
// 期望出現(xiàn)2到多次,并且盡可能的重復(fù)次數(shù)多
start.timesOrMore(2).greedy();
// 期望出現(xiàn)0、2或多次
start.timesOrMore(2).optional();
// 期望出現(xiàn)0、2或多次,并且盡可能的重復(fù)次數(shù)多
start.timesOrMore(2).optional().greedy();
條件 限定子類型
調(diào)用.subtype()方法可以為當(dāng)前模式增加子類型限制條件
// 這里 SubEvent 是流中數(shù)據(jù)類型 Event 的子類型。只有事件是 SubEvent 類型時(shí),才可以滿足當(dāng)前模式 pattern 的匹配條件
pattern.subtype(SubEvent.class);
簡(jiǎn)單條件(SimpleCondition)
簡(jiǎn)單條件是最簡(jiǎn)單的匹配規(guī)則,只根據(jù)當(dāng)前事件的特征來(lái)決定是否接受它。這在本質(zhì)上其實(shí)就是一個(gè) filter 操作
start.where(new SimpleCondition() {
@Override
public boolean filter(MyEvent myEvent) throws Exception {
return … // 一些判斷條件
}
})
迭代條件(IterativeCondition)
在實(shí)際應(yīng)用中,我們可能需要將當(dāng)前事件跟之前的事件做對(duì)比,才能判斷出要不要接受當(dāng)前事件。這種需要依靠之前事件來(lái)做判斷的條件,就叫作“迭代條件”(Iterative Condition)
Pattern.begin(“first”)
.where(new IterativeCondition() {
@Override
public boolean filter(MyEvent myEvent, Context context) throws Exception {
if (!“event1001”.equals(myEvent.getEvent())) {
return false;
}
組合條件
可以多個(gè)條件一起使用,當(dāng)有多個(gè)判斷邏輯的時(shí)候我們可能會(huì)用if-else的方式,但組合條件可以在 where()方法后繼續(xù)接or()方法來(lái)組合使用
Pattern<myevent, myevent=“”> pattern = Pattern.begin(“first”)
.where(new IterativeCondition() {
@Override
public boolean filter(MyEvent myEvent, Context context) throws Exception {
return … // 一些判斷條件
}
}).or(new IterativeCondition() {
@Override
public boolean filter(MyEvent myEvent, Context context) throws Exception {
return … // 一些判斷條件
}
});
</myevent,>
終止條件
終止條件的定義是通過(guò)調(diào)用模式對(duì)象的.until()方法來(lái)實(shí)現(xiàn)的
??終止條件只與oneOrMore()或者oneOrMore().optional()結(jié)合使用
Pattern<myevent, myevent=“”> pattern = Pattern.begin(“first”)
.where(new IterativeCondition() {
@Override
public boolean filter(MyEvent myEvent, Context context) throws Exception {
return … // 一些判斷條件
}
}).oneOrMore()
.until(new IterativeCondition() {
@Override
public boolean filter(MyEvent myEvent, Context context) throws Exception {
return … // 一些判斷條件
}
});
</myevent,>
模式操作列舉
模式操作 描述
where(condition) 為當(dāng)前模式定義一個(gè)條件。為了匹配這個(gè)模式,一個(gè)事件必須滿足某些條件。 多個(gè)連續(xù)的where()語(yǔ)句取與組成判斷條件:java pattern.where(new IterativeCondition() { @Override public boolean filter(Event value, Context ctx) throws Exception { return … // 一些判斷條件 } });
or(condition) 增加一個(gè)新的判斷,和當(dāng)前的判斷取或。一個(gè)事件只要滿足至少一個(gè)判斷條件就匹配到模式:java pattern.where(new IterativeCondition() { @Override public boolean filter(Event value, Context ctx) throws Exception { return … // 一些判斷條件 } }).or(new IterativeCondition() { @Override public boolean filter(Event value, Context ctx) throws Exception { return … // 替代條件 } });
until(condition) 為循環(huán)模式指定一個(gè)停止條件。意思是滿足了給定的條件的事件出現(xiàn)后,就不會(huì)再有事件被接受進(jìn)入模式了。只適用于和oneOrMore()同時(shí)使用。NOTE: 在基于事件的條件中,它可用于清理對(duì)應(yīng)模式的狀態(tài)。java pattern.oneOrMore().until(new IterativeCondition() { @Override public boolean filter(Event value, Context ctx) throws Exception { return … // 替代條件 } });
subtype(subClass) 為當(dāng)前模式定義一個(gè)子類型條件。一個(gè)事件只有是這個(gè)子類型的時(shí)候才能匹配到模式:java pattern.subtype(SubEvent.class);
oneOrMore() 指定模式期望匹配到的事件至少出現(xiàn)一次。.默認(rèn)(在子事件間)使用松散的內(nèi)部連續(xù)性。 NOTE: 推薦使用until()或者within()來(lái)清理狀態(tài)。java pattern.oneOrMore();
timesOrMore(#times) 指定模式期望匹配到的事件至少出現(xiàn)**#times次。.默認(rèn)(在子事件間)使用松散的內(nèi)部連續(xù)性。 java pattern.timesOrMore(2);
times(#ofTimes) 指定模式期望匹配到的事件正好出現(xiàn)的次數(shù)。默認(rèn)(在子事件間)使用松散的內(nèi)部連續(xù)性。 java pattern.times(2);
times(#fromTimes, #toTimes) 指定模式期望匹配到的事件出現(xiàn)次數(shù)在#fromTimes和#toTimes**之間。默認(rèn)(在子事件間)使用松散的內(nèi)部連續(xù)性。 java pattern.times(2, 4);
optional() 指定這個(gè)模式是可選的,也就是說(shuō),它可能根本不出現(xiàn)。這對(duì)所有之前提到的量詞都適用。java pattern.oneOrMore().optional();
greedy() 指定這個(gè)模式是貪心的,也就是說(shuō),它會(huì)重復(fù)盡可能多的次數(shù)。這只對(duì)量詞適用,現(xiàn)在還不支持模式組。java pattern.oneOrMore().greedy();
組合模式 連續(xù)性
將多個(gè)個(gè)體模式組合起來(lái)的完整模式,就叫作“組合模式”
FlinkCEP支持事件之間如下形式的連續(xù)策略:
嚴(yán)格連續(xù): next()期望所有匹配的事件嚴(yán)格的一個(gè)接一個(gè)出現(xiàn),中間沒有任何不匹配的事件。
松散連續(xù): followedBy()忽略匹配的事件之間的不匹配的事件。
不確定的松散連續(xù): followedByAny()更進(jìn)一步的松散連續(xù),允許忽略掉一些匹配事件的附加匹配。
notNext():如果不想后面直接連著一個(gè)特定事件
notFollowedBy(),如果不想一個(gè)特定事件發(fā)生在兩個(gè)事件之間的任何地方
// 嚴(yán)格連續(xù)
Pattern<event, ?=“”> strict = start.next(“middle”).where(…);
// 松散連續(xù)
Pattern<event, ?=“”> relaxed = start.followedBy(“middle”).where(…);
// 不確定的松散連續(xù)
Pattern<event, ?=“”> nonDetermin = start.followedByAny(“middle”).where(…);
// 嚴(yán)格連續(xù)的NOT模式
Pattern<event, ?=“”> strictNot = start.notNext(“not”).where(…);
// 松散連續(xù)的NOT模式
Pattern<event, ?=“”> relaxedNot = start.notFollowedBy(“not”).where(…);
</event,></event,></event,></event,></event,>
within()方法:指定一個(gè)模式應(yīng)該在一定時(shí)間內(nèi)發(fā)生
// 在十秒鐘內(nèi),從 event1001 開始到 event1004 結(jié)束才算
Pattern<myevent, myevent=“”> pattern = Pattern.begin(“first”)
.where(new IterativeCondition() {
@Override
public boolean filter(MyEvent myEvent, Context context) throws Exception {
return “event1001”.equals(myEvent.getEvent());
}
})
.followedBy(“second”)
.where(new IterativeCondition() {
@Override
public boolean filter(MyEvent myEvent, Context context) throws Exception {
return “event1004”.equals(myEvent.getEvent());
}
})
.within(Time.seconds(10L));
</myevent,>
循環(huán)模式中的近鄰條件
oneOrMore()、times()等循環(huán)模式的默認(rèn)是松散連續(xù),也就是followedBy()模式
.consecutive():在oneOrMore()、times()等循環(huán)模式后面跟上consecutive()表示嚴(yán)格連續(xù)(next())
// 1. 定義 Pattern,登錄失敗事件,循環(huán)檢測(cè) 3 次
Pattern<loginevent, loginevent=“”> pattern = Pattern
.begin(“fails”)
.where(new SimpleCondition() {
@Override
public boolean filter(LoginEvent loginEvent) throws Exception {
return loginEvent.eventType.equals(“fail”);
}
}).times(3).consecutive();
</loginevent,>
.allowCombinations():在oneOrMore()、times()等循環(huán)模式后面跟上allowCombinations()表示不確定的松散連續(xù)(followedByAny())
模式組
也可以定義一個(gè)模式序列作為begin,followedBy,followedByAny和next的條件。這個(gè)模式序列在邏輯上會(huì)被當(dāng)作匹配的條件, 并且返回一個(gè)GroupPattern,可以在GroupPattern上使用oneOrMore(),times(#ofTimes), times(#fromTimes, #toTimes),optional(),consecutive(),allowCombinations()。
Pattern<event, ?=“”> start = Pattern.begin(
Pattern.begin(“start”).where(…).followedBy(“start_middle”).where(…)
);
// 嚴(yán)格連續(xù)
Pattern<event, ?=“”> strict = start.next(
Pattern.begin(“next_start”).where(…).followedBy(“next_middle”).where(…)
).times(3);
// 松散連續(xù)
Pattern<event, ?=“”> relaxed = start.followedBy(
Pattern.begin(“followedby_start”).where(…).followedBy(“followedby_middle”).where(…)
).oneOrMore();
// 不確定松散連續(xù)
Pattern<event, ?=“”> nonDetermin = start.followedByAny(
Pattern.begin(“followedbyany_start”).where(…).followedBy(“followedbyany_middle”).where(…)
).optional();
</event,></event,></event,></event,>
模式操作 描述
begin(#name) 定義一個(gè)開始的模式:java Pattern start = Pattern.begin(“start”);
begin(#pattern_sequence) 定義一個(gè)開始的模式:java Pattern start = Pattern.begin( Pattern.begin(“start”).where(…).followedBy(“middle”).where(…) );
next(#name) 增加一個(gè)新的模式。匹配的事件必須是直接跟在前面匹配到的事件后面(嚴(yán)格連續(xù)):java Pattern next = start.next(“middle”);
next(#pattern_sequence) 增加一個(gè)新的模式。匹配的事件序列必須是直接跟在前面匹配到的事件后面(嚴(yán)格連續(xù)):java Pattern next = start.next( Pattern.begin(“start”).where(…).followedBy(“middle”).where(…) );
followedBy(#name) 增加一個(gè)新的模式。可以有其他事件出現(xiàn)在匹配的事件和之前匹配到的事件中間(松散連續(xù)):java Pattern followedBy = start.followedBy(“middle”);
followedBy(#pattern_sequence) 增加一個(gè)新的模式。可以有其他事件出現(xiàn)在匹配的事件序列和之前匹配到的事件中間(松散連續(xù)):java Pattern followedBy = start.followedBy( Pattern.begin(“start”).where(…).followedBy(“middle”).where(…) );
followedByAny(#name) 增加一個(gè)新的模式。可以有其他事件出現(xiàn)在匹配的事件和之前匹配到的事件中間, 每個(gè)可選的匹配事件都會(huì)作為可選的匹配結(jié)果輸出(不確定的松散連續(xù)):java Pattern followedByAny = start.followedByAny(“middle”);
followedByAny(#pattern_sequence) 增加一個(gè)新的模式。可以有其他事件出現(xiàn)在匹配的事件序列和之前匹配到的事件中間, 每個(gè)可選的匹配事件序列都會(huì)作為可選的匹配結(jié)果輸出(不確定的松散連續(xù)):java Pattern followedByAny = start.followedByAny( Pattern.begin(“start”).where(…).followedBy(“middle”).where(…) );
notNext() 增加一個(gè)新的否定模式。匹配的(否定)事件必須直接跟在前面匹配到的事件之后(嚴(yán)格連續(xù))來(lái)丟棄這些部分匹配:java Pattern notNext = start.notNext(“not”);
notFollowedBy() 增加一個(gè)新的否定模式。即使有其他事件在匹配的(否定)事件和之前匹配的事件之間發(fā)生, 部分匹配的事件序列也會(huì)被丟棄(松散連續(xù)):java Pattern notFollowedBy = start.notFollowedBy(“not”);
within(time) 定義匹配模式的事件序列出現(xiàn)的最大時(shí)間間隔。如果未完成的事件序列超過(guò)了這個(gè)事件,就會(huì)被丟棄:java pattern.within(Time.seconds(10));
匹配后跳過(guò)策略
對(duì)于一個(gè)給定的模式,同一個(gè)事件可能會(huì)分配到多個(gè)成功的匹配上。為了控制一個(gè)事件會(huì)分配到多少個(gè)匹配上,你需要指定跳過(guò)策略AfterMatchSkipStrategy。 有五種跳過(guò)策略,如下:
NO_SKIP: 不跳過(guò)
SKIP_TO_NEXT: 跳至下一個(gè)
SKIP_PAST_LAST_EVENT: 跳過(guò)所有子匹配
SKIP_TO_FIRST: 跳至第一個(gè)
SKIP_TO_LAST: 跳至最后一個(gè)
例如,給定一個(gè)模式b+ c和一個(gè)數(shù)據(jù)流b1 b2 b3 c,不同跳過(guò)策略之間的不同如下:
跳過(guò)策略 結(jié)果 描述
NO_SKIP b1 b2 b3 c
b2 b3 c
b3 c 找到匹配b1 b2 b3 c之后,不會(huì)丟棄任何結(jié)果。
SKIP_TO_NEXT b1 b2 b3 c
b2 b3 c
b3 c 找到匹配b1 b2 b3 c之后,不會(huì)丟棄任何結(jié)果,因?yàn)闆]有以b1開始的其他匹配。
SKIP_PAST_LAST_EVENT b1 b2 b3 c 找到匹配b1 b2 b3 c之后,會(huì)丟棄其他所有的部分匹配。
SKIP_TO_FIRST[b] b1 b2 b3 c
b2 b3 c
b3 c 找到匹配b1 b2 b3 c之后,會(huì)嘗試丟棄所有在b1之前開始的部分匹配,但沒有這樣的匹配,所以沒有任何匹配被丟棄。
SKIP_TO_LAST[b] b1 b2 b3 c
b3 c 找到匹配b1 b2 b3 c之后,會(huì)嘗試丟棄所有在b3之前開始的部分匹配,有一個(gè)這樣的b2 b3 c被丟棄。
方法 描述
AfterMatchSkipStrategy.noSkip() 創(chuàng)建NO_SKIP策略
AfterMatchSkipStrategy.skipToNext() 創(chuàng)建SKIP_TO_NEXT策略
AfterMatchSkipStrategy.skipPastLastEvent() 創(chuàng)建SKIP_PAST_LAST_EVENT策略
AfterMatchSkipStrategy.skipToFirst(patternName) 創(chuàng)建引用模式名稱為patternName的SKIP_TO_FIRST策略
AfterMatchSkipStrategy.skipToLast(patternName) 創(chuàng)建引用模式名稱為patternName的SKIP_TO_LAST策略
skipToNext
// 配置跳過(guò)策略:skipToNext模式
AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToNext();
// 將跳過(guò)策略加入到模式中
Pattern<myevent, myevent=“”> pattern = Pattern.begin(“first”, skipStrategy)
.where(new IterativeCondition() {
@Override
public boolean filter(MyEvent myEvent, Context context) throws Exception {
return … // 一些判斷條件
}
});
</myevent,>
skipToFirst(patternName)
// 配置跳過(guò)策略:skipToFirst模式,參數(shù)傳模式名稱
AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToFirst(“first”);
Pattern<myevent, myevent=“”> pattern = Pattern.begin(“first”, skipStrategy)
.where(new IterativeCondition() {
@Override
public boolean filter(MyEvent myEvent, Context context) throws Exception {
return “event1001”.equals(myEvent.getEvent());
}
}).oneOrMore()
.followedBy(“second”)
.where(new IterativeCondition() {
@Override
public boolean filter(MyEvent myEvent, Context context) throws Exception {
return “event1003”.equals(myEvent.getEvent());
}
})
.followedBy(“thrid”)
.where(new IterativeCondition() {
@Override
public boolean filter(MyEvent myEvent, Context context) throws Exception {
return “event1004”.equals(myEvent.getEvent());
}
});
</myevent,>
2. 檢測(cè)模式(檢測(cè)滿足規(guī)則的復(fù)雜事件) 將模式應(yīng)用到流上
調(diào)用 CEP 類的靜態(tài)方法.pattern(),將數(shù)據(jù)流(DataStream)和模式(Pattern)作為兩個(gè)參數(shù)傳入
DataStream,也可以通過(guò) keyBy 進(jìn)行按鍵分區(qū)得到 KeyedStream,接下來(lái)對(duì)復(fù)雜事件的檢測(cè)就會(huì)針對(duì)不同的 key 單獨(dú)進(jìn)行了
DataStream inputStream = …
Pattern<event, ?=“”> pattern = …
PatternStream patternStream = CEP.pattern(inputStream, pattern);
</event,>
處理匹配事件 匹配事件的選擇提取(select) PatternSelectFunction
處理匹配事件最簡(jiǎn)單的方式,就是從 PatternStream 中直接把匹配的復(fù)雜事件提取出來(lái),包裝成想要的信息輸出,這個(gè)操作就是“選擇”(select)
Pattern.begin(“first”).where(…);
// 處理匹配事件
cepPattern.select(new PatternSelectFunction<myevent, string=“”>() {
@Override
public String select(Map<string, list<myevent=“”>> map) throws Exception {
// first 是 Pattern 的 name 字符串
List first = map.get(“first”);
return … // 處理匹配事件邏輯
}
});
</string,></myevent,>
PatternFlatSelectFunction
.flatSelect(),傳入的參數(shù)是一個(gè)PatternFlatSelectFunction。這是 PatternSelectFunction 的“扁平化”版本;內(nèi)部需要實(shí)現(xiàn)一個(gè) flatSelect()方法,
它與之前 select()的不同就在于沒有返回值,而是多了一個(gè)收集器(Collector)參數(shù) collector,通過(guò)調(diào)用 collector.collet()方法就可以實(shí)現(xiàn)多次發(fā)送輸出數(shù)據(jù)了
cepPattern.flatSelect(new PatternFlatSelectFunction<myevent, string=“”>() {
@Override
public void flatSelect(Map<string, list<myevent=“”>> map, Collector collector) throws Exception {
// 處理匹配事件邏輯
}
});
</string,></myevent,>
匹配事件的通用處理(process)
自 1.8 版本之后,Flink CEP 引入了對(duì)于匹配事件的通用檢測(cè)處理方式,那就是直接調(diào)用PatternStream 的.process()方法,傳入一個(gè) PatternProcessFunction。這看起來(lái)就像是我們熟悉的處理函數(shù)(process function),它也可以訪問(wèn)一個(gè)上下文(Context),進(jìn)行更多的操作。
PatternProcessFunction 功能更加豐富、調(diào)用更加靈活,可以完全覆蓋其他接口,也就成為了目前官方推薦的處理方式。事實(shí)上,PatternSelectFunction 和 PatternFlatSelectFunction在 CEP 內(nèi)部執(zhí)行時(shí)也會(huì)被轉(zhuǎn)換成 PatternProcessFunction
Context context:上下文
collector.collect():調(diào)用此方法實(shí)現(xiàn)發(fā)送輸出數(shù)據(jù)
cepPattern.process(new PatternProcessFunction<myevent, string=“”>() {
@Override
public void processMatch(Map<string, list<myevent=“”>> map, Context context, Collector collector) throws Exception {
// 處理匹配事件邏輯
}
});
</string,></myevent,>
處理超時(shí)事件
在 Flink CEP 中 , 提 供 了 一 個(gè) 專 門 捕 捉 超 時(shí) 的 部 分 匹 配 事 件 的 接 口 , 叫 作TimedOutPartialMatchHandler。這個(gè)接口需要實(shí)現(xiàn)一個(gè) processTimedOutMatch()方法,可以將超時(shí)的、已檢測(cè)到的部分匹配事件放在一個(gè) Map 中,作為方法的第一個(gè)參數(shù);方法的第二個(gè)參數(shù)則是 PatternProcessFunction 的上下文 Context。所以這個(gè)接口必須與 PatternProcessFunction結(jié)合使用,對(duì)處理結(jié)果的輸出則需要利用側(cè)輸出流來(lái)進(jìn)行
PatternStream cepPattern = CEP.pattern(myEventData.keyBy(myEvent -> myEvent.getUserId()), pattern);
// 測(cè)流
OutputTag outputTag = new OutputTag(“time_out”){};
// 超時(shí)數(shù)據(jù)處理
SingleOutputStreamOperator processData = cepPattern.process(new MyPatternProcessFunction());
// 數(shù)據(jù)處理,處理匹配成功數(shù)據(jù),處理超時(shí)數(shù)據(jù)
public static class MyPatternProcessFunction extends PatternProcessFunction<myevent, string=“”>
implements TimedOutPartialMatchHandler {
}
</string,></string,></myevent,>
Maven
<flink.version>1.13.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>1.7.30</slf4j.version>
文章轉(zhuǎn)自:Flink-CEP快速入門_Java-答學(xué)網(wǎng)
作者:答學(xué)網(wǎng),轉(zhuǎn)載請(qǐng)注明原文鏈接:http://www.dxzl8.com/
總結(jié)
以上是生活随笔為你收集整理的Flink-CEP快速入门的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: html5做文字颜色渐变代码,神奇!js
- 下一篇: 蚂蚁链发布BTN,有什么用?