分布式限流
前言
本文接著上文應用限流進行討論。
之前談到的限流方案只能針對于單個 JVM 有效,也就是單機應用。而對于現在普遍的分布式應用也得有一個分布式限流的方案。
基于此嘗試寫了這個組件:
https://github.com/crossoverJie/distributed-redis-tool
?
DEMO
以下采用的是
https://github.com/crossoverJie/springboot-cloud
來做演示。
在 Order 應用提供的接口中采取了限流。首先是配置了限流工具的 Bean:
@Configuration public class RedisLimitConfig {@Value("${redis.limit}")private int limit;@Autowiredprivate JedisConnectionFactory jedisConnectionFactory;@Beanpublic RedisLimit build() {RedisClusterConnection clusterConnection = jedisConnectionFactory.getClusterConnection();JedisCluster jedisCluster = (JedisCluster) clusterConnection.getNativeConnection();RedisLimit redisLimit = new RedisLimit.Builder<>(jedisCluster).limit(limit).build();return redisLimit;}}接著在 Controller 使用組件:
@Autowiredprivate RedisLimit redisLimit ;@Override@CheckReqNopublic BaseResponse<OrderNoResVO> getOrderNo(@RequestBody OrderNoReqVO orderNoReq) {BaseResponse<OrderNoResVO> res = new BaseResponse();//限流boolean limit = redisLimit.limit();if (!limit){res.setCode(StatusEnum.REQUEST_LIMIT.getCode());res.setMessage(StatusEnum.REQUEST_LIMIT.getMessage());return res ;}res.setReqNo(orderNoReq.getReqNo());if (null == orderNoReq.getAppId()){throw new SBCException(StatusEnum.FAIL);}OrderNoResVO orderNoRes = new OrderNoResVO() ;orderNoRes.setOrderId(DateUtil.getLongTime());res.setCode(StatusEnum.SUCCESS.getCode());res.setMessage(StatusEnum.SUCCESS.getMessage());res.setDataBody(orderNoRes);return res ;}為了方便使用,也提供了注解:
? @Override@ControllerLimitpublic BaseResponse<OrderNoResVO> getOrderNoLimit(@RequestBody OrderNoReqVO orderNoReq) {BaseResponse<OrderNoResVO> res = new BaseResponse();// 業務邏輯return res ;}該注解攔截了 http 請求,會再請求達到閾值時直接返回。
普通方法也可使用:
@CommonLimit public void doSomething(){}會在調用達到閾值時拋出異常。
為了模擬并發,在?User?應用中開啟了 10 個線程調用 Order(限流次數為5) 接口(也可使用專業的并發測試工具 JMeter 等)。
@Overridepublic BaseResponse<UserResVO> getUserByFeign(@RequestBody UserReqVO userReq) {//調用遠程服務OrderNoReqVO vo = new OrderNoReqVO();vo.setAppId(1L);vo.setReqNo(userReq.getReqNo());for (int i = 0; i < 10; i++) {executorService.execute(new Worker(vo, orderServiceClient));}UserRes userRes = new UserRes();userRes.setUserId(123);userRes.setUserName("張三");userRes.setReqNo(userReq.getReqNo());userRes.setCode(StatusEnum.SUCCESS.getCode());userRes.setMessage("成功");return userRes;}private static class Worker implements Runnable {private OrderNoReqVO vo;private OrderServiceClient orderServiceClient;public Worker(OrderNoReqVO vo, OrderServiceClient orderServiceClient) {this.vo = vo;this.orderServiceClient = orderServiceClient;}@Overridepublic void run() {BaseResponse<OrderNoResVO> orderNo = orderServiceClient.getOrderNoCommonLimit(vo);logger.info("遠程返回:" + JSON.toJSONString(orderNo));}}為了驗證分布式效果啟動了兩個 Order 應用。
效果如下:?
?
實現原理
實現原理其實很簡單。既然要達到分布式全局限流的效果,那自然需要一個第三方組件來記錄請求的次數。
其中 Redis 就非常適合這樣的場景。
- 每次請求時將當前時間(精確到秒)作為 Key 寫入到 Redis 中,超時時間設置為 2 秒,Redis 將該 Key 的值進行自增。
- 當達到閾值時返回錯誤。
- 寫入 Redis 的操作用 Lua 腳本來完成,利用 Redis 的單線程機制可以保證每個 Redis 請求的原子性。
Lua 腳本如下:
--lua 下標從 1 開始 -- 限流 key local key = KEYS[1] -- 限流大小 local limit = tonumber(ARGV[1])-- 獲取當前流量大小 local curentLimit = tonumber(redis.call('get', key) or "0")if curentLimit + 1 > limit then-- 達到限流大小 返回return 0; else-- 沒有達到閾值 value + 1redis.call("INCRBY", key, 1)redis.call("EXPIRE", key, 2)return curentLimit + 1 endJava 中的調用邏輯:
? ?public boolean limit() {String key = String.valueOf(System.currentTimeMillis() / 1000);Object result = null;if (jedis instanceof Jedis) {result = ((Jedis) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit)));} else if (jedis instanceof JedisCluster) {result = ((JedisCluster) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit)));} else {//throw new RuntimeException("instance is error") ;return false;}if (FAIL_CODE != (Long) result) {return true;} else {return false;}}所以只需要在需要限流的地方調用該方法對返回值進行判斷即可達到限流的目的。
當然這只是利用 Redis 做了一個粗暴的計數器,如果想實現類似于上文中的令牌桶算法可以基于 Lua 自行實現。
?
Builder 構建器
在設計這個組件時想盡量的提供給使用者清晰、可讀性、不易出錯的 API。
比如第一步,如何構建一個限流對象。
最常用的方式自然就是構造函數,如果有多個域則可以采用重疊構造器的方式:
public A(){} public A(int a){} public A(int a,int b){}缺點也是顯而易見的:如果參數過多會導致難以閱讀,甚至如果參數類型一致的情況下客戶端顛倒了順序,但不會引起警告從而出現難以預測的結果。
第二種方案可以采用 JavaBean 模式,利用?setter?方法進行構建:
A a = new A(); a.setA(a); a.setB(b);這種方式清晰易讀,但卻容易讓對象處于不一致的狀態,使對象處于線程不安全的狀態。
所以這里采用了第三種創建對象的方式,構建器:
public class RedisLimit {private JedisCommands jedis;private int limit = 200;private static final int FAIL_CODE = 0;/*** lua script*/private String script;private RedisLimit(Builder builder) {this.limit = builder.limit ;this.jedis = builder.jedis ;buildScript();}/*** limit traffic* @return if true*/public boolean limit() {String key = String.valueOf(System.currentTimeMillis() / 1000);Object result = null;if (jedis instanceof Jedis) {result = ((Jedis) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit)));} else if (jedis instanceof JedisCluster) {result = ((JedisCluster) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit)));} else {//throw new RuntimeException("instance is error") ;return false;}if (FAIL_CODE != (Long) result) {return true;} else {return false;}}/*** read lua script*/private void buildScript() {script = ScriptUtil.getScript("limit.lua");}/*** ?the builder* @param <T>*/public static class Builder<T extends JedisCommands>{private T jedis = null ;private int limit = 200;public Builder(T jedis){this.jedis = jedis ;}public Builder limit(int limit){this.limit = limit ;return this;}public RedisLimit build(){return new RedisLimit(this) ;}}}這樣客戶端在使用時:
RedisLimit redisLimit = new RedisLimit.Builder<>(jedisCluster).limit(limit).build();更加的簡單直接,并且避免了將創建過程分成了多個子步驟。
這在有多個構造參數,但又不是必選字段時很有作用。
因此順便將分布式鎖的構建器方式也一并更新了:
https://github.com/crossoverJie/distributed-redis-tool#features
更多內容可以參考 Effective Java
?
API
從上文可以看出,使用過程就是調用?limit?方法。
//限流boolean limit = redisLimit.limit();if (!limit){//具體限流邏輯}為了減少侵入性,也為了簡化客戶端提供了兩種注解方式。
@ControllerLimit
該注解可以作用于?@RequestMapping?修飾的接口中,并會在限流后提供限流響應。
實現如下:
@Component public class WebIntercept extends WebMvcConfigurerAdapter {private static Logger logger = LoggerFactory.getLogger(WebIntercept.class);@Autowiredprivate RedisLimit redisLimit;@Overridepublic void addInterceptors(InterceptorRegistry registry) {registry.addInterceptor(new CustomInterceptor()).addPathPatterns("/**");}private class CustomInterceptor extends HandlerInterceptorAdapter {@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response,Object handler) throws Exception {if (redisLimit == null) {throw new NullPointerException("redisLimit is null");}if (handler instanceof HandlerMethod) {HandlerMethod method = (HandlerMethod) handler;ControllerLimit annotation = method.getMethodAnnotation(ControllerLimit.class);if (annotation == null) {//skipreturn true;}boolean limit = redisLimit.limit();if (!limit) {logger.warn("request has bean limit");response.sendError(500, "request limit");return false;}}return true;}}}其實就是實現了 SpringMVC 中的攔截器,并在攔截過程中判斷是否有使用注解,從而調用限流邏輯。
前提是應用需要掃描到該類,讓 Spring 進行管理。
@ComponentScan(value = "com.crossoverjie.distributed.intercept")@CommonLimit
當然也可以在普通方法中使用。實現原理則是 Spring AOP (SpringMVC 的攔截器本質也是 AOP)。
@Aspect @Component @EnableAspectJAutoProxy(proxyTargetClass = true) public class CommonAspect {private static Logger logger = LoggerFactory.getLogger(CommonAspect.class);@Autowiredprivate RedisLimit redisLimit ;@Pointcut("@annotation(com.crossoverjie.distributed.annotation.CommonLimit)")private void check(){}@Before("check()")public void before(JoinPoint joinPoint) throws Exception {if (redisLimit == null) {throw new NullPointerException("redisLimit is null");}boolean limit = redisLimit.limit();if (!limit) {logger.warn("request has bean limit");throw new RuntimeException("request has bean limit") ;}}}很簡單,也是在攔截過程中調用限流。
當然使用時也得掃描到該包:
@ComponentScan(value = "com.crossoverjie.distributed.intercept")?
總結
限流在一個高并發大流量的系統中是保護應用的一個利器,成熟的方案也很多,希望對剛了解這一塊的朋友提供一些思路。
以上所有的源碼:
- https://github.com/crossoverJie/distributed-redis-tool
- https://github.com/crossoverJie/springboot-cloud
感興趣的朋友可以點個 Star 或是提交 PR。
總結
- 上一篇: 应用限流
- 下一篇: 动手实现一个 LRU cache