Flink Operator之CoGroup、Join以及Connect
Flink 雙數(shù)據(jù)流轉(zhuǎn)換為單數(shù)據(jù)流操作的運(yùn)算有cogroup, join和coflatmap。下面為大家對比介紹下這3個(gè)運(yùn)算的功能和用法。
- Join:只輸出條件匹配的元素對。
- CoGroup: 除了輸出匹配的元素對以外,未能匹配的元素也會輸出。
- CoFlatMap:沒有匹配條件,不進(jìn)行匹配,分別處理兩個(gè)流的元素。在此基礎(chǔ)上完全可以實(shí)現(xiàn)join和cogroup的功能,比他們使用上更加自由。
對于join和cogroup來說,代碼結(jié)構(gòu)大致如下:
val stream1 = ... val stream2 = ...stream1.join(stream2).where(_._1).equalTo(_._1) //join的條件stream1中的某個(gè)字段和stream2中的字段值相等.window(...) // 指定window,stream1和stream2中的數(shù)據(jù)會進(jìn)入到該window中。只有該window中的數(shù)據(jù)才會被后續(xù)操作join.apply((t1, t2, out: Collector[String]) => {out.collect(...) // 捕獲到匹配的數(shù)據(jù)t1和t2,在這里可以進(jìn)行組裝等操作}).print()CoGroup操作
該操作是將兩個(gè)數(shù)據(jù)流/集合按照key進(jìn)行g(shù)roup,然后將相同key的數(shù)據(jù)進(jìn)行處理,但是它和join操作稍有區(qū)別,它在一個(gè)流/數(shù)據(jù)集中沒有找到與另一個(gè)匹配的數(shù)據(jù)還是會輸出。
在DataStream中
下面看一個(gè)簡單的例子,這個(gè)例子中從兩個(gè)不同的端口來讀取數(shù)據(jù),模擬兩個(gè)流,我們使用CoGroup來處理這兩個(gè)數(shù)據(jù)流,觀察輸出結(jié)果:
public class CogroupFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<String> source1 = env.socketTextStream("localhost", 9091);DataStreamSource<String> source2 = env.socketTextStream("localhost", 9092);SingleOutputStreamOperator<Tuple2<String, String>> input1 = source1.map(new MapFunction<String, Tuple2<String, String>>() {@Overridepublic Tuple2<String, String> map(String value) throws Exception {return Tuple2.of(value.split(" ")[0], value.split(" ")[1]);}});SingleOutputStreamOperator<Tuple2<String, String>> input2 = source2.map(new MapFunction<String, Tuple2<String, String>>() {@Overridepublic Tuple2<String, String> map(String value) throws Exception {return Tuple2.of(value.split(" ")[0], value.split(" ")[1]);}});DataStream<String> apply = input1.coGroup(input2).where(new KeySelector<Tuple2<String, String>, Object>() {@Overridepublic Object getKey(Tuple2<String, String> value) throws Exception {return value.f0;}}).equalTo(new KeySelector<Tuple2<String, String>, Object>() {@Overridepublic Object getKey(Tuple2<String, String> value) throws Exception {return value.f1;}}).window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))).trigger(CountTrigger.of(1)).apply(new CoGroupFunction<Tuple2<String, String>, Tuple2<String, String>, String>() {@Overridepublic void coGroup(Iterable<Tuple2<String, String>> first, Iterable<Tuple2<String, String>> second, Collector<String> out) throws Exception {StringBuffer buffer = new StringBuffer();buffer.append("input1:");Iterator<Tuple2<String, String>> iterator1 = first.iterator();while (iterator1.hasNext()) {Tuple2<String, String> next = iterator1.next();buffer.append(next.f0 + "=>" + next.f1);}buffer.append("input2:");Iterator<Tuple2<String, String>> iterator2 = second.iterator();while (iterator2.hasNext()) {Tuple2<String, String> next = iterator2.next();buffer.append(next.f0 + "=>" + next.f1);}out.collect(buffer.toString());}});apply.print();env.execute("CogroupFunction");} }在DataSet中
下面的例子中,key代表學(xué)生班級ID,value為學(xué)生name,使用cogroup操作將兩個(gè)集合中key相同數(shù)據(jù)合并:
public class CogroupFunction {public static void main(String[] args) throws Exception {ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();DataSet<Tuple2<Long, String>> source1=env.fromElements(Tuple2.of(1L,"tom"),Tuple2.of(2L,"jerry"));DataSet<Tuple2<Long, String>> source2=env.fromElements(Tuple2.of(2L,"jack"),Tuple2.of(1L,"rose"),Tuple2.of(3L,"sofia"));source1.coGroup(source2).where(0).equalTo(0).with(new CoGroupFunction<Tuple2<Long,String>, Tuple2<Long,String>, Object>() {@Overridepublic void coGroup(Iterable<Tuple2<Long, String>> iterable,Iterable<Tuple2<Long, String>> iterable1, Collector<Object> collector) throws Exception {Map<Long,String> map=new HashMap<Long,String>();for(Tuple2<Long,String> tuple:iterable){String str=map.get(tuple.f0);if(str==null){map.put(tuple.f0,tuple.f1);}else{if(!str.equals(tuple.f1))map.put(tuple.f0,str+" "+tuple.f1);}}for(Tuple2<Long,String> tuple:iterable1){String str=map.get(tuple.f0);if(str==null){map.put(tuple.f0,tuple.f1);}else{if(!str.equals(tuple.f1))map.put(tuple.f0,str+" "+tuple.f1);}}collector.collect(map);}}).print();} }輸出結(jié)果如下:
{3=sofia} {1=tom rose} {2=jerry jack}CoGroup的作用和join基本相同,但有一點(diǎn)不一樣的是,如果未能找到新到來的數(shù)據(jù)與另一個(gè)流在window中存在的匹配數(shù)據(jù),仍會將其輸出。如果未能找到新到來的數(shù)據(jù)與另一個(gè)流在window中存在的匹配數(shù)據(jù),仍會將其輸出。
Join操作
Flink中的Join操作類似于SQL中的join,按照一定條件分別取出兩個(gè)流中匹配的元素,返回給下游處理。
示例代碼如下:
- 為測試方便,這里使用session window。只有兩個(gè)元素到來時(shí)間前后相差不大于30秒之時(shí)才會被匹配。(Session window的特點(diǎn)為,沒有固定的開始和結(jié)束時(shí)間,只要兩個(gè)元素之間的時(shí)間間隔不大于設(shè)定值,就會分配到同一個(gè)window中,否則后來的元素會進(jìn)入新的window)。
- 將window默認(rèn)的trigger修改為count trigger。這里的含義為每到來一個(gè)元素,都會立刻觸發(fā)計(jì)算。
- 處理匹配到的兩個(gè)數(shù)據(jù),例如到來的數(shù)據(jù)為(1, “a”)和(1, “b”),輸出到下游則為"a<=>b"
下面我們測試下程序。
打開兩個(gè)terminal,分別輸入 nc -lk 127.0.0.1 9000 和 nc -lk 127.0.0.1 9001。
在terminal1中輸入,1 a,然后在terminal2中輸入2 b。觀察程序console,發(fā)現(xiàn)沒有輸出。這兩條數(shù)據(jù)不滿足匹配條件,因此沒有輸出。
在30秒之內(nèi)輸入1 c,發(fā)現(xiàn)程序控制臺輸出了結(jié)果a<=>c。再輸入1 d,控制臺輸出a<=>c和a<=>d兩個(gè)結(jié)果。
等待30秒之后,在terminal2中輸入1 e,發(fā)現(xiàn)控制臺無輸出。由于session window的效果,該數(shù)據(jù)和之前stream1中的數(shù)據(jù)不在同一個(gè)window中。因此沒有匹配結(jié)果,控制臺不會有輸出。
綜上我們得出結(jié)論:
- join只返回匹配到的數(shù)據(jù)對。若在window中沒有能夠與之匹配的數(shù)據(jù),則不會有輸出。
- join會輸出window中所有的匹配數(shù)據(jù)對。
- 不在window內(nèi)的數(shù)據(jù)不會被匹配到。
CoFlatMap操作
相比之下CoFlatMap操作就比以上兩個(gè)簡單多了。CoFlatMap操作主要在CoFlatMapFunction中進(jìn)行。
以下是CoFlatMapFunction的代碼:
簡單理解就是當(dāng)stream1數(shù)據(jù)到來時(shí),會調(diào)用flatMap1方法,stream2收到數(shù)據(jù)之時(shí),會調(diào)用flatMap2方法。
Connect操作
只適用操作DataStream,它會將兩個(gè)流中匹配的數(shù)據(jù)進(jìn)行處理,不匹配不會進(jìn)行處理,它會分別處理兩個(gè)流,相比于join和Cogroup操作更加自由,
總結(jié)
Join、CoGroup和CoFlatMap這三個(gè)運(yùn)算符都能夠?qū)㈦p數(shù)據(jù)流轉(zhuǎn)換為單個(gè)數(shù)據(jù)流。Join和CoGroup會根據(jù)指定的條件進(jìn)行數(shù)據(jù)配對操作,不同的是Join只輸出匹配成功的數(shù)據(jù)對,CoGroup無論是否有匹配都會輸出。CoFlatMap沒有匹配操作,只是分別去接收兩個(gè)流的輸入。大家可以根據(jù)具體的業(yè)務(wù)需求,選擇不同的雙流操作。
總結(jié)
以上是生活随笔為你收集整理的Flink Operator之CoGroup、Join以及Connect的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Flink 分别读取kafka和mysq
- 下一篇: Elasticsearch kibana