sponsored links

Spring Cloud Gateway 自定义限流

在使用Spring Cloud Gateway限流功能时官网提供的限流中的流速以及桶容量是针对所有策略的,意思是只要配置上那么所有的都是一样的,不能根据不同的类型配置不同的参数,例如:A渠道、B渠道,若配置上replenishRate(流速)和burstCapacity(令牌桶容量),那么不管是A渠道还是B渠道都是这个值,如果修改那么对应的其他渠道也会修改,如何能做到分为不同渠道进行限流呢,A渠道replenishRate:10,burstCapacity:100,B渠道:replenishRate:20,burstCapacity:1000,下面开始分析:
限流方式采用的redis,底层使用的redis的lua脚本实现的,具体可以自行查阅,不做讲解,默认限流类:RedisRateLimiter,本文也是仿照这个进行重写的。
本文是参考“Spring Cloud Gateway 结合配置中心限流”进行简化改造,通过配置的方式进行实现。

引入依赖

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>

自定义限流类

参照RedisRateLimiter进行自定义限流类SystemRedisRateLimiter用于渠道限流方式,实现代码如下:

/**
 * @author : Erick
 * @version : 1.0
 * @Description :
 * @time :2018-12-1
 */
public class SystemRedisRateLimiter extends AbstractRateLimiter<SystemRedisRateLimiter.Config> implements ApplicationContextAware {
    //这些变量全部从RedisRateLimiter复制的,都会用到。
    public static final String REPLENISH_RATE_KEY = "replenishRate";

    public static final String BURST_CAPACITY_KEY = "burstCapacity";

    public static final String CONFIGURATION_PROPERTY_NAME = "sys-redis-rate-limiter";
    public static final String REDIS_SCRIPT_NAME = "redisRequestRateLimiterScript";
    public static final String REMAINING_HEADER = "X-RateLimit-Remaining";
    public static final String REPLENISH_RATE_HEADER = "X-RateLimit-Replenish-Rate";
    public static final String BURST_CAPACITY_HEADER = "X-RateLimit-Burst-Capacity";

    //处理速度
    private static final String DEFAULT_REPLENISHRATE="default.replenishRate";
    //容量
    private static final String DEFAULT_BURSTCAPACITY="default.burstCapacity";

    private ReactiveRedisTemplate<String, String> redisTemplate;
    private RedisScript<List<Long>> script;
    private AtomicBoolean initialized = new AtomicBoolean(false);

    private String remainingHeader = REMAINING_HEADER;

    /** The name of the header that returns the replenish rate configuration. */
    private String replenishRateHeader = REPLENISH_RATE_HEADER;

    /** The name of the header that returns the burst capacity configuration. */
    private String burstCapacityHeader = BURST_CAPACITY_HEADER;

    private Config defaultConfig;

    public SystemRedisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate,
                                  RedisScript<List<Long>> script, Validator validator) {
        super(Config.class , CONFIGURATION_PROPERTY_NAME , validator);
        this.redisTemplate = redisTemplate;
        this.script = script;
        initialized.compareAndSet(false,true);
    }

    public SystemRedisRateLimiter(int defaultReplenishRate, int defaultBurstCapacity){
        super(Config.class , CONFIGURATION_PROPERTY_NAME , null);
        defaultConfig = new Config()
                .setReplenishRate(defaultReplenishRate)
                .setBurstCapacity(defaultBurstCapacity);

    }
    //具体限流实现,此处调用的是lua脚本
    @Override
    public Mono<RateLimiter.Response> isAllowed(String routeId, String id) {
        if (!this.initialized.get()) {
            throw new IllegalStateException("RedisRateLimiter is not initialized");
        }
        if (ObjectUtils.isEmpty(rateLimiterConf) ){
            throw new IllegalArgumentException("No Configuration found for route " + routeId);
        }
        //获取的是自定义的map
        Map<String , Integer> rateLimitMap = rateLimiterConf.getRateLimitMap();
        //缓存的key
        String replenishRateKey = routeId + "." + id + "." + REPLENISH_RATE_KEY;
        //若map中不存在则采用默认值,存在则取值。
        int replenishRate = ObjectUtils.isEmpty(rateLimitMap.get(replenishRateKey)) ? rateLimitMap.get(DEFAULT_REPLENISHRATE) : rateLimitMap.get(replenishRateKey);
        //容量key
        String burstCapacityKey = routeId + "." + id + "." + BURST_CAPACITY_KEY;
        //若map中不存在则采用默认值,存在则取值。
        int burstCapacity = ObjectUtils.isEmpty(rateLimitMap.get(burstCapacityKey)) ? rateLimitMap.get(DEFAULT_BURSTCAPACITY) : rateLimitMap.get(burstCapacityKey);

        try {
            List<String> keys = getKeys(id);

            List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "",
                    Instant.now().getEpochSecond() + "", "1");
            Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);

            return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
                    .reduce(new ArrayList<Long>(), (longs, l) -> {
                        longs.addAll(l);
                        return longs;
                    }) .map(results -> {
                        boolean allowed = results.get(0) == 1L;
                        Long tokensLeft = results.get(1);

                        RateLimiter.Response response = new RateLimiter.Response(allowed, getHeaders(replenishRate , burstCapacity , tokensLeft));

                        return response;
                    });
        } catch (Exception e) {
            e.printStackTrace();
        }

        return Mono.just(new RateLimiter.Response(true, getHeaders(replenishRate , burstCapacity , -1L)));
    }

    private RateLimiterConf rateLimiterConf;
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.rateLimiterConf = applicationContext.getBean(RateLimiterConf.class);
    }

    public HashMap<String, String> getHeaders(Integer replenishRate, Integer burstCapacity , Long tokensLeft) {
        HashMap<String, String> headers = new HashMap<>();
        headers.put(this.remainingHeader, tokensLeft.toString());
        headers.put(this.replenishRateHeader, String.valueOf(replenishRate));
        headers.put(this.burstCapacityHeader, String.valueOf(burstCapacity));
        return headers;
    }

    static List<String> getKeys(String id) {
        // use `{}` around keys to use Redis Key hash tags
        // this allows for using redis cluster

        // Make a unique key per user.
        //此处可以自定义redis前缀信息
        String prefix = "request_sys_rate_limiter.{" + id;

        // You need two Redis keys for Token Bucket.
        String tokenKey = prefix + "}.tokens";
        String timestampKey = prefix + "}.timestamp";
        return Arrays.asList(tokenKey, timestampKey);
    }

    @Validated
    public static class Config{
        @Min(1)
        private int replenishRate;
        @Min(1)
        private int burstCapacity = 1;

        public int getReplenishRate() {
            return replenishRate;
        }

        public Config setReplenishRate(int replenishRate) {
            this.replenishRate = replenishRate;
            return this;
        }

        public int getBurstCapacity() {
            return burstCapacity;
        }

        public Config setBurstCapacity(int burstCapacity) {
            this.burstCapacity = burstCapacity;
            return this;
        }

        @Override
        public String toString() {
            return "Config{" +
                    "replenishRate=" + replenishRate +
                    ", burstCapacity=" + burstCapacity +
                    '}';
        }
    }
}

在继承AbstractRateLimiter泛型使用的是自定义类中的config:SystemRedisRateLimiter.Config

配置类

配置类主要用于初始化map参数。

/**
 * @author : Erick
 * @version : 1.0
 * @Description :
 * @time :2018-12-1
 */
@Component
//使用配置文件的方式进行初始化
@ConfigurationProperties(prefix = "ratelimiter-conf")
public class RateLimiterConf {
    //处理速度
    private static final String DEFAULT_REPLENISHRATE="default.replenishRate";
    //容量
    private static final String DEFAULT_BURSTCAPACITY="default.burstCapacity";

    private Map<String , Integer> rateLimitMap = new ConcurrentHashMap<String , Integer>(){
        {
            put(DEFAULT_REPLENISHRATE , 10);
            put(DEFAULT_BURSTCAPACITY , 100);
        }
    };

    public Map<String, Integer> getRateLimitMap() {
        return rateLimitMap;
    }

    public void setRateLimitMap(Map<String, Integer> rateLimitMap) {
        this.rateLimitMap = rateLimitMap;
    }
}

配置文件主要采用配置文件的方式进行初始化,若配置则进行添加,若没有配置则采用默认值。

配置文件

主要用于配置各个渠道的限流阀值,例如文章开头举例的A渠道和B渠道,配置如下:

//与配置类RateLimiterConf保持一致
ratelimiter-conf:
  #配置限流参数与RateLimiterConf类映射
  rateLimitMap:
    #格式为:routeid(gateway配置routes时指定的).系统名称.replenishRate(流速)/burstCapacity令牌桶大小
    service.A.replenishRate: 10
    service.A.burstCapacity: 100
    service.B.replenishRate: 20
    service.B.burstCapacity: 1000

到此配置限流相关的代码已经完成,需要在启动类和bootstrap.yml中进行配置才能够真正的使用。

启动类中声名

在启动类中声明使用的策略,指定自定义限流类。

@Bean
 KeyResolver sysKeyResolver(){
      //从请求地址中截取sys值,进行限流。
     return exchange -> Mono.just(exchange.getRequest().getQueryParams().getFirst("sys"));
 }

@Bean
@Primary
//使用自己定义的限流类
SystemRedisRateLimiter systemRedisRateLimiter(
        ReactiveRedisTemplate<String, String> redisTemplate,
        @Qualifier(SystemRedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long>> script,
        Validator validator){
    return new SystemRedisRateLimiter(redisTemplate , script , validator);
}

采用的是从请求地址中获取sys参数对应的值,当然也可以设置为其他值,再声名自定义的限流类。

配置限流

spring:
  cloud:
    gateway:
      routes:
        - id: service //id名称需要与配置限流的保持一致
          uri: lb://serviceA
          predicates:
            - Path=/service/**/ //最后的/可以去掉,在本文中特意添加的如果去掉会把filters当成注释内容。
          filters:
            - name: RequestRateLimiter
              args:
                //需要与上边的方法名保持一致
                rate-limiter: "#{@systemRedisRateLimiter}"
                //需要与策略类的方法名保持一致。
                key-resolver: "#{@sysKeyResolver}"

至此自定义限流代码全部完成,如有什么不妥支持请指正,同时也希望对其他人有帮助。实例代码已上传到码云可以点击查看。

Spring Cloud Gateway 自定义限流

Tags: