日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當(dāng)前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

MongoDB 4.2 内核解析 - Change Stream

發(fā)布時間:2024/8/23 编程问答 43 豆豆
生活随笔 收集整理的這篇文章主要介紹了 MongoDB 4.2 内核解析 - Change Stream 小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.

MongoDB 從3.6版本開始支持了 Change Stream 能力(4.0、4.2 版本在能力上做了很多增強),用于訂閱 MongoDB 內(nèi)部的修改操作,change stream 可用于 MongoDB 之間的增量數(shù)據(jù)遷移、同步,也可以將 MongoDB 的增量訂閱應(yīng)用到其他的關(guān)聯(lián)系統(tǒng);比如電商場景里,MongoDB 里存儲新的訂單信息,業(yè)務(wù)需要根據(jù)新增的訂單信息去通知庫存管理系統(tǒng)發(fā)貨。

Change Stream 與 Tailing Oplog 對比

在 change stream 功能之前,如果要獲取 MongoDB 增量的修改,可以通過不斷?tailing oplog? 的方式來?拉取增量的 oplog?,然后針對拉取到的 oplog 集合,來過濾滿足條件的 oplog。這種方式也能滿足絕大部分場景的需求,但存在如下的不足。

  • 使用門檻較高,用戶需要針對 oplog 集合,打開特殊選項的的 tailable cursor? ("tailable": true, "awaitData" : true)。
  • 用戶需要自己管理增量續(xù)傳,當(dāng)拉取應(yīng)用 crash 時,用戶需要記錄上一條拉取oplog的 ts、h 等字段,在下一次先定位到指定 oplog 再繼續(xù)拉取。
  • 結(jié)果過濾必須在拉取側(cè)完成,但只需要訂閱部分 oplog 時,比如針對某個 DB、某個 Collection、或某種類型的操作,必須要把左右的 oplog 拉取到再進行過濾。
  • 對于 update 操作,oplog 只包含操作的部分內(nèi)容,比如?{$set: {x: 1}}?,而應(yīng)用經(jīng)常需要獲取到完整的文檔內(nèi)容。
  • 不支持 Sharded Cluster 的訂閱,用戶必須針對每個 shard 進行 tailing oplog,并且這個過程中不能有 moveChunk 操作,否則結(jié)果可能亂序。
  • MongoDB Change Stream 解決了 Tailing oplog 存在的不足

  • 簡單易用,提供統(tǒng)一的 Change Stream API,一次 API 調(diào)用,即可從 MongoDB Server 側(cè)獲取增量修改。
  • 統(tǒng)一的進度管理,通過 resume token 來標(biāo)識拉取位置,只需在 API 調(diào)用時,帶上上次結(jié)果的 resume token,即可從上次的位置接著訂閱。
  • 支持對結(jié)果在 Server 端進行 pipeline 過濾,減少網(wǎng)絡(luò)傳輸,支持針對 DB、Collection、OperationType 等維度進行結(jié)果過濾。
  • 支持 fullDocument: "updateLookup" 選項,對于 update,返回當(dāng)時對應(yīng)文檔的完整內(nèi)容。
  • 支持 Sharded Cluster 的修改訂閱,相同的 API 請求發(fā)到 mongos ,即可獲取集群維度全局有序的修改。
  • Change Stream 實戰(zhàn)

    以 Mongo shell 為例,使用 Change Stream 非常簡單,mongo shell 封裝了針對整個實例、DB、Collection 級別的訂閱操作。

    db.getMongo().watch() 訂閱整個實例的修改 db.watch() 訂閱指定DB的修改 db.collection.watch() 訂閱指定Collection的修改
  • 新建連接1發(fā)起訂閱操作
  • mytest:PRIMARY>db.coll.watch([], {maxAwaitTimeMS: 60000}) 最多阻塞等待 1分鐘
  • 新建連接2寫入新數(shù)據(jù)
  • ? ? ? ? ?

    mytest:PRIMARY> db.coll.insert({x: 100}) WriteResult({ "nInserted" : 1 }) mytest:PRIMARY> db.coll.insert({x: 101}) WriteResult({ "nInserted" : 1 }) mytest:PRIMARY> db.coll.insert({x: 102}) WriteResult({ "nInserted" : 1 })
  • 連接1上收到 Change Stream 更新
  • mytest:PRIMARY> db.watch([], {maxAwaitTimeMS: 60000}) { "_id" : { "_data" : "825E0D5E35000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E353BE5C36D695042C90004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934389, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e353be5c36d695042c9"), "x" : 100 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e353be5c36d695042c9") } } { "_id" : { "_data" : "825E0D5E37000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E373BE5C36D695042CA0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934391, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca"), "x" : 101 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca") } } { "_id" : { "_data" : "825E0D5E39000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E393BE5C36D695042CB0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934393, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb"), "x" : 102 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb") } }

    ?

  • 上述 ChangeStream 結(jié)果里,_id 字段的內(nèi)容即為 resume token,標(biāo)識著 oplog 的某個位置,如果想從某個位置繼續(xù)訂閱,在 watch 時,通過 resumeAfter 指定即可。比如每個應(yīng)用訂閱了上述3條修改,但只有第一條已經(jīng)成功消費了,下次訂閱時指定第一條的 resume token 即可再次訂閱到接下來的2條。
  • ? ??

    mytest:PRIMARY> db.coll.watch([], {maxAwaitTimeMS: 60000, resumeAfter: { "_data" : "825E0D5E35000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E353BE5C36D695042C90004" }}) { "_id" : { "_data" : "825E0D5E37000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E373BE5C36D695042CA0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934391, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca"), "x" : 101 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e373be5c36d695042ca") } } { "_id" : { "_data" : "825E0D5E39000000012B022C0100296E5A1004EA4E00977BCC482FB44DEED9A3C2999946645F696400645E0D5E393BE5C36D695042CB0004" }, "operationType" : "insert", "clusterTime" : Timestamp(1577934393, 1), "fullDocument" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb"), "x" : 102 }, "ns" : { "db" : "test", "coll" : "coll" }, "documentKey" : { "_id" : ObjectId("5e0d5e393be5c36d695042cb") } }

    Change Stream 內(nèi)部實現(xiàn)

    watch() wrapper

    db.watch() 實際上是一個 API wrapper,實際上 Change Stream 在 MongoDB 內(nèi)部實際上是一個 aggregation 命令,只是加了一個特殊的?$changestream? 階段,在發(fā)起 change stream 訂閱操作后,可通過 db.currentOp() 看到對應(yīng)的 aggregation/getMore 操作的詳細(xì)參數(shù)。

    {"op" : "getmore","ns" : "test.coll","command" : {"getMore" : NumberLong("233479991942333714"),"collection" : "coll","maxTimeMS" : 50000,"lsid" : {"id" : UUID("e4fffa71-e168-4527-be61-f0918849d107")},},"planSummary" : "COLLSCAN","cursor" : {"cursorId" : NumberLong("233479991942333714"),"createdDate" : ISODate("2019-12-31T06:35:52.479Z"),"lastAccessDate" : ISODate("2019-12-31T06:36:09.988Z"),"nDocsReturned" : NumberLong(1),"nBatchesReturned" : NumberLong(1),"noCursorTimeout" : false,"tailable" : true,"awaitData" : true,"originatingCommand" : {"aggregate" : "coll","pipeline" : [{"$changeStream" : {"fullDocument" : "default"}}],"cursor" : {},"lsid" : {"id" : UUID("e4fffa71-e168-4527-be61-f0918849d107")},"$clusterTime" : {"clusterTime" : Timestamp(1577774144, 1),"signature" : {"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),"keyId" : NumberLong(0)}},"$db" : "test"},"operationUsingCursorId" : NumberLong(7019500)},"numYields" : 2,"locks" : {}}

    resume token

    resume token 用來描述一個訂閱點,本質(zhì)上是 oplog 信息的一個封裝,包含 clusterTime、uuid、documentKey等信息,當(dāng)訂閱 API 帶上 resume token 時,MongoDB Server 會將 token 轉(zhuǎn)換為對應(yīng)的信息,并定位到 oplog 起點繼續(xù)訂閱操作。

    struct ResumeTokenData {Timestamp clusterTime;int version = 0;size_t applyOpsIndex = 0;Value documentKey;boost::optional<UUID> uuid; };

    ResumeTokenData 結(jié)構(gòu)里包含 version 信息,在 4.0.7 以前的版本,version 均為0; 4.0.7 引入了一種新的 resume token 格式,version 為 1; 另外在 3.6 版本里,Resume Token 的編碼與 4.0 也有所不同;所以在版本升級后,有可能出現(xiàn)不同版本 token 無法識別的問題,所以盡量要讓 MongoDB Server 所有組件(Replica Set 各個成員,ConfigServer、Mongos)都保持相同的內(nèi)核版本。

    updateLookup

    Change Stream 支持針對 update 操作,獲取當(dāng)前的文檔完整內(nèi)容,而不是僅更新操作本身,比如

    mytest:PRIMARY> db.coll.find({_id: 101}) { "_id" : 101, "name" : "jack", "age" : 18 } mytest:PRIMARY> db.coll.update({_id: 101}, {$set: {age: 20}}) WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })

    上面的 update 操作,默認(rèn)情況下,change stream 會收到??{_id: 101}, {$set: {age: 20}? 的內(nèi)容,而并不會包含這個文檔其他未更新字段的信息;而加上 fullDocument: "updateLookup" 選項后,Change Stream 會根據(jù)文檔 _id 去查找文檔當(dāng)前的內(nèi)容并返回。

    需要注意的是,updateLookup 選項只能保證最終一致性,比如針對上述文檔,如果連續(xù)更新100次,update 的 change stream 并不會按順序收到中間每一次的更新,因為每次都是去查找文檔當(dāng)前的內(nèi)容,而當(dāng)前的內(nèi)容可能已經(jīng)被后續(xù)的修改覆蓋。

    Sharded cluster

    Change Stream 支持針對 sharded cluster 進行訂閱,會保證全局有序的返回結(jié)果;為了達(dá)到全局有序這個目標(biāo),mongos 需要從每個 shard 都返回訂閱結(jié)果按時間戳進行排序合并返回。

    在極端情況下,如果某些 shard 寫入量很少或者沒有寫入,change stream 的返回延時會受到影響,因為需要等到所有 shard 都返回訂閱結(jié)果;默認(rèn)情況下,mongod server 每10s會產(chǎn)生一條 Noop 的特殊oplog,這個機制會間接驅(qū)動 sharded cluster 在寫入量不高的情況下也能持續(xù)運轉(zhuǎn)下去。

    由于需要全局排序,在 sharded cluster 寫入量很高時,Change Stream 的性能很可能跟不上;如果對性能要求非常高,可以考慮關(guān)閉 Balancer,在每個 shard 上各自建立 Change Stream。


    原文鏈接
    本文為阿里云原創(chuàng)內(nèi)容,未經(jīng)允許不得轉(zhuǎn)載。

    ?

    總結(jié)

    以上是生活随笔為你收集整理的MongoDB 4.2 内核解析 - Change Stream的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。

    如果覺得生活随笔網(wǎng)站內(nèi)容還不錯,歡迎將生活随笔推薦給好友。

    主站蜘蛛池模板: 人妻无码久久精品人妻 | 欧美一区免费看 | 久久不射网站 | 色94色欧美sute亚洲线路二 | 精品无码久久久久 | 天天干夜夜看 | 99国产成人精品 | 播金莲一级淫片aaaaaaa | 高清国产一区二区 | 亚洲国产精品一 | 午夜精品极品粉嫩国产尤物 | 污的视频在线观看 | japanese在线观看 | 免费a在线| 少妇导航av| 色综网| 欧洲女女同videos | 中文字幕亚洲日本 | 男生和女生差差的视频 | 欧美性猛交ⅹ乱大交3 | 亚洲午夜精品一区 | 国产成人在线精品 | 1000部拍拍拍18勿入免费视频 | 日韩精品免费在线 | 久久人人添人人爽添人人片 | 欧美日韩影院 | 国产一级做a爱片久久毛片a | 精品欧美一区二区久久久 | 国产亚州av| 欧美日韩黄色一级片 | 麻豆影视国产在线观看 | 成人在线观看免费爱爱 | 亚洲欧美精品一区二区三区 | 久久偷拍免费视频 | 欧洲美女粗暴牲交免费观看 | 久久亚洲av午夜福利精品一区 | 一卡二卡三卡四卡 | 亚洲精品国产精品乱码不99热 | 在线激情网站 | 一级在线视频 | 打开免费观看视频在线播放 | 天堂网站 | 97在线播放免费观看 | 性视频一区 | 亚洲欧美精品 | 久久亚洲免费视频 | 男性裸体全身精光gay | 午夜国产免费 | 人人妻人人玩人人澡人人爽 | 成人网免费视频 | 热久久在线 | 美女超碰在线 | 国产成人av无码精品 | av香蕉 | 黄色小视频在线观看免费 | 亚洲免费看av | 成人午夜激情网 | 无码精品人妻一区二区三区漫画 | 麻豆黄色网址 | 欧美在线观看免费高清 | 国产精品国产三级国产专区51区 | 人操人操 | 韩国三级bd高清中字2021 | 1024毛片| 人人澡超碰碰 | 找个毛片看看 | 午夜第一页 | 免费三片在线播放 | 久久久久久久久久一区二区三区 | 国产精品羞羞答答 | 贝利弗山的秘密1985版免费观看 | 国产成人av一区二区三区不卡 | 户外露出一区二区三区 | 玖玖爱在线观看 | 欧美日韩精 | 国产免费一区二区三区在线播放 | 中文字幕15页 | 黄色小视频在线免费观看 | 青青草原在线免费观看视频 | www亚洲精品 | 久操免费在线视频 | 国产精品久久久久久亚洲调教 | 国产成人a∨ | 色欲久久久天天天精品综合网 | 九九九久久久精品 | 逼特逼在线视频 | 欧美大片一级 | 欧美激情国产一区 | 一级视频在线免费观看 | 成人h动漫精品一区二区无码 | 日日噜夜夜噜 | 精品视频久久久 | 98在线视频 | 日韩三级a | 亚洲男女视频在线观看 | 欧美黑吊大战白妞欧美大片 | 精品免费国产一区二区三区 | 亚洲第一成人在线 | 日本美女啪啪 |