日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

alibaba sentinel限流组件 源码分析

發布時間:2025/7/14 编程问答 37 豆豆
生活随笔 收集整理的這篇文章主要介紹了 alibaba sentinel限流组件 源码分析 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

如何使用?

maven引入:

<dependency>
  <groupId>com.alibaba.csp</groupId>
  <artifactId>sentinel-core</artifactId>
  <version>1.5.1</version>
</dependency>

該組件是保護資源用,什么資源呢?Conceptually, physical or logical resource,明白了吧。

web應用中大部分情況都是用于保護接口,防止負載過大,支持限流、降級處理,規則可配。

入門級使用方法如下:

public void foo() {Entry entry = null;try {entry = SphU.entry("abc");// resource that need protection} catch (BlockException blockException) {// when goes there, it is blocked// add blocked handle logic here} catch (Throwable bizException) {// business exception Tracer.trace(bizException);} finally {// ensure finally be executedif (entry != null){entry.exit();}}}

或者

public void foo() {
  if (SphO.entry("abc")) {
  try {
    // business logic
  } finally {
    SphO.exit(); // must exit()
  }
  } else {
    // failed to enter the protected resource.
  }
}

入口為com.alibaba.csp.sentinel.SphU和com.alibaba.csp.sentinel.SphO。兩者的區別從使用就可以看出,限流發生時,SphU是通過異常的形式反饋出來,SphO是通過entry的返回值反饋出來的。

查看SphO的代碼,如下圖,其實是內部捕獲了異常,然后返回boolean類型。所以,我們就從SphU開始分析吧。

SphU

最終會調用到com.alibaba.csp.sentinel.CtSph.entryWithPriority(ResourceWrapper, int, boolean, Object...)這個方法。

里面的核心為調用com.alibaba.csp.sentinel.CtSph.lookProcessChain(ResourceWrapper)獲取對應的處理鏈,方法如下:

從上面代碼可知,一個chain關聯一個資源,這一點很重要,后面分析node節點結構時會用到

public static ProcessorSlotChain newSlotChain() {if (builder != null) {return builder.build();}// 構建chainBuilder,具體說明見下面代碼resolveSlotChainBuilder();if (builder == null) {RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");builder = new DefaultSlotChainBuilder();}return builder.build();}private static void resolveSlotChainBuilder() {List<SlotChainBuilder> list = new ArrayList<SlotChainBuilder>();boolean hasOther = false;
     // java.util.ServiceLoader 方式擴展功能,這是jdk的,簡稱spi擴展,sentinel有很多地方用到這種擴展方式
     // 關于ServiceLoader擴展的詳解不在這里討論,后面有時間可以單獨拎出來分析下
for (SlotChainBuilder builder : LOADER) {if (builder.getClass() != DefaultSlotChainBuilder.class) {hasOther = true;list.add(builder);}}if (hasOther) {builder = list.get(0);} else {// No custom builder, using default.builder = new DefaultSlotChainBuilder();}RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: "+ builder.getClass().getCanonicalName());}

通過上面代碼可知,一般情況下采用的默認chainBuilder,那我們就來看看這個build:

通過上面com.alibaba.csp.sentinel.CtSph.entryWithPriority(ResourceWrapper, int, boolean, Object...)方法可知,最終調用方式如下:

通過entry調用把chain中的所有ProcessorSlot串起來,挨個調用。

主體已經清晰了,重點其實也就是chain中添加的那些ProcessorSlot,下面我們就一個個看下這些slot到底干了啥,劃下重點,StatisticSlot這個slot是核心,用于統計各種數量。

?NodeSelectorSlot

構建調用資源調用路徑,最終在內存中會形成一個樹狀結構。

ContextUtil.enter("entrance1", "appA");Entry nodeA = SphU.entry("nodeA");if (nodeA != null) {nodeA.exit();}ContextUtil.exit();

上述代碼對象結構如下:

      machine-root
        /
       /
   EntranceNode1
      /
     /
DefaultNode(nodeA)- - - - - -> ClusterNode(nodeA);

ContextUtil.enter("entrance1", "appA");Entry nodeA = SphU.entry("nodeA");if (nodeA != null) {nodeA.exit();}ContextUtil.exit();ContextUtil.enter("entrance2", "appA");nodeA = SphU.entry("nodeA");if (nodeA != null) {nodeA.exit();} ContextUtil.exit();

上述代碼會形成結構如下:

? ? ? ? ? ? ? ? machine-root
? ? ? ? ? ? ? ? ? /? ? ? ? ? ? \
? ? ? ? ? ? ? ? /? ? ? ? ? ? ? ? \
? ?EntranceNode1? ? ?EntranceNode2
? ? ? ? ? ? ?/? ? ? ? ? ? ? ? ? ? ? ?\
    /     ? ? ? ?  \
DefaultNode(nodeA) DefaultNode(nodeA)
    |          ?|
    +- - - - - - - - - - +- - - - - - -> ClusterNode(nodeA);

ContextUtil.enter("entrance1", "appA");Entry nodeA = null;try {nodeA = SphU.entry("nodeA");Entry nodeB = null;try {nodeB = SphU.entry("nodeB");}finally {if (nodeB != null) {nodeB.exit();}}}finally {if (nodeA != null) {nodeA.exit();}} ContextUtil.exit();

上述代碼會形成結構如下:

? ? ? ? ?   ? ? ? machine-root
? ? ? ? ?   ? ? ? ? /? ? ? ??
? ? ? ?   ? ? ? ? /? ? ? ? ??
?   ?EntranceNode1?
? ?   ? ? ? ? ?/? ? ? ? ??
      /    
  DefaultNode(nodeA)?- - - - - -> ClusterNode(nodeA);

    /

   /

DefaultNode(nodeB)- - - - - -> ClusterNode(nodeB);

說明:

a.一條路徑對應一個Context;如果是入門的那種使用方法,即沒有顯式使用ContextUtil.enter("entrance1", "appA")方式創建context的話,會默認使用MyContextUtil.myEnter創建一個名為sentinel_default_context的context;

源碼釋義:

一次資源的訪問都會走到com.alibaba.csp.sentinel.CtSph.entryWithPriority(ResourceWrapper, int, boolean, Object...),方法開始處通過ContextUtil.getContext()從threadlocal中獲取context,ContextUtil.enter會檢查threadlocal有沒有context,沒有就創建,有就直接返回。如果限流時沒有調用ContextUtil.enter顯式開啟context,就會走到下面MyContextUtil.myEnter處創建默認名為

sentinel_default_context的context。最終都會調用到com.alibaba.csp.sentinel.context.ContextUtil.trueEnter(String, String),創建context部分代碼如下,每一個context會先創建一個EntranceNode入口node,然后掛到Constants.ROOT下,結構見上面的樹狀圖。

?

b.一次資源的調用會根據threadlocal獲取Context,同一線程下如果要切換context必須調用ContextUtil.exit();結束上一個context,再ContextUtil.enter("entrance2", "appA")開啟新的context;

源碼釋義:

context的獲取見上面一條釋義,這里看下ContextUtil.exit()方法,要想真正的將threadlocal清空,還得將context的CurEntry清空,怎么做?就是按照entry的調用順序反向依次調用com.alibaba.csp.sentinel.CtEntry.exit(int, Object...):

c.一個DefaultNode對應同一個資源調用在某一個Context下的統計數據;

源碼釋義:

首先,一次資源的entry調用會先去尋找ProcessorSlotChain,查看代碼可知,chain是以ResourceWrapper為key緩存在map中的,由DefaultSlotChainBuilder可以對應到一次entry調用對應一個NodeSelectorSlot實例,而NodeSelectorSlot中緩存了一個以contextName為key,DefaultNode為value的map,一句話總結下就是先去com.alibaba.csp.sentinel.context.ContextUtil.trueEnter(String, String)方法中通過contextNameNodeMap獲取以contextName緩存的該name對應的EntranceNode節點并以該EntranceNode為參數new一個context對象返回,然后在該資源對應的slotChain中的NodeSelectorSlot獲取一contextName為key緩存的DefaultNode。

這里解釋下DefaultNode第一次是怎么掛到context的EntranceNode下的。

代碼如下,重點是紅框中的代碼,獲取context中最末尾的節點,并把當前節點掛在后面。

我們再看下context的getLastNode節點,代碼如下:

由上面代碼自然會想的curEntry是什么時候設置的呢?如下圖所示,是在com.alibaba.csp.sentinel.CtSph.entryWithPriority(ResourceWrapper, int, boolean, Object...)中構建CtEntry時設置的,entry之間的關系在context中是以雙向鏈表結構維護的:

回到上一步,我們看下entry的getLastNode方法,如下圖所示,從parentEntry中獲取curNode,如果是第一次調用的話parent肯定為null,回到上一步的話就是返回該context的entranceNode。

NodeSelectorSlot中就會將當次entry在該context下首次調用創建的DefaultNode掛到剛剛獲取到的entranceNode節點下,也就是該context的入口節點下,如果該context下之前有過別的資源調用,就會順序掛到那次調用產生的DefaultNode下面,形成上面描述的樹狀結構圖。

DefaultNode處理完后會將context的CurNode設置為該DefaultNode,如下圖所示:

實際上是設置的context中當前entry對應的CurNode,如下圖所示,所以上面getLastNode是從parenEntry中獲取的CurNode。

d.ClusterNode對應同一個資源在所有context下的統計數據;

源碼釋義:

ClusterBuilderSlot中維護的一個實例變量實際就是對應一個資源,原因上面已經分析了,一個資源對應一個slotChain,然后將該clusterNode設置到本次調用對應的DefaultNode中,在DefaultNode做加法操作時,會同時調用clusterNode的加法操作,這樣,分布在不同context下的同一個資源對應的所有DefaultNode都會去調用clusterNode去匯總統計相同資源的統計數據。

e.一個資源對應一個ProcessChain,自然就對應一個NodeSelectorSlot實例,所以在NodeSelectorSlot里面DefaultNode node = map.get(context.getName());這行代碼的一維是資源,二維才是context,如果搞反了可能看到這行會蒙圈;

源碼釋義:

見c的分析。

ClusterBuilderSlot

構建ClusterNode并設置到上面slot選中的node中,如果context中有設置調用來源Origin就創建一個StatisticNode針對具體某個調用方統計數據

Note that 'origin' usually is Service Consumer's app name:

StatisticSlot

核心slot,用于統計各種數據的地方,然后被后面的slot應用。

先調用fireEntry執行后面的slot,檢查本次請求能否通過,如果通過,就給對應的node做加法操作。

先給自身node做加法,在給ClusterBuilderSlot中創建并傳入的closterNode做加法。

最終調用ArrayMetric的addPass:

需要申明的是,sentinel統計采用的是滑動窗口的實現方式,這里的重點是com.alibaba.csp.sentinel.slots.statistic.base.LeapArray.currentWindow(long)方法,我們看下如何獲取當前窗口。

// 根據當前時間戳計算當前窗口的索引
private
int calculateTimeIdx(/*@Valid*/ long timeMillis) {
     // 索引從0開始,整除后得到的結果就是當前時間戳所在的窗口,實際上我們的窗口只有array長度的固定幾個,
     // 可以想象一下這是在一個無限延長的虛擬時間線窗口,再對array的length取余數就得到了實際所在的窗口索引
long timeId = timeMillis / windowLengthInMs;// Calculate current index so we can map the timestamp to the leap array.return (int)(timeId % array.length());}protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
     // 計算虛擬窗口對應的開始時間,當前時間減去超出當前窗口的那一段時間就得到開始時間
return timeMillis - timeMillis % windowLengthInMs;}/*** Get bucket item at provided timestamp.** @param timeMillis a valid timestamp in milliseconds* @return current bucket item at provided timestamp if the time is valid; null if time is invalid*/public WindowWrap<T> currentWindow(long timeMillis) {if (timeMillis < 0) {return null;}// 獲取當前窗口索引int idx = calculateTimeIdx(timeMillis);// Calculate current bucket start time.
     // 獲取窗口開始時間
long windowStart = calculateWindowStart(timeMillis);/** Get bucket item at given time from the array.** (1) Bucket is absent, then just create a new bucket and CAS update to circular array.* (2) Bucket is up-to-date, then just return the bucket.* (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.*/while (true) {WindowWrap<T> old = array.get(idx);
       // 如果索引對應的窗口還沒有創建就新建窗口對象
if (old == null) {/** B0 B1 B2 NULL B4* ||_______|_______|_______|_______|_______||___* 200 400 600 800 1000 1200 timestamp* ^* time=888* bucket is empty, so create new and update** If the old bucket is absent, then we create a new bucket at {@code windowStart},* then try to update circular array via a CAS operation. Only one thread can* succeed to update, while other threads yield its time slice.*/WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
         // 原子操作,如果設置成功就返回,如果設置失敗就讓出cpu,等待再次被翻牌
if (array.compareAndSet(idx, null, window)) {// Successfully updated, return the created bucket.return window;} else {// Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield();}
       // 如果當前窗口時間一致,說明當前窗口還未過期,就返回該窗口對象}
else if (windowStart == old.windowStart()) {/** B0 B1 B2 B3 B4* ||_______|_______|_______|_______|_______||___* 200 400 600 800 1000 1200 timestamp* ^* time=888* startTime of Bucket 3: 800, so it's up-to-date** If current {@code windowStart} is equal to the start timestamp of old bucket,* that means the time is within the bucket, so directly return the bucket.*/return old;
       // 如果窗口開始時間過期了,就重置當前窗口的開始時間為最新的時間}
else if (windowStart > old.windowStart()) {/** (old)* B0 B1 B2 NULL B4* |_______||_______|_______|_______|_______|_______||___* ... 1200 1400 1600 1800 2000 2200 timestamp* ^* time=1676* startTime of Bucket 2: 400, deprecated, should be reset** If the start timestamp of old bucket is behind provided time, that means* the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.* Note that the reset and clean-up operations are hard to be atomic,* so we need a update lock to guarantee the correctness of bucket update.** The update lock is conditional (tiny scope) and will take effect only when* bucket is deprecated, so in most cases it won't lead to performance loss.*/if (updateLock.tryLock()) {try {// Successfully get the update lock, now we reset the bucket.return resetWindowTo(old, windowStart);} finally {updateLock.unlock();}} else {// Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield();}
       // 正常情況下不會走這里,因為時間是往前走的}
else if (windowStart < old.windowStart()) {// Should not go through here, as the provided time is already behind.return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));}}}

最終是在MetricBucket中用LongAdder做的原子加,LongAdder是jdk8的特性,這里sentinel直接挪了過來,避免要求sentinel用戶必須使用jdk8.為什么是LongAdder而不是AtomicLong,因為前者在并發下表現更優異,具體區別請自行了解。

SystemSlot

通過之前統計節點中Constants.ENTRY_NODE這個node中的數據檢查全局qps等是否滿足要求。

AuthoritySlot

黑白名單匹配。通過com.alibaba.csp.sentinel.slots.block.authority.AuthorityRuleManager.loadRules(List<AuthorityRule>)設置規則。

FlowSlot

流量控制。通過com.alibaba.csp.sentinel.slots.block.flow.FlowRuleManager.loadRules(List<FlowRule>)設置規則。

DegradeSlot

降級處理。通過com.alibaba.csp.sentinel.slots.block.degrade.DegradeRuleManager.loadRules(List<DegradeRule>)設置規則。

轉載于:https://www.cnblogs.com/restart30/p/10796725.html

總結

以上是生活随笔為你收集整理的alibaba sentinel限流组件 源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。