前言
在一個高並發系統中對流量的把控是非常重要的,當巨大的流量直接請求到我們的伺服器上沒多久就可能造成接口不可用,不處理的話甚至會造成整個應用不可用。
比如最近就有個這樣的需求,我作為客戶端要向kafka生產數據,而kafka的消費者則再源源不斷的消費數據,並將消費的數據全部請求到web伺服器,雖說做了負載(有4臺web伺服器)但業務數據的量也是巨大的,每秒鐘可能有上萬條數據產生。如果生產者直接生產數據的話極有可能把web伺服器拖垮。
對此就必須要做限流處理,每秒鐘生產一定限額的數據到kafka,這樣就能極大程度的保證web的正常運轉。
其實不管處理何種場景,本質都是降低流量保證應用的高可用。
常見算法
對於限流常見有兩種算法:
漏桶算法比較簡單,就是將流量放入桶中,漏桶同時也按照一定的速率流出,如果流量過快的話就會溢出(漏桶並不會提高流出速率)。溢出的流量則直接丟棄。
如下圖所示:
這種做法簡單粗暴。
漏桶算法雖說簡單,但卻不能應對實際場景,比如突然暴增的流量。
這時就需要用到令牌桶算法:
令牌桶會以一個恆定的速率向固定容量大小桶中放入令牌,當有流量來時則取走一個或多個令牌。當桶中沒有令牌則將當前請求丟棄或阻塞。
相比之下令牌桶可以應對一定的突發流量。
RateLimiter實現
對於令牌桶的代碼實現,可以直接使用Guava包中的RateLimiter。
@Override public BaseResponse<UserResVO> getUserByFeignBatch(@RequestBody UserReqVO userReqVO) { //調用遠程服務 OrderNoReqVO vo = new OrderNoReqVO() ; vo.setReqNo(userReqVO.getReqNo()); RateLimiter limiter = RateLimiter.create(2.0) ; //批量調用 for (int i = 0 ;i< 10 ; i++){ double acquire = limiter.acquire(); logger.debug("獲取令牌成功!,消耗=" + acquire); BaseResponse<OrderNoResVO> orderNo = orderServiceClient.getOrderNo(vo); logger.debug("遠程返回:"+JSON.toJSONString(orderNo)); } UserRes userRes = new UserRes() ; userRes.setUserId(123); userRes.setUserName("張三"); userRes.setReqNo(userReqVO.getReqNo()); userRes.setCode(StatusEnum.SUCCESS.getCode()); userRes.setMessage("成功"); return userRes ; }
詳見此。
調用結果如下:
代碼可以看出以每秒向桶中放入兩個令牌,請求一次消耗一個令牌。所以每秒鐘只能發送兩個請求。按照圖中的時間來看也確實如此(返回值是獲取此令牌所消耗的時間,差不多也是每500ms一個)。
使用RateLimiter有幾個值得注意的地方:
允許先消費,後付款,意思就是它可以來一個請求的時候一次性取走幾個或者是剩下所有的令牌甚至多取,但是後面的請求就得為上一次請求買單,它需要等待桶中的令牌補齊之後才能繼續獲取令牌。
總結
針對於單個應用的限流 RateLimiter 夠用了,如果是分布式環境可以藉助 Redis 來完成。
來做演示。
在 Order 應用提供的接口中採取了限流。首先是配置了限流工具的 Bean:
@Configuration public class RedisLimitConfig { @Value("${redis.limit}") private int limit; @Autowired private JedisConnectionFactory jedisConnectionFactory; @Bean public RedisLimit build() { RedisClusterConnection clusterConnection = jedisConnectionFactory.getClusterConnection(); JedisCluster jedisCluster = (JedisCluster) clusterConnection.getNativeConnection(); RedisLimit redisLimit = new RedisLimit.Builder<>(jedisCluster) .limit(limit) .build(); return redisLimit; } }
接著在 Controller 使用組件:
@Autowired private RedisLimit redisLimit ; @Override @CheckReqNo public BaseResponse<OrderNoResVO> getOrderNo(@RequestBody OrderNoReqVO orderNoReq) { BaseResponse<OrderNoResVO> res = new BaseResponse(); //限流 boolean limit = redisLimit.limit(); if (!limit){ res.setCode(StatusEnum.REQUEST_LIMIT.getCode()); res.setMessage(StatusEnum.REQUEST_LIMIT.getMessage()); return res ; } res.setReqNo(orderNoReq.getReqNo()); if (null == orderNoReq.getAppId()){ throw new SBCException(StatusEnum.FAIL); } OrderNoResVO orderNoRes = new OrderNoResVO() ; orderNoRes.setOrderId(DateUtil.getLongTime()); res.setCode(StatusEnum.SUCCESS.getCode()); res.setMessage(StatusEnum.SUCCESS.getMessage()); res.setDataBody(orderNoRes); return res ; }
為了方便使用,也提供了註解:
@Override @ControllerLimit public BaseResponse<OrderNoResVO> getOrderNoLimit(@RequestBody OrderNoReqVO orderNoReq) { BaseResponse<OrderNoResVO> res = new BaseResponse(); // 業務邏輯 return res ; }
該註解攔截了 http 請求,會再請求達到閾值時直接返回。
普通方法也可使用:
@CommonLimit public void doSomething(){}
會在調用達到閾值時拋出異常。
為了模擬並發,在 User 應用中開啟了 10 個線程調用 Order(限流次數為5) 接口(也可使用專業的並發測試工具 JMeter 等)。
@Override public BaseResponse<UserResVO> getUserByFeign(@RequestBody UserReqVO userReq) { //調用遠程服務 OrderNoReqVO vo = new OrderNoReqVO(); vo.setAppId(1L); vo.setReqNo(userReq.getReqNo()); for (int i = 0; i < 10; i++) { executorService.execute(new Worker(vo, orderServiceClient)); } UserRes userRes = new UserRes(); userRes.setUserId(123); userRes.setUserName("張三"); userRes.setReqNo(userReq.getReqNo()); userRes.setCode(StatusEnum.SUCCESS.getCode()); userRes.setMessage("成功"); return userRes; } private static class Worker implements Runnable { private OrderNoReqVO vo; private OrderServiceClient orderServiceClient; public Worker(OrderNoReqVO vo, OrderServiceClient orderServiceClient) { this.vo = vo; this.orderServiceClient = orderServiceClient; } @Override public void run() { BaseResponse<OrderNoResVO> orderNo = orderServiceClient.getOrderNoCommonLimit(vo); logger.info("遠程返回:" + JSON.toJSONString(orderNo)); } }
為了驗證分布式效果啟動了兩個 Order 應用。
效果如下:
實現原理
實現原理其實很簡單。既然要達到分布式全局限流的效果,那自然需要一個第三方組件來記錄請求的次數。
其中 Redis 就非常適合這樣的場景。
每次請求時將當前時間(精確到秒)作為 Key 寫入到 Redis 中,超時時間設置為 2 秒,Redis 將該 Key 的值進行自增。 當達到閾值時返回錯誤。 寫入 Redis 的操作用 Lua 腳本來完成,利用 Redis 的單線程機制可以保證每個 Redis 請求的原子性。Lua 腳本如下:
--lua 下標從 1 開始-- 限流 keylocal key = KEYS[1]-- 限流大小local limit = tonumber(ARGV[1])-- 獲取當前流量大小local curentLimit = tonumber(redis.call('get', key) or "0")if curentLimit + 1 > limit then -- 達到限流大小 返回 return 0;else -- 沒有達到閾值 value + 1 redis.call("INCRBY", key, 1) redis.call("EXPIRE", key, 2) return curentLimit + 1end
Java 中的調用邏輯:
local key = KEYS[1] local limit = tonumber(ARGV[1]) local curentLimit = tonumber(redis.call('get', key) or "0") if curentLimit + 1 > limit then return 0; else redis.call("INCRBY", key, 1) redis.call("EXPIRE", key, 2) return curentLimit + 1 end
所以只需要在需要限流的地方調用該方法對返回值進行判斷即可達到限流的目的。
當然這只是利用 Redis 做了一個粗暴的計數器,如果想實現類似於上文中的令牌桶算法可以基於 Lua 自行實現。
Builder 構建器
在設計這個組件時想儘量的提供給使用者清晰、可讀性、不易出錯的 API。
比如***步,如何構建一個限流對象。
最常用的方式自然就是構造函數,如果有多個域則可以採用重疊構造器的方式:
public A(){} public A(int a){} public A(int a,int b){}
缺點也是顯而易見的:如果參數過多會導致難以閱讀,甚至如果參數類型一致的情況下客戶端顛倒了順序,但不會引起警告從而出現難以預測的結果。
第二種方案可以採用 JavaBean 模式,利用 setter 方法進行構建:
A a = new A(); a.setA(a); a.setB(b);
這種方式清晰易讀,但卻容易讓對象處於不一致的狀態,使對象處於線程不安全的狀態。
所以這裡採用了第三種創建對象的方式,構建器:
public class RedisLimit { private JedisCommands jedis; private int limit = 200; private static final int FAIL_CODE = 0; /** * lua script */ private String script; private RedisLimit(Builder builder) { this.limit = builder.limit ; this.jedis = builder.jedis ; buildScript(); } /** * limit traffic * @return if true */ public boolean limit() { String key = String.valueOf(System.currentTimeMillis() / 1000); Object result = null; if (jedis instanceof Jedis) { result = ((Jedis) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit))); } else if (jedis instanceof JedisCluster) { result = ((JedisCluster) this.jedis).eval(script, Collections.singletonList(key), Collections.singletonList(String.valueOf(limit))); } else { //throw new RuntimeException("instance is error") ; return false; } if (FAIL_CODE != (Long) result) { return true; } else { return false; } } /** * read lua script */ private void buildScript() { script = ScriptUtil.getScript("limit.lua"); } /** * the builder * @param <T> */ public static class Builder<T extends JedisCommands>{ private T jedis = null ; private int limit = 200; public Builder(T jedis){ this.jedis = jedis ; } public Builder limit(int limit){ this.limit = limit ; return this; } public RedisLimit build(){ return new RedisLimit(this) ; } } }
這樣客戶端在使用時:
RedisLimit redisLimit = new RedisLimit.Builder<>(jedisCluster) .limit(limit) .build();
更加的簡單直接,並且避免了將創建過程分成了多個子步驟。
這在有多個構造參數,但又不是必選欄位時很有作用。
因此順便將分布式鎖的構建器方式也一併更新了:
https://github.com/crossoverJie/distributed-redis-tool#features
API
從上文可以看出,使用過程就是調用 limit 方法。
//限流 boolean limit = redisLimit.limit(); if (!limit){ //具體限流邏輯 }
為了減少侵入性,也為了簡化客戶端提供了兩種註解方式。
@ControllerLimit
該註解可以作用於 @RequestMapping 修飾的接口中,並會在限流後提供限流響應。
實現如下:
@Component public class WebIntercept extends WebMvcConfigurerAdapter { private static Logger logger = LoggerFactory.getLogger(WebIntercept.class); @Autowired private RedisLimit redisLimit; @Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(new CustomInterceptor()) .addPathPatterns("/**"); } private class CustomInterceptor extends HandlerInterceptorAdapter { @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { if (redisLimit == null) { throw new NullPointerException("redisLimit is null"); } if (handler instanceof HandlerMethod) { HandlerMethod method = (HandlerMethod) handler; ControllerLimit annotation = method.getMethodAnnotation(ControllerLimit.class); if (annotation == null) { //skip return true; } boolean limit = redisLimit.limit(); if (!limit) { logger.warn("request has bean limit"); response.sendError(500, "request limit"); return false; } } return true; } } }
其實就是實現了 SpringMVC 中的攔截器,並在攔截過程中判斷是否有使用註解,從而調用限流邏輯。
前提是應用需要掃描到該類,讓 Spring 進行管理。
@ComponentScan(value = "com.crossoverjie.distributed.intercept")
@CommonLimit
當然也可以在普通方法中使用。實現原理則是 Spring AOP (SpringMVC 的攔截器本質也是 AOP)。
@Aspect @Component @EnableAspectJAutoProxy(proxyTargetClass = true) public class CommonAspect { private static Logger logger = LoggerFactory.getLogger(CommonAspect.class); @Autowired private RedisLimit redisLimit ; @Pointcut("@annotation(com.crossoverjie.distributed.annotation.CommonLimit)") private void check(){} @Before("check()") public void before(JoinPoint joinPoint) throws Exception { if (redisLimit == null) { throw new NullPointerException("redisLimit is null"); } boolean limit = redisLimit.limit(); if (!limit) { logger.warn("request has bean limit"); throw new RuntimeException("request has bean limit") ; } } }
很簡單,也是在攔截過程中調用限流。
當然使用時也得掃描到該包:
@ComponentScan(value = "com.crossoverjie.distributed.intercept")
總結
限流在一個高並發大流量的系統中是保護應用的一個利器,成熟的方案也很多,希望對剛了解這一塊的朋友提供一些思路。
【編輯推薦】
【責任編輯:
武曉燕TEL:(010)68476606】