日韩性视频-久久久蜜桃-www中文字幕-在线中文字幕av-亚洲欧美一区二区三区四区-撸久久-香蕉视频一区-久久无码精品丰满人妻-国产高潮av-激情福利社-日韩av网址大全-国产精品久久999-日本五十路在线-性欧美在线-久久99精品波多结衣一区-男女午夜免费视频-黑人极品ⅴideos精品欧美棵-人人妻人人澡人人爽精品欧美一区-日韩一区在线看-欧美a级在线免费观看

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 > 编程资源 > 编程问答 >内容正文

编程问答

信号量与令牌桶_限流的4种方式令牌桶实战

發布時間:2023/12/10 编程问答 32 豆豆
生活随笔 收集整理的這篇文章主要介紹了 信号量与令牌桶_限流的4种方式令牌桶实战 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

限流的4種方式

正文

限流

限流是對某一時間窗口內的請求數進行限制,保持系統的可用性和穩定性,防止因流量暴增而導致的系統運行緩慢或宕機。常用的限流算法有令牌桶和和漏桶,而Google開源項目Guava中的RateLimiter使用的就是令牌桶控制算法。

在開發高并發系統時有三把利器用來保護系統:緩存、降級和限流緩存:緩存的目的是提升系統訪問速度和增大系統處理容量

降級:降級是當服務器壓力劇增的情況下,根據當前業務情況及流量對一些服務和頁面有策略的降級,以此釋放服務器資源以保證核心任務的正常運行

限流:限流的目的是通過對并發訪問/請求進行限速,或者對一個時間窗口內的請求進行限速來保護系統,一旦達到限制速率則可以拒絕服務、排隊或等待、降級等處理

我們經常在調別人的接口的時候會發現有限制,比如微信公眾平臺接口、百度API Store、聚合API等等這樣的,對方會限制每天最多調多少次或者每分鐘最多調多少次

我們自己在開發系統的時候也需要考慮到這些,比如我們公司在上傳商品的時候就做了限流,因為用戶每一次上傳商品,我們需要將商品數據同到到美團、餓了么、京東、百度、自營等第三方平臺,這個工作量是巨大,頻繁操作會拖慢系統,故做限流。

以上都是題外話,接下來我們重點看一下令牌桶算法

令牌桶算法

下面是從網上找的兩張圖來描述令牌桶算法:

RateLimiter

RateLimiter的代碼不長,注釋加代碼432行,看一下RateLimiter怎么用

1 package com.cjs.example;

2

3 import com.google.common.util.concurrent.RateLimiter;

4 import org.springframework.web.bind.annotation.RequestMapping;

5 import org.springframework.web.bind.annotation.RestController;

6

7 import java.text.SimpleDateFormat;

8 import java.util.Date;

9

10 @RestController

11 public class HelloController {

12

13 private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

14

15 private static final RateLimiter rateLimiter = RateLimiter.create(2);

16

17 /**

18 * tryAcquire嘗試獲取permit,默認超時時間是0,意思是拿不到就立即返回false

19 */

20 @RequestMapping("/sayHello")

21 public String sayHello() {

22 if (rateLimiter.tryAcquire()) { // 一次拿1個

23 System.out.println(sdf.format(new Date()));

24 try {

25 Thread.sleep(500);

26 } catch (InterruptedException e) {

27 e.printStackTrace();

28 }

29 }else {

30 System.out.println("limit");

31 }

32 return "hello";

33 }

34

35 /**

36 * acquire拿不到就等待,拿到為止

37 */

38 @RequestMapping("/sayHi")

39 public String sayHi() {

40 rateLimiter.acquire(5); // 一次拿5個

41 System.out.println(sdf.format(new Date()));

42 return "hi";

43 }

44

45 }

關于RateLimiter:A rate limiter。每個acquire()方法如果必要的話會阻塞直到一個permit可用,然后消費它。獲得permit以后不需要釋放。

RateLimiter在并發環境下使用是安全的:它將限制所有線程調用的總速率。注意,它不保證公平調用。

RateLimiter在并發環境下使用是安全的:它將限制所有線程調用的總速率。注意,它不保證公平調用。Rate limiter(直譯為:速度限制器)經常被用來限制一些物理或者邏輯資源的訪問速率。這和java.util.concurrent.Semaphore正好形成對照。

一個RateLimiter主要定義了發放permits的速率。如果沒有額外的配置,permits將以固定的速度分配,單位是每秒多少permits。默認情況下,Permits將會被穩定的平緩的發放。

可以配置一個RateLimiter有一個預熱期,在此期間permits的發放速度每秒穩步增長直到到達穩定的速率

基本用法:

final RateLimiter rateLimiter = RateLimiter.create(2.0); // rate is "2 permits per second"

void submitTasks(List tasks, Executor executor) {

for (Runnable task : tasks) {

rateLimiter.acquire(); // may wait

executor.execute(task);

}

}

實現

SmoothBursty以穩定的速度生成permit

SmoothWarmingUp是漸進式的生成,最終達到最大值趨于穩定

源碼片段解讀:

public abstract class RateLimiter {

/**

* 用給定的吞吐量(“permits per second”)創建一個RateLimiter。

* 通常是QPS

*/

public static RateLimiter create(double permitsPerSecond) {

return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());

}

static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {

RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);

rateLimiter.setRate(permitsPerSecond);

return rateLimiter;

}

/**

* 用給定的吞吐量(QPS)和一個預熱期創建一個RateLimiter

*/

public static RateLimiter create(double permitsPerSecond, long warmupPeriod, TimeUnit unit) {

checkArgument(warmupPeriod >= 0, "warmupPeriod must not be negative: %s", warmupPeriod);

return create(permitsPerSecond, warmupPeriod, unit, 3.0, SleepingStopwatch.createFromSystemTimer());

}

static RateLimiter create(

double permitsPerSecond,

long warmupPeriod,

TimeUnit unit,

double coldFactor,

SleepingStopwatch stopwatch) {

RateLimiter rateLimiter = new SmoothWarmingUp(stopwatch, warmupPeriod, unit, coldFactor);

rateLimiter.setRate(permitsPerSecond);

return rateLimiter;

}

private final SleepingStopwatch stopwatch;

// 鎖

private volatile Object mutexDoNotUseDirectly;

private Object mutex() {

Object mutex = mutexDoNotUseDirectly;

if (mutex == null) {

synchronized (this) {

mutex = mutexDoNotUseDirectly;

if (mutex == null) {

mutexDoNotUseDirectly = mutex = new Object();

}

}

}

return mutex;

}

/**

* 從RateLimiter中獲取一個permit,阻塞直到請求可以獲得為止

* @return 休眠的時間,單位是秒,如果沒有被限制則是0.0

*/

public double acquire() {

return acquire(1);

}

/**

* 從RateLimiter中獲取指定數量的permits,阻塞直到請求可以獲得為止

*/

public double acquire(int permits) {

long microsToWait = reserve(permits);

stopwatch.sleepMicrosUninterruptibly(microsToWait);

return 1.0 * microsToWait / SECONDS.toMicros(1L);

}

/**

* 預定給定數量的permits以備將來使用

* 直到這些預定數量的permits可以被消費則返回逝去的微秒數

*/

final long reserve(int permits) {

checkPermits(permits);

synchronized (mutex()) {

return reserveAndGetWaitLength(permits, stopwatch.readMicros());

}

}

private static void checkPermits(int permits) {

checkArgument(permits > 0, "Requested permits (%s) must be positive", permits);

}

final long reserveAndGetWaitLength(int permits, long nowMicros) {

long momentAvailable = reserveEarliestAvailable(permits, nowMicros);

return max(momentAvailable - nowMicros, 0);

}

}

abstract class SmoothRateLimiter extends RateLimiter {

/** The currently stored permits. */

double storedPermits;

/** The maximum number of stored permits. */

double maxPermits;

/**

* The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits

* per second has a stable interval of 200ms.

*/

double stableIntervalMicros;

/**

* The time when the next request (no matter its size) will be granted. After granting a request,

* this is pushed further in the future. Large requests push this further than small requests.

*/

private long nextFreeTicketMicros = 0L; // could be either in the past or future

final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {

resync(nowMicros);

long returnValue = nextFreeTicketMicros;

double storedPermitsToSpend = min(requiredPermits, this.storedPermits); // 本次可以獲取到的permit數量

double freshPermits = requiredPermits - storedPermitsToSpend; // 差值,如果存儲的permit大于本次需要的permit數量則此處是0,否則是一個正數

long waitMicros =

storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)

+ (long) (freshPermits * stableIntervalMicros); // 計算需要等待的時間(微秒)

this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);

this.storedPermits -= storedPermitsToSpend; // 減去本次消費的permit數

return returnValue;

}

void resync(long nowMicros) {

// if nextFreeTicket is in the past, resync to now

if (nowMicros > nextFreeTicketMicros) { // 表示當前可以獲得permit

double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); // 計算這段時間可以生成多少個permit

storedPermits = min(maxPermits, storedPermits + newPermits); // 如果超過maxPermit,則取maxPermit,否則取存儲的permit+新生成的permit

nextFreeTicketMicros = nowMicros; // 設置下一次可以獲得permit的時間點為當前時間

}

}

}

RateLimiter實現的令牌桶算法,不僅可以應對正常流量的限速,而且可以處理突發暴增的請求,實現平滑限流。

通過代碼,我們可以看到它可以預消費,怎么講呢

nextFreeTicketMicros表示下一次請求獲得permits的最早時間。每次授權一個請求以后,這個值會向后推移(PS:想象一下時間軸)即向未來推移。因此,大的請求會比小的請求推得更。這里的大小指的是獲取permit的數量。這個應該很好理解,因為上一次請求獲取的permit數越多,那么下一次再獲取授權時更待的時候會更長,反之,如果上一次獲取的少,那么時間向后推移的就少,下一次獲得許可的時間更短。可見,都是有代價的。正所謂:要浪漫就要付出代價。

還要注意到一點,就是獲取令牌和處理請求是兩個動作,而且,并不是每一次都獲取一個,也不要想當然的認為一個請求獲取一個permit(或者叫令牌),可以再看看前面那幅圖

Stopwatch

一個以納秒為單位度量流逝時間的對象。它是一個相對時間,而不是絕對時間。Stopwatch stopwatch = Stopwatch.createStarted();

System.out.println("hahah");

stopwatch.stop();

Duration duration = stopwatch.elapsed();

System.out.println(stopwatch);

Semaphore(信號量)A counting semaphore. Conceptually, a semaphore maintains a set of permits. Each acquire() blocks if necessary until a permit is available, and then takes it. Each release() adds a permit, potentially releasing a blocking acquirer. However, no actual permit objects are used; the Semaphore just keeps a count of the number available and acts accordingly.

一個信號量維護了一系列permits。

每次調用acquire()方法獲取permit,如果必要的話會阻塞直到有一個permit可用為止。

調用release()方法則會釋放自己持有的permit,即用完了再還回去。

信號量限制的是并發訪問臨界資源的線程數。

令牌桶算法 VS 漏桶算法

漏桶

漏桶的出水速度是恒定的,那么意味著如果瞬時大流量的話,將有大部分請求被丟棄掉(也就是所謂的溢出)。

令牌桶

生成令牌的速度是恒定的,而請求去拿令牌是沒有速度限制的。這意味,面對瞬時大流量,該算法可以在短時間內請求拿到大量令牌,而且拿令牌的過程并不是消耗很大的事情。

最后,不論是對于令牌桶拿不到令牌被拒絕,還是漏桶的水滿了溢出,都是為了保證大部分流量的正常使用,而犧牲掉了少部分流量,這是合理的,如果因為極少部分流量需要保證的話,那么就可能導致系統達到極限而掛掉,得不償失。

小定律:排隊理論the long-term average number L of customers in a stationary system is equal to the long-term average effective arrival rate λ multiplied by the average time W that a customer spends in the system. Expressed algebraically the law is:

在一個固定系統中,顧客的長期平均數量L等于顧客的長期平均到達速率λ乘以顧客在系統中平均花費的時間W。用公式表示為:

雖然這看起來很容易,但這是一個非常顯著的舉世矚目的結果,因為這種關系“不受到達過程的分布,服務分布,服務順序,或其他任何因素的影響”。這個結果適用于任何系統,特別是適用于系統內的系統。唯一的要求是系統必須是穩定的非搶占式的。

例子

例1:找響應時間

假設有一個應用程序沒有簡單的方法來度量響應時間。如果系統的平均數量和吞吐量是已知的,那么平均響應時間就是:

mean response time = mean number in system / mean throughput

平均響應時間 = 系統的平均數量 / 平均吞吐量.

例2:顧客在店里

想象一下,一家小商店只有一個柜臺和一個可供瀏覽的區域,每次只能有一個人在柜臺,并且沒有人不買東西就離開。

所以這個系統大致是:進入 --> 瀏覽 --> 柜臺結賬 --> 離開

在一個穩定的系統中,人們進入商店的速度就是他們到達商店的速度(我們叫做到達速度),它們離開的速度叫做離開速度。

相比之下,到達速度超過離開速度代表是一個不穩定的系統,這就會造成等待的顧客數量將逐漸增加到無窮大。

前面的小定律告訴我們,商店的平均顧客數量L等于有效的到達速度λ乘以顧客在商店的平均停留時間W。用公式表示為:

假設,顧客以每小時10個的速度到達,并且平均停留時間是0.5小時。那么這就意味著,任意時間商店的平均顧客數量是5

現在假設商店正在考慮做更多的廣告,把到達率提高到每小時20。商店必須準備好容納平均10人,或者必須將每個顧客在商店中的時間減少到0.25小時。商店可以通過更快地結帳或者增加更多的柜臺來達到后者的目的。

我們可以把前面的小定律應用到商店系統中。例如,考慮柜臺和在柜臺前排的隊。假設平均有2個人在柜臺前排隊,我們知道顧客到達速度是每小時10,所以顧客平均必須停留時間為0.2小時。

最后

這是單機(單進程)的限流,是JVM級別的的限流,所有的令牌生成都是在內存中,在分布式環境下不能直接這么用。

如果我們能把permit放到Redis中就可以在分布式環境中用了。

參考

總結

以上是生活随笔為你收集整理的信号量与令牌桶_限流的4种方式令牌桶实战的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。