接口流量控制
背景
公有云的服务通常是将私有云的服务进行包装,并对外提供服务的,由于业务应用系统的负载能力有限,为了防止非预期的请求对系统压力过大而拖垮业务应用系统,需要对请求流量进行限速。
漏斗算法
漏桶(Leaky Bucket)算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水(接口有响应速率),当水流入速度过大会直接溢出(访问频率超过接口响应速率),然后就拒绝请求,可以看出漏桶算法能强行限制数据的传输速率。
- 优点
可以让流量匀速通过,实现简单
- 缺点
流量始终匀速输出,对于突发特性的流量支持地不好
- 实现
用一个队列即可搞定,消费者线程匀速取出
令牌桶算法
令牌桶算法(Token Bucket)和 Leaky Bucket 效果一样但方向相反的算法,更加容易理解.随着时间流逝,系统会按恒定1/QPS时间间隔(如果QPS=100,则间隔是10ms)往桶里加入Token(想象和漏洞漏水相反,有个水龙头在不断的加水),如果桶已经满了就不再加了.新请求来临时,会各自拿走一个Token,如果没有Token可拿了就阻塞或者拒绝服务.
- 优点
可以应对突发性的流量
- 缺点
实现起来不是很容易
- 实现
下面详细
通过redis实现令牌桶算法进行流量控制
流量控制项目
- 单个Ip访问速度限制
规则: reqs / seconds, 例如: 300 / 60, 表示每分钟最多允许300个请求,也就是平均每秒钟5个请求,但是我们可以允许流量的抖动,允许每5秒内有100个请求,这时,我们可以这样设定: 100 / 5, 两个规则加在一起就能满足两个要求了。
具体流程:
一个请求过来,对于每个规则构造key:
key = GLOBAL_PREFIX_FOR_REDIS + PREFIX_RATE_LIMIT + KEY_SPLIT_FLAG + request.getClientIp() + i;
判断redis中key对应的列表长度, 如果列表长度小于限制,则通过; 如果大于等于限制,首先判断最早加入列表的元素(time)时间和当前时间差是否大于perSecond, 如果是,则将最早的时间元素从redis中移除,并将当前时间元素加入redis,允许请求通过,而且标记上后续清理过期的时间项目。如果不允许通过,则抛出异常
// 统计是否封禁
for (int i = 0; maxRateLimitPerIps != null && i < maxRateLimitPerIps.size(); i++) {
String maxRateLimitPerIp = maxRateLimitPerIps.get(i);
String[] rateLimitInArray = maxRateLimitPerIp.split("/");
int maxLimit = Integer.parseInt(rateLimitInArray[RATE_MAX_LIMITED_INDEX]);
if (DEFAULT_LIMIT_VALUE.equals(rateLimitInArray[RATE_MAX_LIMITED_INDEX])) {
break;
}
int perSecond = Integer.parseInt(rateLimitInArray[RATE_LIMITED_STAT_SECOND_INDEX]);
ForbiddenConfigVO forbiddenConfigVO = RateLimitUtil.getForbiddenConfig(rateLimitInArray, nowTime);
checkLimit(RateLimitUtil.getIpKey(request) + i, maxLimit, perSecond, nowTime, forbiddenConfigVO);
}
public static String getIpKey(RequestEntity request) {
return new StringBuffer(GLOBAL_PREFIX_FOR_REDIS).append(PREFIX_RATE_LIMIT).append(KEY_SPLIT_FLAG).append(request.getClientIp()).append(KEY_SPLIT_FLAG)
.toString();
}
public static RateLimitStatResult statRateLimitList(Jedis jedis,
String listKey, Long nowTime, int perSecond, int maxLimit) {
StopWatch sw = new StopWatch("statRateLimitList_begin_cost:"); // 增加时间监控日志
sw.start("statRateLimitList_jedis.lrange_cost:");
RateLimitStatResult rateLimitStatResult = new RateLimitStatResult();
List<String> rateList = jedis.lrange(listKey, 0, -1);
sw.stop();
sw.start("statRateLimitList_deal_list_cost:");
if (rateList == null || rateList.isEmpty()) {
jedis.rpush(listKey, nowTime.toString());
setListExpireTime(listKey, perSecond, jedis);
LOGGER.debug("new list, key is " + listKey);
sw.stop();
LOGGER.debug(sw.prettyPrint());
return rateLimitStatResult;
}
// 统计的窗口开启时间
long beginTime = nowTime - perSecond * 1000;
setListExpireTime(listKey, perSecond, jedis);
if (rateList.size() < maxLimit) {
jedis.rpush(listKey, nowTime.toString());
LOGGER.debug("rateList.size(): " + rateList.size() + ",maxLimit" + maxLimit + ",now is:" + nowTime);
if (rateList.size() > maxLimit / 2) {
// 存入queue,清理过期数据
rateLimitStatResult.setClearList(true);
}
sw.stop();
LOGGER.debug(sw.prettyPrint());
return rateLimitStatResult;
}
if (beginTime > RateLimitUtil.getTime(rateList.get(0))) {
LOGGER.debug("not new list and compare first node result is delete, key is " + listKey);
jedis.lrem(listKey, 0, rateList.get(0));
jedis.rpush(listKey, nowTime.toString());
rateLimitStatResult.setClearList(true);
sw.stop();
LOGGER.debug(sw.prettyPrint());
// 存入queue,清理过期数据
return rateLimitStatResult;
}
LOGGER.debug("not new list and OverLimited, key is " + listKey);
rateLimitStatResult.setOverLimited(true);
sw.stop();
LOGGER.debug(sw.prettyPrint());
return rateLimitStatResult;
}
每个接口(/uri级别)限速
达到限速阈值后,是否封禁,这个可以在配置里配置 封禁的实现是可以通过map来实现的,value为封禁截止时间
黑白名单
按账户级别进行限速
实现细节探讨
- redis分布式锁怎么防止由于客户端奔溃而死锁
设置过期时间,对于高并发下,过期时间可能设置过短,对于目前业务来说,这种误差是可以接受的,如果不允许这种误差,要想更好的方案(暂时没想出来)
- redis锁的获取
构造一个key作为lock key, 然后调用setnx, 如果返回不是1,则认为是已被锁住,需要等待,sleep(1)