资讯专栏INFORMATION COLUMN

【源起Netty 外传】ScheduledThreadPoolExecutor源码解读

Eastboat / 1275人阅读

引言

本文是源起netty专栏的第4篇文章,很明显前3篇文章已经在偏离主题的道路上越来越远。于是乎,我决定:继续保持……

使用

首先看看源码类注释中的示例(未改变官方示例逻辑,只是增加了print输出和注释)

import java.time.LocalTime;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class ScheduleExecutorServiceDemo {
    private final static ScheduledExecutorService scheduler =
            Executors.newScheduledThreadPool(5);

    public static void main(String args[]){
        final Runnable beeper = new Runnable() {
            public void run() {
                System.out.println(Thread.currentThread().getName()+" >>> "+LocalTime.now().toString()+" >>> beep");
                
                //TODO 沉睡吧,少年
                //try {
                //    TimeUnit.SECONDS.sleep(3L);
                //} catch (InterruptedException e) {
                //    e.printStackTrace();
                //}
            }
        };

        //从0s开始输出beep,间隔1s
        final ScheduledFuture beeperHandle =
                scheduler.scheduleAtFixedRate(beeper, 0, 1, TimeUnit.SECONDS);

        //10s之后停止beeperHandle的疯狂输出行为
        scheduler.schedule(new Runnable() {
            public void run() {
                System.out.println("觉悟吧,beeperHandle!I will kill you!");
                beeperHandle.cancel(true);
            }
        }, 10, TimeUnit.SECONDS);
    }
}

scheduleAtFixedRate也是该类常用的打开方式之一,网上很多文章会拿该方法与scheduleWithFixedDelay进行对比,对比结果其实和方法名一致:

scheduleAtFixedRate    //以固定频率执行
scheduleWithFixedDelay    //延迟方式执行,间隔时间=间隔时间入参+任务执行时间

ScheduleExecutorService实则是Timer的进化版,主要改进了Timer单线程方面的弊端,改进方式自然是线程池,ScheduleExecutorService的好基友ScheduledThreadPoolExecutor华丽丽登场。其实ScheduledThreadPoolExecutor才是主角,ScheduleExecutorService扮演的是抛砖引玉中的砖……

先看下ScheduledThreadPoolExecutor类的江湖地位:

既然继承自ThreadPoolExecutor,确乃线程池无疑。

疑问

本文以如下方法作为切入点:
public ScheduledFuture scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit)

方法入参period(译:周期)就是scheduleAtFixedRate所指的固定频率吗?
这个问题很好验证,把示例中这部分代码的注释去掉就能得到答案。

final Runnable beeper = new Runnable() {
    public void run() {
        System.out.println(Thread.currentThread().getName()+" >>> "+LocalTime.now().toString()+" >>> beep");
                
        //TODO 沉睡吧,少年
        //try {
        //    TimeUnit.SECONDS.sleep(3L);
        //} catch (InterruptedException e) {
        //    e.printStackTrace();
        //}
    }
};

答案就是,如果方法执行时间大于间隔周期period,则任务的下次执行时间将超过period的设定!

执行结果如下,可以看出任务间隔为3s,而不是period设置的1s

不禁好奇,ScheduleExecutorService是怎么实现的多长时间之后执行下一个任务?有句话叫源码之下无秘密,so..let"s do this !

源码分析 1.初始化

从ScheduleExecutorService的初始化开始:

private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);

追随调用链Executors.newScheduledThreadPool -> new ScheduledThreadPoolExecutor(corePoolSize),进入如下方法:

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());  //注意最后一个参数
}

线程池中的任务队列用的new DelayedWorkQueue(),而DelayedWorkQueue是ScheduledThreadPoolExecutor的内部类
初始化部分关注到这一点即可,之后会是一些成员变量的赋值,不作解释。

2.任务封装

接下来从scheduleAtFixedRate方法开始,进入它的实现方法:

public ScheduledFuture scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask sft = new ScheduledFutureTask(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
    RunnableScheduledFuture t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

Runnable command被封装成了ScheduledFutureTask类,无独有偶,ScheduledFutureTask是ScheduledThreadPoolExecutor的另外一个内部类。看下它的类关系图:

有没有发现ScheduledFutureTask实现了Comparable接口?众所周知这个接口是以某种规则用来比较大小的,这里的规则就是任务的开始执行时间——ScheduledFutureTask的一个属性:

/** The time the task is enabled to execute in nanoTime units */
private long time;

compareTo方法就是明证:

public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask x = (ScheduledFutureTask)other;
        long diff = time - x.time;    //focus这里啊喂!!!
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

一般来说,这些比较(compare)放在集合中才有意义,那ScheduledFutureTask之后会放在哪个集合中吗?有些朋友可能已经猜到了,没错,ScheduledFutureTask后续会置于前文提到的DelayedWorkQueue中。

3.延时执行

继续ScheduledThreadPoolExecutor.scheduleAtFixedRate方法:

ScheduledFutureTask sft = new ScheduledFutureTask(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
RunnableScheduledFuture t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);    //醒醒,该你出场了

进入delayedExecute方法:

private void delayedExecute(RunnableScheduledFuture task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);    //代码一 - 任务加入DelayedWorkQueue
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart();    //代码二 - 任务开始
    }
}

追踪 代码一 位置的调用链:
-> DelayedWorkQueue.add -> offer -> siftUp(int k, RunnableScheduledFuture key)

private void siftUp(int k, RunnableScheduledFuture key) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        RunnableScheduledFuture e = queue[parent];
        if (key.compareTo(e) >= 0)
            break;
        queue[k] = e;
        setIndex(e, k);
        k = parent;
    }
    queue[k] = key;
    setIndex(key, k);
}

可以看到,siftUp方法实现了向DelayedWorkQueue添加任务时(add),开始时间靠后的任务(ScheduledFutureTask)会放在后面

ok,回到 代码二 位置的ensurePrestart方法,接着追:
ensurePrestart -> addWorker(Runnable firstTask, boolean core)

浓缩版addWorker方法如下:

private boolean addWorker(Runnable firstTask, boolean core){
    ...    //省略很多的验证逻辑

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try{
        w = new Worker(firstTask);    //代码三 - 封装成worker,new Worker会从线程池中获取线程
        final Thread t = w.thread;
        if (t != null){
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            
            ...    //省略部分状态控制逻辑
            
            if (workerAdded){
                t.start();    //代码四 - 执行Worker的run方法
                workerStarted = true;
            }
        }
    }finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

这里发现firstTask(ScheduledFutureTask)再次被封装成了Worker(代码三),那么t.start()(代码四),自然会执行Worker的run方法,跟下Worker.run方法:Worker.run -> runWorker(Worker w)

浓缩后的runWorker

final void runWorker(Worker w){
    
    ...    //省略部分代码
    
    try{
        while (task != null || (task = getTask()) != null){    //代码五 -  getTask()获取任务
            
            ...    //省略部分代码
                          
            task.run();    //代码六 - 任务执行
            
            ...    //省略部分代码
        }
        completedAbruptly = false;
    }finally{
        processWorkerExit(w, completedAbruptly);
    }
}

老规矩,五、六两处关键代码分别看一下:

代码五 getTask最终定位到DelayedWorkQueue.take方法,这里只分析延时任务的执行情况

public RunnableScheduledFuture take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture first = queue[0];
            if (first == null)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    return finishPoll(first);
                first = null; // don"t retain ref while waiting
                if (leader != null)    //代码八 - leader线程就是下一次的工作线程
                    available.await();
                else {
                    Thread thisThread = Thread.currentThread();    //代码七 - 指定leader线程
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);    //等待
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

对于延时任务来说,线程池中第一个调用take的线程进来会作为leader线程(代码七),然后等待。结束等待的位置在哪?在ScheduledFutureTask.run的调用中!(我作断点调试的时候,这个等待时间总是很大,一般两个小时以上,似乎直接用await就成?这一点确有疑问)。
而线程池中的其它线程调用take时,发现leader已经被第一个线程抢了,只能等着(代码八)

回到 代码六 位置,task.run()也就是ScheduledFutureTask.run

public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {    //对于延时任务,会进入这个分支
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}

对于延时任务,会执行ScheduledFutureTask.super.runAndReset()

protected boolean runAndReset() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable c = callable;
        if (c != null && s == NEW) {
            try {
                //代码九 - 阻塞式等待beeper完成
                c.call(); // don"t set result
                ran = true;
            } catch (Throwable ex) {
                setException(ex);
            }
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;
}

runAndReset方法会等待最初定义的beeper逻辑执行完成(代码九),这也解释了为什么scheduleAtFixedRate的下次任务执行时间会有可能超过参数period的设定!

然后调用reExecutePeriodic

void reExecutePeriodic(RunnableScheduledFuture task) {
    if (canRunInCurrentRunState(true)) {
        super.getQueue().add(task);    //队列中再次加入任务
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();    //再次回到ensurePrestart方法
    }
}

reExecutePeriodic方法看上去是不是似曾相识,与本小节(3.延时执行)开端的delayedExecute方法对比下:

private void delayedExecute(RunnableScheduledFuture task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);    //任务加入DelayedWorkQueue
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart();    //任务开始
    }
}

都是加入队列,然后任务开始!

DelayedWorkQueue.add中到底做了什么?之前没有分析,在这里看一下:DelayedWorkQueue.add -> offer

public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture e = (RunnableScheduledFuture)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        if (i >= queue.length)
            grow();
        size = i + 1;
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        } else {
            siftUp(i, e);
        }
        if (queue[0] == e) {
            leader = null;    //将leader赋值清除
            available.signal();    //代码十 - 通知线程
        }
    } finally {
        lock.unlock();
    }
    return true;
}

可以看到,就是在offer方法(代码十),将DelayedWorkQueue.take中的available.awaitNanos(delay)唤醒了!

总结

是不是已经绕晕了?很正常,因为源码终归是需要自己去读个几遍才能理清整个脉络。所以老铁们,加油!

最后的总结还是不能缺少的,一个定时任务的执行流程是这样的:

1.任务开始时,将任务ScheduledFutureTask放入队列DelayedWorkQueue。任务放入过程会计算该任务的开始执行时间,执行时间靠前的放入队列的前端,执行时间靠后的放入队列的后端。

2.之后的ensurePrestart方法,先从线程池中获取线程,该线程会从队列DelayedWorkQueue中获取ScheduledFutureTask

获取过程DelayedWorkQueue.take先计算任务的延时时间delay ,有两种情况:

delay<=0 已不需要延时,立即获取任务

delay>0 需要延时,出现如下局面:

第一个进入的线程成为leader

其它线程等待

long delay = first.getDelay(NANOSECONDS);    //计算延时时间delay 

//已不需要延时,立即获取任务
if (delay <= 0)
    return finishPoll(first);    
first = null; // don"t retain ref while waiting

//需要延时的任务(与此同时有任务正在执行)
if (leader != null)    //其它线程进来时,有leader线程存在了,等待
    available.await();
else {
    Thread thisThread = Thread.currentThread();    //第一个进入这里的线程会成为leader
    leader = thisThread;
    try {
        available.awaitNanos(delay);    //等待
    } finally {
        if (leader == thisThread)
            leader = null;
    }
}

3.获取任务后,进入执行环节Worker.run -> ScheduledFutureTask.run。执行过程会阻塞式等待任务完成,这也是任务执行时间可能会超过period的原因!任务执行结束会再次放入任务,这样又回到步骤1,反复执行。

感谢

分析Java延迟与周期任务的实现原理描述

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/76305.html

相关文章

  • 源起Netty 外传ScheduledThreadPoolExecutor源码解读

    引言 本文是源起netty专栏的第4篇文章,很明显前3篇文章已经在偏离主题的道路上越来越远。于是乎,我决定:继续保持…… 使用 首先看看源码类注释中的示例(未改变官方示例逻辑,只是增加了print输出和注释) import java.time.LocalTime; import java.util.concurrent.Executors; import java.util.concurrent....

    funnyZhang 评论0 收藏0
  • 源起Netty 外传ScheduledThreadPoolExecutor源码解读

    引言 本文是源起netty专栏的第4篇文章,很明显前3篇文章已经在偏离主题的道路上越来越远。于是乎,我决定:继续保持…… 使用 首先看看源码类注释中的示例(未改变官方示例逻辑,只是增加了print输出和注释) import java.time.LocalTime; import java.util.concurrent.Executors; import java.util.concurrent....

    Martin91 评论0 收藏0
  • 源起Netty 外传】FastThreadLocal怎么Fast?

    摘要:实现原理浅谈帮助理解的示意图中有一属性,类型是的静态内部类。刚刚说过,是一个中的静态内部类,则是的内部节点。这个会在线程中,作为其属性初始是一个数组的索引,达成与类似的效果。的方法被调用时,会根据记录的槽位信息进行大扫除。 概述 FastThreadLocal的类名本身就充满了对ThreadLocal的挑衅,快男FastThreadLocal是怎么快的?源码中类注释坦白如下: /** ...

    gxyz 评论0 收藏0
  • 源起Netty 外传】ServiceLoader详解

    摘要:答曰摸索直译为服务加载器,最终目的是获取的实现类。代码走起首先,要有一个接口形状接口介绍然后,要有该接口的实现类。期具体实现依靠的内部类,感性趣的朋友可以自己看一下。总结重点在于可跨越包获取,这一点笔者通过多模块项目亲测延时加载特性 前戏 netty源码注释有云: ... If a provider class has been installed in a jar file tha...

    MoAir 评论0 收藏0
  • 源起Netty 外传】System.getPropert()详解

    摘要:阅读源码时,发现很多,理所当然会想翻阅资料后,该技能,姿势如下环境中的全部属性全部属性注意如果将本行代码放在自定义属性之后,会不会打出把自定义属性也给获取到可以结论会获取目前环境中全部的属性值,无论系统提供还是个人定义系统提供属性代码中定义 阅读源码时,发现很多System.getProperty(xxx),理所当然会想:whats fucking this? 翻阅资料后,Get该技能...

    lixiang 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<