
Flink async I/O in practice: joining a stream table with a dimension table


Introduction

A dimension table is a data-warehouse concept: a collection of dimension attributes, such as a time dimension or a location dimension. It can live in MySQL, Cassandra, Redis, or a similar store, or even sit behind a custom API.
The stream table is streaming data from Kafka or a similar source.
For each stream record, the dimension table is queried asynchronously using the field the stream table joins on.

An example

Stream table: a Kafka topic with three columns id1, id2, id3
Dimension table: a MySQL table with columns id, age, name
SQL: select id1,id2,id3,age,name from kafka join mysql on id1=id;
The join result is id1, id2, id3, age, name, i.e. the stream table's columns plus the MySQL dimension table's columns.
For each record, the stream table hands id1 to the dimension-table side, which runs a lookup of the form select * from mysql where id=id1.
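For the MySQL case above, this per-record lookup maps naturally onto Flink's async I/O API. The following is only a minimal sketch of that idea, not code from this article: the class name AsyncMysqlLookup, the connection URL, and the table name dim_table are placeholders. JDBC itself is blocking, so the query is pushed onto a separate executor and the result is handed back through the ResultFuture.

import java.sql.*;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Hypothetical MySQL lookup: enrich (id1, id2, id3) with (age, name) from the dimension table.
public class AsyncMysqlLookup extends RichAsyncFunction<Tuple3<String, String, String>,
        Tuple5<String, String, String, Integer, String>> {

    private transient Connection connection;
    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Assumed connection settings; replace with your own.
        connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/dim", "user", "password");
        // One worker thread per parallel subtask, so the shared JDBC connection is never used concurrently.
        executor = Executors.newSingleThreadExecutor();
    }

    @Override
    public void asyncInvoke(Tuple3<String, String, String> input,
                            ResultFuture<Tuple5<String, String, String, Integer, String>> resultFuture) {
        CompletableFuture.supplyAsync(() -> {
            // Per-record dimension lookup: select age, name from dim_table where id = id1
            try (PreparedStatement ps = connection.prepareStatement(
                    "SELECT age, name FROM dim_table WHERE id = ?")) {
                ps.setString(1, input.f0);
                try (ResultSet rs = ps.executeQuery()) {
                    if (rs.next()) {
                        return Tuple5.of(input.f0, input.f1, input.f2,
                                rs.getInt("age"), rs.getString("name"));
                    }
                    // No match in the dimension table: emit the record with empty dimension fields.
                    return Tuple5.of(input.f0, input.f1, input.f2, -1, "");
                }
            } catch (SQLException e) {
                throw new RuntimeException(e);
            }
        }, executor).thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
    }

    @Override
    public void close() throws Exception {
        if (connection != null) connection.close();
        if (executor != null) executor.shutdownNow();
    }
}

In a real job a truly asynchronous database client or a connection pool would replace the single shared JDBC connection; the sketch only illustrates the shape of the lookup.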

Hands-on example

Stream table: a CSV text file containing uid and phone
Dimension table: Elasticsearch documents containing uid and username
The stream table and the dimension table need to be joined to produce uid, username, phone.

Step 1: read the stream data from the text file

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read the stream table: one tab-separated line per record, columns uid and phone
        DataStreamSource<String> dataStreamSource = env.readTextFile("/mytextFile.txt");
        SingleOutputStreamOperator<Tuple2<String, String>> map =
                dataStreamSource.map(new MapFunction<String, Tuple2<String, String>>() {
                    @Override
                    public Tuple2<String, String> map(String s) throws Exception {
                        String[] splits = s.split("\t");
                        String uid = splits[0];
                        String phone = splits[1];
                        return new Tuple2<>(uid, phone);
                    }
                });

        // The async join with the Elasticsearch dimension table is wired up in the last step:
        // SingleOutputStreamOperator<Tuple3<String, String, String>> renyuanku =
        //         AsyncDataStream.unorderedWait(map, new AsyncEsDataRequest(), 2, TimeUnit.SECONDS, 100);
        // renyuanku.writeAsText("E:/test/renyuanku.txt").setParallelism(1);

        env.execute("Test");
    }
}
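The input file is assumed to hold one record per line, with uid and phone separated by a tab, which is what the split("\t") above expects. readTextFile yields a bounded stream, which is convenient for testing; in a production job the stream table would more likely come from Kafka, as in the earlier example.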

Asynchronously fetch data from Elasticsearch

import java.util.Collections;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class AsyncEsDataRequest extends RichAsyncFunction<Tuple2<String, String>, Tuple3<String, String, String>> {

    private transient RestHighLevelClient restHighLevelClient;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Initialize the Elasticsearch client
        HttpHost httpHost = new HttpHost("swarm-manager", 9200, "http");
        restHighLevelClient = new RestHighLevelClient(RestClient.builder(httpHost));
    }

    @Override
    public void close() throws Exception {
        restHighLevelClient.close();
    }

    @Override
    public void asyncInvoke(Tuple2<String, String> input, ResultFuture<Tuple3<String, String, String>> resultFuture) throws Exception {
        search(input, resultFuture);
    }

    // Asynchronously query the Elasticsearch dimension table
    private void search(Tuple2<String, String> input, ResultFuture<Tuple3<String, String, String>> resultFuture) {
        String uid = input.f0;
        String phone = input.f1;

        SearchRequest searchRequest = new SearchRequest("renyuanku");
        QueryBuilder builder = QueryBuilders.boolQuery().must(QueryBuilders.termQuery("uid", uid));
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
        sourceBuilder.query(builder);
        searchRequest.source(sourceBuilder);

        ActionListener<SearchResponse> listener = new ActionListener<SearchResponse>() {
            // Success: join the username from the dimension table onto the stream record
            @Override
            public void onResponse(SearchResponse searchResponse) {
                String username = "";
                SearchHit[] searchHits = searchResponse.getHits().getHits();
                if (searchHits.length > 0) {
                    JSONObject jsonObject = JSONObject.parseObject(searchHits[0].getSourceAsString());
                    username = jsonObject.getString("username");
                }
                resultFuture.complete(Collections.singleton(Tuple3.of(uid, username, phone)));
            }

            // Failure: log the error and emit the record with an empty username
            @Override
            public void onFailure(Exception e) {
                System.out.println(e.getMessage());
                resultFuture.complete(Collections.singleton(Tuple3.of(uid, "", phone)));
            }
        };

        restHighLevelClient.searchAsync(searchRequest, listener);
    }
}
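Two assumptions are worth calling out: the renyuanku index is expected to contain documents of the form {"uid": ..., "username": ...}, and only the first hit of the term query on uid is used. Also, the two-argument searchAsync call matches older releases of the Elasticsearch high-level REST client; newer releases additionally take a RequestOptions argument.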

Connect the stream to the dimension-table lookups and write the result to a file

SingleOutputStreamOperator<Tuple3<String, String, String>> renyuanku =
        AsyncDataStream.unorderedWait(map, new AsyncEsDataRequest(), 2, TimeUnit.SECONDS, 100);
renyuanku.writeAsText("E:/test/renyuanku.txt").setParallelism(1);
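A note on the parameters: unorderedWait emits each result as soon as its asynchronous request completes, so the output order may differ from the input order, while AsyncDataStream.orderedWait preserves the input order at the cost of extra buffering and latency. The value 2 with TimeUnit.SECONDS is the timeout for each asynchronous request, and 100 is the capacity, i.e. the maximum number of requests allowed to be in flight at once.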

With that, the stream-table records are joined with the dimension-table data.
