日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

azkaban获取上游的节点结果_Flink任务实时获取并更新规则

發布時間:2025/3/19 编程问答 16 豆豆
生活随笔 收集整理的這篇文章主要介紹了 azkaban获取上游的节点结果_Flink任务实时获取并更新规则 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

背景

Flink通常被用來處理流式數據,有著眾多的應用場景,比方說實時的ETL、檢測報警等業務場景。這些場景通常會涉及到規則的更新,比如對解析規則和報警規則進行更改后,流任務應能夠實時感知到,并用新的規則繼續檢測,避免因為規則更改而重啟任務造成的開銷。一般來說流式任務的重啟是比較重的。

方案選擇

接下來分別介紹下兩種可行的方案與選型

1.廣播變量與廣播流

廣播變量通常被運用到以下場景中:一個流中的一些數據需要被廣播到所有的下游任務,被下游任務緩存在本地并用于處理另一個流上的所有傳入數據。例如,一個低吞吐量的流包含了一組規則,我們希望根據這些規則對另一個流的所有數據進行檢測。因此,廣播變量(broadcast state)和其他的state相比有以下不同:(1)目前只支持map格式(2)算子需要同時包含廣播流和普通的數據流才可用(3)一個算子可以使用多個廣播變量并用名稱進行區分

2.異步IO

流計算中經常會需要跟外部存儲系統交互,如mysql、redis等。眾所周知,在流處理中查詢外部存儲系統并等結果返回等待時間相對來說是比較長的,若數據量較大則導致會吞吐量會大大降低,使流任務基本處于阻塞不可用狀態。

同步處理與異步處理

Flink的異步IO可以使得流任務在不阻塞運算的情況下異步請求外部系統,并且支持超時處理,以及返回結果有序或無序處理等。關于異步IO的詳細原理介紹已存在較多資料不在展開。通過異步IO獲取規則的原理就是在數據到來之后查詢外部系統獲取規則,并根據規則檢測或解析數據。比方將規則存放在redis、mysql等等。

理論上兩種方式都是可以完成從外部系統獲取規則并進行更新的。現分析下量兩種方式的差別與特點。對于異步IO來說若對于每條數據查一次外部系統,當數據量比較大的時候,外部系統的查詢性能會比較容易出現瓶頸,導致流處理能力達到上限。通常的優化方法是設置本地內存緩存規則,并設置過期時間,每次處理數據時判斷規則是否過期,若過期則重新查詢規則,這也導致了規則的獲取可能有一定的延遲性,也就是說需要在數據量較大的情況下需要對處理性能和規則實時性作出一定的平衡,另外若任務運行在分布式架構上,同一算子可能在不同的機器或者容器中運行,則可能導致多個節點查詢同一外部數據源的情況。

接下來分析下廣播變量和廣播流的方法,該方法通常定義一個數據源作為規則流,該數據源可以利用flink已有的connector,如kafka或者其他的消息隊列,也可以繼承flink的RichSourceFunction自定義數據源,比如利用Mysql作為數據源并設置線程定期獲取規則。若將變更的規則放入消息隊列作為規則流則可以做到實時更新,若利用外部存儲如Mysql、Redis作為數據源定時刷新,可以準實時的更新規則,無論利用那種方式,都可以只設置并行度為1的規則流獲取算子,并將獲取到的規則廣播到下游所有算子。

因為筆者遇到的情況如下:需要更新規則的flink任務數大概在300個以上,并且還會持續增多,另外用戶對于規則的更新最多可以容忍分鐘級別的延遲,但最好是可以實時更新,對規則更新的實時性要求不是特別高,另外流任務的數據量比較大,單個任務每秒處理數據條數可能達到幾萬或幾十萬。因此使用廣播變量并將Mysql作為數據源的方式獲取規則,這樣可以滿足目前的需求,而且利于后期利用消息隊列作為規則流實時更新規則擴展。

實踐

背景及方案介紹完了,接下來直接上代碼吧,talk is cheap, show me the code.

首先是繼承RichSourceFunction類并自定義規則流,從msyql定時拉取配置。

public class GetJobConfig extends RichSourceFunction<Map<String,JobConfig>> {private volatile boolean isRunning = true;/*** 規則配置id*/private Long jobHistoryId;public GetJobConfig(Long jobHistoryId) {this.jobHistoryId = jobHistoryId;}/*** 解析規則查詢周期為1分鐘*/private Long duration = 1 * 60 * 1000L;@Overridepublic void run(SourceContext<Map<String,JobConfig>> ctx) throws Exception {while (isRunning) {//從Mysql數據庫獲取配置String jobConfigStr = DBService.getInstance().getJobConfig(jobHistoryId);//解析規則與業務邏輯相關請忽略if (!StringUtils.isEmpty(jobConfigStr)) {JobConfig jobConfig = JsonUtil.string2Object(jobConfigStr, JobConfig.class);Map<String,JobConfig> jobConfigMap = new HashMap<>(12);jobConfigMap.put("jobConfig",jobConfig);//輸出規則流ctx.collect(jobConfigMap);}//線程睡眠Thread.sleep(duration);}}@Overridepublic void cancel() {isRunning = false;} }

接下來根據規則的數據類型定義MapState,用來描述如何存儲規則流。之后將規則流通過broadcast方法廣播出去。

//定義MapState MapStateDescriptor<String, JobConfig> etlRules = new MapStateDescriptor<String, JobConfig>("etl_rules",//Key類型BasicTypeInfo.STRING_TYPE_INFO,//Value類型TypeInformation.of(new TypeHint<JobConfig>() {})); //將規則流廣播到下游算子 BroadcastStream<Map<String, JobConfig>> etlRuleBroadcastStream = jobConfigStream.broadcast(etlRules);

然后在下游算子將規則流和數據流結合起來作檢測或報警匹配,利用connect實現規則流和數據流的結合,并在process方法中定義具體處理邏輯,其中source為從kafka獲取的要處理的數據流。

//規則流和數據流的結合 DataStream<Tuple2<Integer, Map<String, Object>>> outputStream = source.connect(etlRuleBroadcastStream).process(new EtlBroadcastProcessFunction()).setParallelism(etlParallelism).name("etl").uid("etl");

最后看一下具體的處理邏輯,通過繼承BroadcastProcessFunction類并重寫processBroadcastElement與processElement方法,其中processBroadcastElement為處理規則流的邏輯,從該方法中可以獲取到上游廣播出的規則數據,processElement用來處理數據流并將獲取到的規則作用在數據流上。

public class EtlBroadcastProcessFunction extends BroadcastProcessFunction<String,Map<String, JobConfig>,Tuple2<Integer, Map<String, Object>>> {//解析規則private JobConfig jobConfig;/*** 處理數據流* @param record* @param ctx* @param out* @throws Exception*/@Overridepublic void processElement(String record, ReadOnlyContext ctx, Collector<Tuple2<Integer, Map<String, Object>>> out) throws Exception {//record為要處理的數據,可利用獲取到的規則jobConfig來檢測收到的數據}/*** 獲取規則流并緩存* @param value* @param ctx* @param out* @throws Exception*/@Overridepublic void processBroadcastElement(Map<String, JobConfig> value, Context ctx, Collector<Tuple2<Integer, Map<String, Object>>> out) throws Exception {//value中為獲取到的規則數據//緩存規則數據this.jobConfig = value.get(“jobConfig“);}}

最終提交到yarn上運行,可以看到DAG圖如下所示。可以清楚的看到該任務存在兩個輸入流,其中一個為規則數據源mysql,另外一個為消息隊列kafka。

踩坑

在測試的過程中出現了一點小問題,運行過程中使用的時間類型為EventTime,卻發現加上規則流之后下游的時間窗口始終未觸發,而不加規則流則可以正常觸發窗口計算。最后通過觀察watermark找到了問題所在,發現加了規則流之后下游的watermark都為空,于是不由的想到了flink watermark的原理。

watermark機制

若一個算子存在兩個上游,則該算子watermark會選擇較小的一個,若一個上游不存在watermark則該算子會獲取不到。由此問題的原因就很清晰了,是由上游規則流未定義watermark導致的,因此將assignTimestampsAndWatermarks方法后置,放在兩個流結合之后,則下游的算子可以正常得到watermark并觸發窗口計算了。

參考資料

1.

The Broadcast State Pattern?ci.apache.org

2.

Generating Timestamps / Watermarks?ci.apache.org

3.

Asynchronous I/O for External Data Access?ci.apache.org

總結

以上是生活随笔為你收集整理的azkaban获取上游的节点结果_Flink任务实时获取并更新规则的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。

主站蜘蛛池模板: 亚洲九九九九 | 天堂www中文在线资源 | 欧美日韩在线免费 | 精品国产99一区二区乱码综合 | 91精品国产一区二区三竹菊影视 | 国产91久久精品一区二区 | 欧美日韩精品网站 | 极品美女无套呻吟啪啪 | 日本视频h | 亚洲精品乱码久久久久久久久久久久 | 少妇又色又紧又爽又刺激视频 | 久久久www成人免费精品 | 国产麻豆剧传媒精品国产av | 日韩成人在线观看 | 黄色av影视 | 91精品国产一区二区三竹菊影视 | 免费看欧美大片 | 日日操网| 风流少妇一区二区三区91 | 性欧美丰满熟妇xxxx性久久久 | 精品国产九九九 | 国产精品中文字幕在线观看 | av在线资源观看 | 欧美精品福利 | 亚洲一区二区播放 | 国产精品99无码一区二区视频 | 国产h视频在线观看 | 色av色| 精品国产黄色片 | 国产精品自拍第一页 | 亚洲涩涩视频 | 久久老司机 | 国产女主播喷水视频在线观看 | 日日摸日日干 | 国产精品久久久久久久午夜 | 日韩电影一区 | 欧美日韩一区二区三区四区五区六区 | 99国内揄拍国内精品人妻免费 | 意大利少妇愉情理伦片 | 麻豆成人久久精品一区二区三区 | 六月色丁香| 日本孰妇毛茸茸xxxx | 中文字幕免费在线 | 在线看黄色片 | 亚洲 激情| 麻豆久久久久久久久久 | 污污网站在线观看视频 | 国产中文字幕二区 | 国产精品国产三级国产Av车上的 | 久久22| 91久久久久| 1000部啪啪 | 毛片无码免费无码播放 | 99精品国产99久久久久久97 | 青春草视频在线免费观看 | 男女黄色片 | 亚洲天堂影视 | 超碰美女在线 | 亚洲精品久久一区二区三区777 | 色妞网| 18xxxx日本| 麻豆com| 色天堂视频 | 婷婷色小说| 亚洲色图10p | 性欧美视频在线观看 | 瑟瑟综合网 | 在线网站黄 | 国产精品一区二区在线 | 欧美激情久久久久久久 | 蜜臀av性久久久久蜜臀aⅴ | 壮汉被书生c到合不拢腿 | 国产色99| 永久免费看黄 | 色偷偷av一区二区三区 | 老头老太吃奶xb视频 | 农村妇女av | 成人欧美一区二区三区黑人免费 | 欧美巨乳美女 | wwwwxxxx欧美 | 无码一区二区波多野结衣播放搜索 | 国产偷自拍 | 久久精品国产免费看久久精品 | 日本啪啪网站 | 在线aaa| 国产特黄级aaaaa片免 | 日韩一区二区中文字幕 | 人妻熟女一区二区aⅴ水野 91在线观看视频 | 99福利视频导航 | 天天综合网天天综合色 | 噜噜噜久久,亚洲精品国产品 | 精品无码久久久久久国产 | 日韩福利在线播放 | 精国产品一区二区三区a片 国产精品第一 | 神马午夜91 | 日韩视频在线观看一区二区三区 | 91视频入口 | 自拍偷自拍亚洲精品播放 | 久久蜜臀 |