资讯专栏INFORMATION COLUMN

RateLimter源码解析

RaoMeng / 1241人阅读

摘要:允许突发流量的平滑限流器的实现。实例执行结果方法返回结果代表获取所等待的时间。先看下示例代码运行效果为了预防突然暴增的流量将系统压垮,很贴心的增加了预热。

RateLimiter 类图

RateLimiter:作为抽象类提供一个限流器的基本的抽象方法。
SmoothRateLimiter:平滑限流器实现,提供了Ratelimiter中的抽象限流方法的平滑实现。
SmoothBursty:允许突发流量的平滑限流器的实现。
SmoothWarmingUp:平滑预热限流器的实现。

实例
public void test() throws InterruptedException {
        RateLimiter rateLimiter = RateLimiter.create(2);

        while (true){
            System.out.println(rateLimiter.acquire(2));
            TimeUnit.SECONDS.sleep(2);
            System.out.println(rateLimiter.acquire(1));
            System.out.println(rateLimiter.acquire(1));
            System.out.println(rateLimiter.acquire(10));
        }
    }

执行结果

acquire方法返回结果代表获取token所等待的时间。

第一行0等待,刚创建限流器,还没来得及放任何token,此处存储的token=0,但是无欠款所以预消费2个;
sleep 2秒,按照每秒2个的速度,先“还”了欠款,然后token直接恢复至max = 2;
第二行0,现有2个token,用一个,无需等待。
第三行0,现有1个token,用一个,无需等待。
第四行0,现有0个token,无欠款,直接借10个。
第五行4.999868,帮上一个还欠款,等待5秒直到还完欠款后,又借了2个。
重复第一行......

在使用RateLimiter.create(double)方法初始化限流器时,实际上默认使用的是SmoothBursty

public static RateLimiter create(double permitsPerSecond) {
    return create(permitsPerSecond, SleepingStopwatch.createFromSystemTimer());
  }

static RateLimiter create(double permitsPerSecond, SleepingStopwatch stopwatch) {
    RateLimiter rateLimiter = new SmoothBursty(stopwatch, 1.0 /* maxBurstSeconds */);
    rateLimiter.setRate(permitsPerSecond);
    return rateLimiter;
  }
SmoothBursty
/** The currently stored permits.
    当前存储的令牌数
 */
double storedPermits;

/**
 * The maximum number of stored permits.
 * 最大存储令牌数
 */
double maxPermits;

/**
 * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
 * per second has a stable interval of 200ms.
 * 添加令牌时间间隔
 */
double stableIntervalMicros;

/**
 * The time when the next request (no matter its size) will be granted. After granting a request,
 * this is pushed further in the future. Large requests push this further than small requests.
 * 下一次请求可以获取令牌的起始时间
 * 由于RateLimiter允许预消费,上次请求预消费令牌后
 * 下次请求需要等待相应的时间到nextFreeTicketMicros时刻才可以获取令牌
 */
private long nextFreeTicketMicros = 0L; // could be either in the past or future

从acquire()函数开始

public double acquire() {
    return acquire(1);//默认取一个令牌
  }
public double acquire(int permits) {
    long microsToWait = reserve(permits);//从限流器中获取指定的令牌,并返回需要等待的时间
    stopwatch.sleepMicrosUninterruptibly(microsToWait);//让“闹钟”将当前线程停止睡眠指定时间
    return 1.0 * microsToWait / SECONDS.toMicros(1L);//返回等待的时间,单位是秒
  }
final long reserve(int permits) {
    checkPermits(permits);//参数校验
    synchronized (mutex()) {//获取锁,多个请求到达,需要串行的获取
      return reserveAndGetWaitLength(permits, stopwatch.readMicros());
    }
  }    

先来看下加锁的逻辑

private volatile Object mutexDoNotUseDirectly;

  private Object mutex() {
    Object mutex = mutexDoNotUseDirectly;
    if (mutex == null) {
      synchronized (this) {
        mutex = mutexDoNotUseDirectly;
        if (mutex == null) {
          mutexDoNotUseDirectly = mutex = new Object();
        }
      }
    }
    return mutex;
  }

典型的双重检查单例
接着继续探索获取令牌的逻辑代码

final long reserveAndGetWaitLength(int permits, long nowMicros) {
    long momentAvailable = reserveEarliestAvailable(permits, nowMicros);//获取token并返回下个请求可以来获取token的时间
    return max(momentAvailable - nowMicros, 0);//计算等待时间
  }

关键函数一:
abstract long reserveEarliestAvailable(int permits, long nowMicros);
SmoothRateLimiter实现了它,是获取token、消耗token的主流程

final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    resync(nowMicros);
    long returnValue = nextFreeTicketMicros;
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    double freshPermits = requiredPermits - storedPermitsToSpend;
    long waitMicros =
 **加粗文字**       storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
            + (long) (freshPermits * stableIntervalMicros);

    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
    this.storedPermits -= storedPermitsToSpend;
    System.out.println(String.format("storedPermitsToSpend=%s,freshPermits=%s,waitMicros=%s,storedPermits=%s", storedPermitsToSpend, freshPermits, waitMicros, storedPermits));
    return returnValue;
  }

更新令牌桶中的token。

计算下次能获得令牌的时间

扣除本次所需令牌

storedPermitsToSpend代表需要从storedPermits扣除的token,如果storedPermits已经=0了,那么不会扣除到负数
waitMicros代表此次预消费的令牌需要多少时间来恢复,最终将它加到nextFreeTicketMicros**
那么SmoothBursty是怎么实现预消费的呢?,让我们先看下更新token的逻辑,即void resync(long nowMicros)

void resync(long nowMicros) {
    // if nextFreeTicket is in the past, resync to now
    if (nowMicros > nextFreeTicketMicros) {
      double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
      storedPermits = min(maxPermits, storedPermits + newPermits);
      nextFreeTicketMicros = nowMicros;
    }
  }

更新流程

如果当前时间大于freeTime,则进入更新操作。

将时间差除以令牌恢复间隔,计算出得到恢复的令牌个数

更新令牌桶令牌的存储数量和freeTime

SmoothBursty是怎么实现预消费的呢?
其实,只要保证一点就可以进行预消费,即无欠款,无欠款就代表当前时间大于等于nextFreeTime,SmoothBursty就是依靠此原理来处理突发的流量。

SmoothWarmingUp

先看下示例代码

public void test_warmingUp(){
        RateLimiter rateLimiter = RateLimiter.create(2, 4, TimeUnit.SECONDS);

        while (true){
            System.out.println(rateLimiter.acquire(1));
            System.out.println(rateLimiter.acquire(1));
            System.out.println(rateLimiter.acquire(1));
            System.out.println(rateLimiter.acquire(1));
        }
    }

运行效果

0.0
1.372264
1.117788
0.869905
0.620505
0.496059
0.495301
0.496027
0.495794

SmoothWarmingUp为了预防突然暴增的流量将系统压垮,很贴心的增加了“预热”。指定的warmupPeriod就是预热时间,在“冷状态”即没有流量进入时,放入每个token的时间不仅仅是1/permitsPerSecond,还要加上一个预热时间,类注释上的图作了很好的解释。

SmoothWarmingUp在初始阶段与SmoothBursty有点不同,SmoothWarmingUp初始storePermits = maxPermits。一直使用permits直至storePermits减少到thresholdPermits(setRate调用时计算)放入token的时间便稳定下来,到达了“热状态”,此时与SmoothBursty是一模一样。但是如果在warmupPeriod时间内没有流量进入,则再次会进入“冷状态“。

在实现上SmoothWarmingUp与SmoothBursty基本相同,唯一不同仅仅只有两个函数的实现上

coolDownIntervalMicros()返回一个token的冷却时间,SmoothWarmingUp注释中有介绍,为了保证在warmUpPeriod时间刚好可以恢复maxPermits个token,因此SmoothWarmingUp此函数返回的是warmupPeriodMicros / maxPermits

storedPermitsToWaitTime(double storedPermits, double permitsToTake)返回消耗permitsToTake个token所需要等待的时间,SmoothBursty则是直接返回0.

SmoothWarmingUp的注释解释的很到位,在预热限流器中,计算token的等待时间就可以转化计算图中的面积,大家可以顺着注释推导一下。

总结

SmoothBursty:初始token为0,允许预消费,放入token的时间固定为1/permitsPerSecond.(一开始直接上)
SmoothWarmingUp:初始token为MaxPermits,允许预消费,可以指定预热时间,在与预热时间过后速率恢复平稳与SmoothBursty一致。(老司机有前戏)

SmoothWarmingUp像了改良版的SmoothBursty,有个预热时间,系统能更加从容的应付流量的来袭,因此一般可以优先使用SmoothWarmingUp。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/76815.html

相关文章

  • Flink Metrics 源码解析

    摘要:有如下模块源码解析源码解析源码解析源码解析源码解析源码解析源码解析源码解析源码解析使用和监控和博客从到学习介绍从到学习上搭建环境并构建运行简单程序入门从到学习配置文件详解从到学习介绍从到学习如何自 Flink Metrics 有如下模块: Flink Metrics 源码解析 —— Flink-metrics-core Flink Metrics 源码解析 —— Flink-metr...

    sshe 评论0 收藏0
  • Flink 源码解析 —— 深度解析 Flink Checkpoint 机制

    摘要:机制博客从到学习介绍从到学习上搭建环境并构建运行简单程序入门从到学习配置文件详解从到学习介绍从到学习如何自定义从到学习介绍从到学习如何自定义从到学习转换从到学习介绍中的从到学习中的几种详解从到学习读取数据写入到从到学习项目如何运行从 Flink Checkpoint 机制 https://t.zsxq.com/ynQNbeM 博客 1、Flink 从0到1学习 —— Apache Fl...

    0x584a 评论0 收藏0
  • Flink 源码解析 —— 深度解析 Flink 序列化机制

    摘要:序列化机制博客从到学习介绍从到学习上搭建环境并构建运行简单程序入门从到学习配置文件详解从到学习介绍从到学习如何自定义从到学习介绍从到学习如何自定义从到学习转换从到学习介绍中的从到学习中的几种详解从到学习读取数据写入到从到学习项目如何 Flink 序列化机制 https://t.zsxq.com/JaQfeMf 博客 1、Flink 从0到1学习 —— Apache Flink 介绍 2...

    y1chuan 评论0 收藏0
  • Flink Clients 源码解析

    摘要:模块中的类结构如下博客从到学习介绍从到学习上搭建环境并构建运行简单程序入门从到学习配置文件详解从到学习介绍从到学习如何自定义从到学习介绍从到学习如何自定义从到学习转换从到学习介绍中的从到学习中的几种详解从到学习读取数据写入到从到学 Flink-Client 模块中的类结构如下: https://t.zsxq.com/IMzNZjY showImg(https://segmentfau...

    xiao7cn 评论0 收藏0
  • Flink Annotations 源码解析

    摘要:模块中的类结构如下博客从到学习介绍从到学习上搭建环境并构建运行简单程序入门从到学习配置文件详解从到学习介绍从到学习如何自定义从到学习介绍从到学习如何自定义从到学习转换从到学习介绍中的从到学习中的几种详解从到学习读取数据写入到从到学 Flink-Annotations 模块中的类结构如下: https://t.zsxq.com/f6eAu3J showImg(https://segme...

    Eirunye 评论0 收藏0

发表评论

0条评论

RaoMeng

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<