Flink State 误用之痛,你中招了吗?
本文主要討論一個(gè)問(wèn)題:ValueState 中存 Map 與 MapState 有什么區(qū)別?
如果不懂這兩者的區(qū)別,而且使用 ValueState 中存大對(duì)象,生產(chǎn)環(huán)境很可能會(huì)出現(xiàn)以下問(wèn)題:
· CPU 被打滿
· 吞吐上不去
1、 結(jié)論
從性能和 TTL 兩個(gè)維度來(lái)描述區(qū)別。
性能
· RocksDB 場(chǎng)景,MapState 比 ValueState 中存 Map 性能高很多。
· 生產(chǎn)環(huán)境強(qiáng)烈推薦使用 MapState,不推薦 ValueState 中存大對(duì)象
· ValueState 中存大對(duì)象很容易使 CPU 打滿
· Heap State 場(chǎng)景,兩者性能類(lèi)似。
TTL
Flink 中 State 支持設(shè)置 TTL:
· MapState 的 TTL 是基于 UK 級(jí)別的
· ValueState 的 TTL 是基于整個(gè) key 的
舉一反三
能使用 ListState 的場(chǎng)景,不要使用 ValueState 中存 List。大佬們已經(jīng)把 MapState 和 ListState 性能都做了很多優(yōu)化,高性能不香嗎?下文會(huì)詳細(xì)分析 ValueState 和 MapState 底層的實(shí)現(xiàn)原理,通過(guò)分析原理得出上述結(jié)論。
2、 State 中要存儲(chǔ)哪些數(shù)據(jù)
ValueState 會(huì)存儲(chǔ) key、namespace、value,縮寫(xiě)為 。MapState 會(huì)存儲(chǔ) key、namespace、userKey、userValue,縮寫(xiě)為 。
解釋一下上述這些名詞。
Key
ValueState 和 MapState 都是 KeyedState,也就是 keyBy 后才能使用 ValueState 和 MapState。所以 State 中肯定要保存 key。
例如:按照 app 進(jìn)行 keyBy,總共有兩個(gè) app,分別是:app1 和 app2。那么狀態(tài)存儲(chǔ)引擎中肯定要存儲(chǔ) app1 或 app2,用于區(qū)分當(dāng)前的狀態(tài)數(shù)據(jù)到底是 app1 的還是 app2 的。
這里的 app1、app2 也就是所說(shuō)的 key。
Namespace
Namespace 用于區(qū)分窗口。
假設(shè)需要統(tǒng)計(jì) app1 和 app2 每個(gè)小時(shí)的 pv 指標(biāo),則需要使用小時(shí)級(jí)別的窗口。狀態(tài)引擎為了區(qū)分 app1 在 7 點(diǎn)和 8 點(diǎn)的 pv 值,就必須新增一個(gè)維度用來(lái)標(biāo)識(shí)窗口。
Flink 用 Namespace 來(lái)標(biāo)識(shí)窗口,這樣就可以在狀態(tài)引擎中區(qū)分出 app1 在 7 點(diǎn)和 8 點(diǎn)的狀態(tài)信息。
Value、UserKey、UserValue
ValueState 中存儲(chǔ)具體的狀態(tài)值。也就是上述例子中對(duì)應(yīng)的 pv 值。MapState 類(lèi)似于 Map 集合,存儲(chǔ)的是一個(gè)個(gè) KV 鍵值對(duì)。為了與 keyBy 的 key 進(jìn)行區(qū)分,所以 Flink 中把 MapState 的 key、value 分別叫 UserKey、UserValue。
下面講述狀態(tài)引擎是如何存儲(chǔ)這些數(shù)據(jù)的。
3、StateBackend 如何存儲(chǔ)和讀寫(xiě)State 數(shù)據(jù)
Flink 支持三種 StateBackend,分別是:MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend。
其中 MemoryStateBackend、FsStateBackend 兩種 StateBackend 在任務(wù)運(yùn)行期間都會(huì)將 State 存儲(chǔ)在內(nèi)存中,兩者在 Checkpoint 時(shí)將快照存儲(chǔ)的位置不同。RocksDBStateBackend 在任務(wù)運(yùn)行期間將 State 存儲(chǔ)在本地的 RocksDB 數(shù)據(jù)庫(kù)中。
所以下文將 MemoryStateBackend、FsStateBackend 統(tǒng)稱(chēng)為 heap 模式,RocksDBStateBackend 稱(chēng)為 RocksDB 模式。
3.1 Heap 模式 ValueState 和 MapState 如何存儲(chǔ)
Heap 模式表示所有的狀態(tài)數(shù)據(jù)都存儲(chǔ)在 TM 的堆內(nèi)存中,所有的狀態(tài)都存儲(chǔ)的原始對(duì)象,不會(huì)做序列化和反序列化。(注:Checkpoint 的時(shí)候會(huì)涉及到序列化和反序列化,數(shù)據(jù)的正常讀寫(xiě)并不會(huì)涉及,所以這里先不討論。)
Heap 模式下,無(wú)論是 ValueState 還是 MapState 都存儲(chǔ)在 CopyOnWriteStateMap 中。
· key 、 Namespace 分別對(duì)應(yīng) CopyOnWriteStateMap 的 K、N。
· ValueState 的 value 對(duì)應(yīng) CopyOnWriteStateMap 的 V。
MapState 將會(huì)把整個(gè) Map 作為 CopyOnWriteStateMap 的 V,相當(dāng)于 Flink 引擎創(chuàng)建了一個(gè) HashMap 用于存儲(chǔ) MapState 的 KV 鍵值對(duì)。
具體 CopyOnWriteStateMap 是如何實(shí)現(xiàn)的,可以參考:萬(wàn)字長(zhǎng)文詳解 Flink 中的 CopyOnWriteStateTable。
回到正題:Heap 模式下,ValueState 中存 Map 與 MapState 有什么區(qū)別?
Heap 模式下沒(méi)有區(qū)別。
ValueState 中存 Map,相當(dāng)于用戶手動(dòng)創(chuàng)建了一個(gè) HashMap 當(dāng)做 V 放到了狀態(tài)引擎中。而 MapState 是 Flink 引擎幫用戶創(chuàng)建了一個(gè) HashMap 當(dāng)做 V 放到了狀態(tài)引擎中。
所以實(shí)質(zhì)上 ValueState 中存 Map 與 MapState 都是一樣的,存儲(chǔ)結(jié)構(gòu)都是 CopyOnWriteStateMap。區(qū)別在于 ValueState 是用戶手動(dòng)創(chuàng)建 HashMap,MapState 是 Flink 引擎創(chuàng)建 HashMap。
3.2 RocksDB 模式 ValueState 和 MapState 如何存儲(chǔ)
RocksDB 模式表示所有的狀態(tài)數(shù)據(jù)存儲(chǔ)在 TM 本地的 RocksDB 數(shù)據(jù)庫(kù)中。RocksDB 是一個(gè) KV 數(shù)據(jù)庫(kù),且所有的 key 和 value 都是 byte 數(shù)組。所以無(wú)論是 ValueState 還是 MapState,存儲(chǔ)到 RocksDB 中都必須將對(duì)象序列化成二進(jìn)制當(dāng)前 kv 存儲(chǔ)在 RocksDB 中。
■ 3.2.1 ValueState 如何映射成 RocksDB 的 kv
ValueState 有 key、namespace、value 需要存儲(chǔ),所以最簡(jiǎn)單的思路:
1、將 ValueState 的 key 序列化成 byte 數(shù)組
2、將 ValueState 的 namespace 序列化成 byte 數(shù)組
3、將兩個(gè) byte 數(shù)組拼接起來(lái)做為 RocksDB 的 key
4、將 ValueState 的 value 序列化成 byte 數(shù)組做為 RocksDB 的 value
然后就可以寫(xiě)入到 RocksDB 中。
查詢(xún)數(shù)據(jù)也用相同的邏輯:將 key 和 namespace 序列化后拼接起來(lái)作為 RocksDB 的 key,去 RocksDB 中進(jìn)行查詢(xún),查詢(xún)到的 byte 數(shù)組進(jìn)行反序列化就得到了 ValueState 的 value。
這就是 RocksDB 模式下,ValueState 的讀寫(xiě)流程。
■ 3.2.2 MapState 如何映射成 RocksDB 的 kv
MapState 有 key、namespace、userKey、userValue 需要存儲(chǔ),所以最簡(jiǎn)單的思路:
1、將 MapState 的 key 序列化成 byte 數(shù)組
2、將 MapState 的 namespace 序列化成 byte 數(shù)組
3、將 MapState 的 userKey 序列化成 byte 數(shù)組
4、將三個(gè) byte 數(shù)組拼接起來(lái)做為 RocksDB 的 key
5、將 MapState 的 value 序列化成 byte 數(shù)組做為 RocksDB 的 value
然后就可以寫(xiě)入到 RocksDB 中。
查詢(xún)數(shù)據(jù)也用相同的邏輯:將 key、namespace、userKey 序列化后拼接起來(lái)作為 RocksDB 的 key,去 RocksDB 中進(jìn)行查詢(xún),查詢(xún)到的 byte 數(shù)組進(jìn)行反序列化就得到了 MapState 的 userValue。
這就是 RocksDB 模式下,MapState 的讀寫(xiě)流程。
3.3 RocksDB 模式下,ValueState 中存 Map 與 MapState 有什么區(qū)別?
■ 3.3.1 假設(shè) Map 集合有 100 個(gè) KV 鍵值對(duì),具體兩種方案會(huì)如何存儲(chǔ)數(shù)據(jù)?
ValueState 中存 Map,Flink 引擎會(huì)把整個(gè) Map 當(dāng)做一個(gè)大 Value,存儲(chǔ)在 RocksDB 中。對(duì)應(yīng)到 RocksDB 中,100 個(gè) KV 鍵值對(duì)的 Map 集合會(huì)序列化成一個(gè) byte 數(shù)組當(dāng)做 RocksDB 的 value,存儲(chǔ)在 RocksDB 的 1 行數(shù)據(jù)中。
MapState 會(huì)根據(jù) userKey,將 100 個(gè) KV 鍵值對(duì)分別存儲(chǔ)在 RocksDB 的 100 行中。
■ 3.3.2 修改 Map 中的一個(gè) KV 鍵值對(duì)的流程
ValueState 的情況,雖然要修改 Map 中的一個(gè) KV 鍵值對(duì),但需要將整個(gè) Map 集合從 RocksDB 中讀出來(lái)。具體流程如下:
1、將 key、namespace 序列化成 byte 數(shù)組,生成 RocksDB 的 key
2、從 RocksDB 讀出 key 對(duì)應(yīng) value 的 byte 數(shù)組
3、將 byte 數(shù)組反序列化成整個(gè) Map
4、堆內(nèi)存中修改 Map 集合
5、將 Map 集合寫(xiě)入到 RocksDB 中,需要將整個(gè) Map 集合序列化成 byte 數(shù)組,再寫(xiě)入
MapState 的情況,要修改 Map 中的一個(gè) KV 鍵值對(duì),根據(jù) key、namespace、userKey 即可定位到要修改的那一個(gè) KV 鍵值對(duì)。具體流程如下:
1、將 key、namespace、userKey 序列化成 byte 數(shù)組,生成 RocksDB 的 key
2、從 RocksDB 讀出 key 對(duì)應(yīng) value 的 byte 數(shù)組
3、將 byte 數(shù)組反序列化成 userValue
4、堆內(nèi)存中修改 userValue 的值
5、將 userKey、userValue 寫(xiě)入到 RocksDB 中,需要先序列化,再寫(xiě)入
■ 3.3.3 結(jié)論
要修改 Map 中的一個(gè) KV 鍵值對(duì):
如果使用 ValueState 中存 Map,則每次修改操作需要序列化反序列化整個(gè) Map 集合,每次序列化反序列大對(duì)象會(huì)非常耗 CPU,很容易將 CPU 打滿。
如果使用 MapState,每次修改操作只需要序列化反序列化 userKey 那一個(gè) KV 鍵值對(duì)的數(shù)據(jù),效率較高。
舉一反三:其他使用 ValueState、value 是大對(duì)象且 value 頻繁更新的場(chǎng)景,都容易將 CPU 打滿。
例如:ValueState 中存儲(chǔ)的位圖,如果每條數(shù)據(jù)都需要更新位圖,則可能導(dǎo)致 CPU 被打滿。
為了便于理解,上述忽略了一些實(shí)現(xiàn)細(xì)節(jié),下面補(bǔ)充一下。
3.4 直接拼接 key 和 namespace 可能導(dǎo)致 RocksDB 的 key 沖突
假設(shè) ValueState 中有兩個(gè)數(shù)據(jù):
· key1 序列化后的二進(jìn)制為 0x112233, namespace1 序列化后的二進(jìn)制為0x4455
· key2 序列化后的二進(jìn)制為 0x1122, namespace2 序列化后的二進(jìn)制為0x334455
這兩個(gè)數(shù)據(jù)對(duì)應(yīng)的 RocksDB key 都是 0x1122334455,這樣的話,兩個(gè)不同的 key、namespace 映射到 RocksDB 中變成了相同的數(shù)據(jù),無(wú)法做區(qū)分。
解決方案:
在 key 和 namespace 中間寫(xiě)入 key 的 byte 數(shù)組長(zhǎng)度,在 namespace 后寫(xiě)入 namespace 的 byte 長(zhǎng)度。
寫(xiě)入這兩個(gè)長(zhǎng)度就不可能出現(xiàn) key 沖突了,具體為什么,讀者可以自行思考。
3.5 RocksDB 的 key 中還會(huì)存儲(chǔ) KeyGroupId
對(duì) KeyGroup 不了解的同學(xué)可以參考:Flink 源碼:從 KeyGroup 到 Rescale。
加上 KeyGroupId 也比較簡(jiǎn)單。只需要修改 RocksDB key 的拼接方式,在序列化 key 和 namespace 之前,先序列化 KeyGroupId 即可。
4. State TTL 簡(jiǎn)述
Flink 中 TTL 的實(shí)現(xiàn),都是將用戶的 value 封裝了一層,具體參考下面的 TtlValue 類(lèi):
public class TtlValue<T> implements Serializable {@Nullableprivate final T userValue;private final long lastAccessTimestamp; }TtlValue 類(lèi)中有兩個(gè)字段,封裝了用戶的 value 且有一個(gè)時(shí)間戳字段,這個(gè)時(shí)間戳記錄了這條數(shù)據(jù)寫(xiě)入的時(shí)間。
如果開(kāi)啟了 TTL,則狀態(tài)中存儲(chǔ)的 value 就是 TtlValue 對(duì)象。時(shí)間戳字段也會(huì)保存到狀態(tài)引擎中,之后查詢(xún)數(shù)據(jù)時(shí),就可以通過(guò)該時(shí)間戳判斷數(shù)據(jù)是否過(guò)期。
· ValueState 將 value 封裝為 TtlValue。
· MapState 將 userValue 封裝成 TtlValue。
· ListState 將 element 封裝成 TtlValue。
ValueState 中存 Map 與 MapState 有什么區(qū)別?
如果 ValueState 中存 Map,則整個(gè) Map 被當(dāng)做 value,只維護(hù)一個(gè)時(shí)間戳。所以要么整個(gè) Map 過(guò)期,要么都不過(guò)期。
MapState 中如果存儲(chǔ)了 100 個(gè) KV 鍵值對(duì),則 100 個(gè) KV 鍵值對(duì)都會(huì)存儲(chǔ)各自的時(shí)間戳。因此每個(gè) KV 鍵值對(duì)的 TTL 是相互獨(dú)立的。
5.總結(jié)
本文從實(shí)現(xiàn)原理詳細(xì)分析了 ValueState 中存 Map 與 MapState 有什么區(qū)別?下面將從性能和 TTL 兩個(gè)維度來(lái)描述兩者的區(qū)別。
性能
· RocksDB 場(chǎng)景,MapState 比 ValueState 中存 Map 性能高很多,ValueState 中存大對(duì)象很容易使 CPU 打滿
· Heap State 場(chǎng)景,兩者性能類(lèi)似
TTL
Flink 中 State 支持設(shè)置 TTL,TTL 只是將時(shí)間戳與 userValue 封裝起來(lái)。
· MapState 的 TTL 是基于 UK 級(jí)別的
· ValueState 的 TTL 是基于整個(gè) key 的
不過(guò),其實(shí) ListState 的數(shù)據(jù)映射到 RocksDB 比較復(fù)雜,用到了 RocksDB 的 merge 特性,比較有意思,有興趣的同學(xué)可以閱讀 RocksDB wiki《Merge Operator Implementation》,鏈接:
https://github.com/facebook/rocksdb/wiki/Merge-Operator-Implementation
更多 Flink 技術(shù)交流可加入 Apache Flink 社區(qū)釘釘交流群:
原文鏈接:https://developer.aliyun.com/article/777445?
版權(quán)聲明:本文內(nèi)容由阿里云實(shí)名注冊(cè)用戶自發(fā)貢獻(xiàn),版權(quán)歸原作者所有,阿里云開(kāi)發(fā)者社區(qū)不擁有其著作權(quán),亦不承擔(dān)相應(yīng)法律責(zé)任。具體規(guī)則請(qǐng)查看《阿里云開(kāi)發(fā)者社區(qū)用戶服務(wù)協(xié)議》和《阿里云開(kāi)發(fā)者社區(qū)知識(shí)產(chǎn)權(quán)保護(hù)指引》。如果您發(fā)現(xiàn)本社區(qū)中有涉嫌抄襲的內(nèi)容,填寫(xiě)侵權(quán)投訴表單進(jìn)行舉報(bào),一經(jīng)查實(shí),本社區(qū)將立刻刪除涉嫌侵權(quán)內(nèi)容。總結(jié)
以上是生活随笔為你收集整理的Flink State 误用之痛,你中招了吗?的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 网易云音乐基于 Flink + Kafk
- 下一篇: 云湖共生,下一代数据湖来了?