接口流量控制
背景
公有云的服务通常是将私有云的服务进行包装,并对外提供服务的,由于业务应用系统的负载能力有限,为了防止非预期的请求对系统压力过大而拖垮业务应用系统,需要对请求流量进行限速。
漏斗算法
漏桶(Leaky Bucket)算法思路很简单,水(请求)先进入到漏桶里,漏桶以一定的速度出水(接口有响应速率),当水流入速度过大会直接溢出(访问频率超过接口响应速率),然后就拒绝请求,可以看出漏桶算法能强行限制数据的传输速率。
- 优点
可以让流量匀速通过,实现简单
- 缺点
流量始终匀速输出,对于突发特性的流量支持地不好
- 实现
用一个队列即可搞定,消费者线程匀速取出
令牌桶算法
令牌桶算法(Token Bucket)和 Leaky Bucket 效果一样但方向相反的算法,更加容易理解.随着时间流逝,系统会按恒定1/QPS时间间隔(如果QPS=100,则间隔是10ms)往桶里加入Token(想象和漏洞漏水相反,有个水龙头在不断的加水),如果桶已经满了就不再加了.新请求来临时,会各自拿走一个Token,如果没有Token可拿了就阻塞或者拒绝服务.
- 优点
可以应对突发性的流量
- 缺点
实现起来不是很容易
- 实现
下面详细
通过redis实现令牌桶算法进行流量控制
流量控制项目
- 单个Ip访问速度限制
规则: reqs / seconds, 例如: 300 / 60, 表示每分钟最多允许300个请求,也就是平均每秒钟5个请求,但是我们可以允许流量的抖动,允许每5秒内有100个请求,这时,我们可以这样设定: 100 / 5, 两个规则加在一起就能满足两个要求了。
具体流程:
一个请求过来,对于每个规则构造key:
  key = GLOBAL_PREFIX_FOR_REDIS + PREFIX_RATE_LIMIT + KEY_SPLIT_FLAG + request.getClientIp() + i;
 判断redis中key对应的列表长度, 如果列表长度小于限制,则通过; 如果大于等于限制,首先判断最早加入列表的元素(time)时间和当前时间差是否大于perSecond, 如果是,则将最早的时间元素从redis中移除,并将当前时间元素加入redis,允许请求通过,而且标记上后续清理过期的时间项目。如果不允许通过,则抛出异常
        // 统计是否封禁
        for (int i = 0; maxRateLimitPerIps != null && i < maxRateLimitPerIps.size(); i++) {
            String maxRateLimitPerIp = maxRateLimitPerIps.get(i);
            String[] rateLimitInArray = maxRateLimitPerIp.split("/");
            int maxLimit = Integer.parseInt(rateLimitInArray[RATE_MAX_LIMITED_INDEX]);
            if (DEFAULT_LIMIT_VALUE.equals(rateLimitInArray[RATE_MAX_LIMITED_INDEX])) {
                break;
            }
            int perSecond = Integer.parseInt(rateLimitInArray[RATE_LIMITED_STAT_SECOND_INDEX]);
            ForbiddenConfigVO forbiddenConfigVO = RateLimitUtil.getForbiddenConfig(rateLimitInArray, nowTime);
            checkLimit(RateLimitUtil.getIpKey(request) + i, maxLimit, perSecond, nowTime, forbiddenConfigVO);
        }
    public static String getIpKey(RequestEntity request) {
        return new StringBuffer(GLOBAL_PREFIX_FOR_REDIS).append(PREFIX_RATE_LIMIT).append(KEY_SPLIT_FLAG).append(request.getClientIp()).append(KEY_SPLIT_FLAG)
                .toString();
    }
    public static RateLimitStatResult statRateLimitList(Jedis jedis,
                                                        String listKey, Long nowTime, int perSecond, int maxLimit) {
        StopWatch sw = new StopWatch("statRateLimitList_begin_cost:"); // 增加时间监控日志
        sw.start("statRateLimitList_jedis.lrange_cost:");
        RateLimitStatResult rateLimitStatResult = new RateLimitStatResult();
        List<String> rateList = jedis.lrange(listKey, 0, -1);
        sw.stop();
        sw.start("statRateLimitList_deal_list_cost:");
        if (rateList == null || rateList.isEmpty()) {
            jedis.rpush(listKey, nowTime.toString());
            setListExpireTime(listKey, perSecond, jedis);
            LOGGER.debug("new list, key is " + listKey);
            sw.stop();
            LOGGER.debug(sw.prettyPrint());
            return rateLimitStatResult;
        }
        // 统计的窗口开启时间
        long beginTime = nowTime - perSecond * 1000;
        setListExpireTime(listKey, perSecond, jedis);
        if (rateList.size() < maxLimit) {
            jedis.rpush(listKey, nowTime.toString());
            LOGGER.debug("rateList.size(): " + rateList.size() + ",maxLimit" + maxLimit + ",now is:" + nowTime);
            if (rateList.size() > maxLimit / 2) {
                // 存入queue,清理过期数据
                rateLimitStatResult.setClearList(true);
            }
            sw.stop();
            LOGGER.debug(sw.prettyPrint());
            return rateLimitStatResult;
        }
        if (beginTime > RateLimitUtil.getTime(rateList.get(0))) {
            LOGGER.debug("not new list and compare first node result is delete, key is " + listKey);
            jedis.lrem(listKey, 0, rateList.get(0));
            jedis.rpush(listKey, nowTime.toString());
            rateLimitStatResult.setClearList(true);
            sw.stop();
            LOGGER.debug(sw.prettyPrint());
            // 存入queue,清理过期数据
            return rateLimitStatResult;
        }
        LOGGER.debug("not new list and OverLimited, key is " + listKey);
        rateLimitStatResult.setOverLimited(true);
        sw.stop();
        LOGGER.debug(sw.prettyPrint());
        return rateLimitStatResult;
    }
- 每个接口(/uri级别)限速 
- 达到限速阈值后,是否封禁,这个可以在配置里配置 封禁的实现是可以通过map来实现的,value为封禁截止时间 
- 黑白名单 
- 按账户级别进行限速 
实现细节探讨
- redis分布式锁怎么防止由于客户端奔溃而死锁
设置过期时间,对于高并发下,过期时间可能设置过短,对于目前业务来说,这种误差是可以接受的,如果不允许这种误差,要想更好的方案(暂时没想出来)
- redis锁的获取
构造一个key作为lock key, 然后调用setnx, 如果返回不是1,则认为是已被锁住,需要等待,sleep(1)
