资讯专栏INFORMATION COLUMN

追踪解析 FutureTask 源码

xcc3641 / 540人阅读

摘要:零前期准备文章异常啰嗦且绕弯。版本版本简介是中默认的实现类,常与结合进行多线程并发操作。所以方法的主体其实就是去唤醒被阻塞的线程。本文仅为个人的学习笔记,可能存在错误或者表述不清的地方,有缘补充

零 前期准备 0 FBI WARNING

文章异常啰嗦且绕弯。

1 版本

JDK 版本 : OpenJDK 11.0.1

IDE : idea 2018.3

2 ThreadLocal 简介

FutureTask 是 jdk 中默认的 Future 实现类,常与 Callable 结合进行多线程并发操作。

3 Demo
import java.util.concurrent.*;

public class FutureTaskDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
        
        //创建一个线程池
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try{
            //创建一个要执行的 Callable 对象
            //此处其实 Runnable 对象也可以,但是通常不会那样做
            Callable task = () -> {
                //休眠三秒
                TimeUnit.SECONDS.sleep(3);
                //返回一个字符串
                return "hello";
            };

            //用 FutureTask 对象去包装 Callable
            FutureTask futureTask = new FutureTask<>(task);

            //此处将 FutureTask 对象丢进线程池里
            pool.submit(futureTask);

            //注意,此处的 futureTask 本质上是作为 Runnable 被丢进池子里的
            //所以也可以用线程池的 execute(...) 方法
            //pool.execute(futureTask)

            //还有一种更常见的执行方式是直接使用 Thread
            //new Thread(futureTask).start();

            //获取结果
            //注意,如果没有获取到的话此处会阻塞线程直到获取到为止
            String result = futureTask.get();

            //还有一种限时策略的结果获取
            //超时的情况下会抛出异常
            //String result = futureTask.get(1,TimeUnit.SECONDS);

            System.out.println(result);
        }finally {
            //关闭连接池
            pool.shutdown();
        }

    }
}
一 FutureTask 的创建

回到 Demo 中的创建代码:

FutureTask futureTask = new FutureTask<>(task);

追踪 FutureTask 的构造器:

//FutureTask.class
public FutureTask(Callable callable) {
    //有效性判断,不能为空
    if (callable == null)
        throw new NullPointerException();
    //记录下 callable 对象
    this.callable = callable;
    //state 是一个 int 类型的对象,是一个
    //NEW = 0
    this.state = NEW;
}
二 run

FutureTask 本身是 Runnable 的实现类,其在被 ThreadPoolExecutor 或者 Thread 对象消费的时候也是被当做 Runnable 的实现类的。

所以其本身的核心逻辑就必然在 run() 方法中:

//FutureTask.class
public void run() {

    //先判断状态,如果状态不是 NEW 就会直接返回
    //RUNNER 是一个 VarHandler 类型的变量,指向了 FutureTask 中的 thread 变量,用于储存当前的线程
    //但是如果 thread 已经不为 null,此处也会直接返回
    //这两种返回条件都意味着此 FutureTask 的 run() 方法已经执行过了
    if (state != NEW || !RUNNER.compareAndSet(this, null, Thread.currentThread()))
        return;

    try {
        //获取 callable
        Callable c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //执行 callable 的业务逻辑
                result = c.call();
                //ran 为成功标识
                ran = true;
            } catch (Throwable ex) {
                //出错的情况下
                result = null;
                ran = false;
                //不成功的情况下存入 exception
                setException(ex);
            }
            //如果成功的话会在此处进行操作
            if (ran)
                set(result);
        }
    } finally {
        //置空
        runner = null;
        int s = state;
        if (s >= INTERRUPTING)
            //如果此 FutreTask 的状态是中断状态,会在此处不断调用 Thread.yield() 空转
            handlePossibleCancellationInterrupt(s);
    }
}

此处有两个关键方法,即为 setException(...) 和 set(...):

//FutureTask.class
protected void setException(Throwable t) {
    //用 CAS 操作比较并更新状态值
    if (STATE.compareAndSet(this, NEW, COMPLETING)) {
        //outcome 是一个 Object 对象,用于存储 callable 的返回值
        //此处由于报错了,所以储存的是错误对象
        outcome = t;
        //EXCEPTIONAL = 3
        STATE.setRelease(this, EXCEPTIONAL);
        //最后清理工作,主要用于唤醒等待线程和执行 callable
        finishCompletion();
    }
}

//FutureTask.class
protected void set(V v) {
    //基本逻辑和 setException(...) 方法雷同,只是 STATE 和 outcome 的储存值不同
    if (STATE.compareAndSet(this, NEW, COMPLETING)) {
        outcome = v;
        STATE.setRelease(this, NORMAL);
        finishCompletion();
    }
}

再来看 finishCompletion() 方法:

//FutureTask.class
private void finishCompletion() {
    //WaitNode 是 FutureTask 的静态内部类
    //其本质上是单向链表的节点表示类,用于存放想要获取 Callable 的返回值但是被阻塞的线程的线程对象
    for (WaitNode q; (q = waiters) != null;) {
        //此处使用 CAS 将 q 从 WAITERS 里去除
        if (WAITERS.weakCompareAndSet(this, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    //此处置空线程对象,帮助 GC
                    q.thread = null;
                    //唤醒线程
                    LockSupport.unpark(t);
                }
                //接着往下遍历
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; 
                q = next;
            }
            break;
        }
    }
    //此方法是空的
    done();
    //置空 callable
    callable = null;
}

之前提到过在 FutureTask 的 get(...) 方法中会阻塞线程,直到 Callable 执行完毕并能够获取返回值的时候才会结束阻塞。

所以 finishCompletion() 方法的主体其实就是去唤醒被阻塞的线程。

三 get

回到 Demo 中的创建代码:

String result = futureTask.get();

追踪 get() 方法:

//step 1
//FutureTask.class
public V get() throws InterruptedException, ExecutionException {
    //此处先判断状态值,如果非 COMPLETING,即为还没完成,就会调用 awaitDone(...) 方法阻塞线程
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    //返回结果
    return report(s);
}

//step 2
//FutureTask.class
private V report(int s) throws ExecutionException {
    //获取需要返回的对象
    Object x = outcome;
    //如果是正常结束的就直接返回对象即可
    if (s == NORMAL)
        return (V)x;
    //出错的情况下,抛异常
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

再来看一下阻塞线程的 awaitDone(...) 方法:

private int awaitDone(boolean timed, long nanos) throws InterruptedException {
    
    //循环的次数
    long startTime = 0L;
    //节点对象
    WaitNode q = null;
    //链表队列标识,代表该线程是否被加入链表中,初始为 false 代表未被加入
    boolean queued = false;
    for (;;) {
        int s = state;
        if (s > COMPLETING) { //如果 Callable 的执行已经完成
            if (q != null)
                q.thread = null;
            return s;
        }else if (s == COMPLETING) //Callable 的执行刚刚完成,后续工作还没做
            Thread.yield();
        else if (Thread.interrupted()) {
            //线程被中断了,会抛出错误
            removeWaiter(q);
            throw new InterruptedException();
        } else if (q == null) { //进入此处的判断证明 Callable 还未完成,所以会创建等待节点
            //此处的 timed 传入为 false,不会在此返回
            if (timed && nanos <= 0L)
                return s;
            q = new WaitNode(); //新建节点
        }else if (!queued)
            //queued 初始为 false,进入此处的时候会将上一个判断条件中新建的 q 加入到链表的首节点中
            //并且 queued 变成 true
            queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
        else if (timed) {
            //如果此操作是限时的,那么这里需要判断时间
            final long parkNanos;
            if (startTime == 0L) {
                startTime = System.nanoTime();
                if (startTime == 0L)
                    startTime = 1L;
                parkNanos = nanos;
            } else {
                long elapsed = System.nanoTime() - startTime;
                if (elapsed >= nanos) {
                    removeWaiter(q);
                    return state;
                }
                parkNanos = nanos - elapsed;
            }
            if (state < COMPLETING)
                //此处挂起线程,时间为 parkNanos
                //本例中传入为 0L,所以是永久挂起
                LockSupport.parkNanos(this, parkNanos);
        }else
            //永久挂起线程
            LockSupport.park(this);
    }
}
四 一点唠叨

FutureTask 和 ThreadLocal 一样,都是 java.util.current 包中的小工具,封装不复杂,理解即可。

本文仅为个人的学习笔记,可能存在错误或者表述不清的地方,有缘补充

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/73421.html

相关文章

  • FutureTask源码解析(1)——预备知识

    摘要:在分析它的源码之前我们需要先了解一些预备知识。因为接口没有返回值所以为了与兼容我们额外传入了一个参数使得返回的对象的方法直接执行的方法然后返回传入的参数。 前言 系列文章目录 FutureTask 是一个同步工具类,它实现了Future语义,表示了一种抽象的可生成结果的计算。在包括线程池在内的许多工具类中都会用到,弄懂它的实现将有利于我们更加深入地理解Java异步操作实现。 在分析...

    mmy123456 评论0 收藏0
  • 【Java并发】Runnable、Callable、Future、FutureTask

    摘要:声明了几种方法,其中有一个就是传入声明了对具体的或者任务执行进行取消查询结果获取等方法。事实上,是接口的一个唯一实现类。使用示例第一种方式是使用继承了的线程池中的方法,将直接提交创建。 创建线程的两种方式 直接继承 Thread 实现 Runnable 接口 这两种方式都有一个缺点:在执行完成任务之后,无法直接获取到最后的执行结果。如果需要获取执行结果,就必须通过共享变量或线程通...

    zhaot 评论0 收藏0
  • FutureTask源码解析(2)——深入理解FutureTask

    摘要:本文的源码基于。人如其名,包含了和两部分。而将一个任务的状态设置成终止态只有三种方法我们将在下文的源码解析中分析这三个方法。将栈中所有挂起的线程都唤醒后,下面就是执行方法这个方法是一个空方 前言 系列文章目录 有了上一篇对预备知识的了解之后,分析源码就容易多了,本篇我们就直接来看看FutureTask的源码。 本文的源码基于JDK1.8。 Future和Task 在深入分析源码之前,我...

    Harpsichord1207 评论0 收藏0
  • 系列文章目录

    摘要:为了避免一篇文章的篇幅过长,于是一些比较大的主题就都分成几篇来讲了,这篇文章是笔者所有文章的目录,将会持续更新,以给大家一个查看系列文章的入口。 前言 大家好,笔者是今年才开始写博客的,写作的初衷主要是想记录和分享自己的学习经历。因为写作的时候发现,为了弄懂一个知识,不得不先去了解另外一些知识,这样以来,为了说明一个问题,就要把一系列知识都了解一遍,写出来的文章就特别长。 为了避免一篇...

    lijy91 评论0 收藏0
  • 系列文章目录

    摘要:为了避免一篇文章的篇幅过长,于是一些比较大的主题就都分成几篇来讲了,这篇文章是笔者所有文章的目录,将会持续更新,以给大家一个查看系列文章的入口。 前言 大家好,笔者是今年才开始写博客的,写作的初衷主要是想记录和分享自己的学习经历。因为写作的时候发现,为了弄懂一个知识,不得不先去了解另外一些知识,这样以来,为了说明一个问题,就要把一系列知识都了解一遍,写出来的文章就特别长。 为了避免一篇...

    Yumenokanata 评论0 收藏0

发表评论

0条评论

xcc3641

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<