资讯专栏INFORMATION COLUMN

FutureTask

GeekGhc / 762人阅读

摘要:可取消的异步计算。只有在计算完成后才能检索结果如果计算还没有完成,方法将会被阻塞。任务正常执行结束。任务执行过程中发生异常。任务即将被中断。运行完成后将会清空。根据执行结果设置状态。

FutureTask What is it

​ 可取消的异步计算。该类提供了 Future的基本实现,其中包括启动和取消计算的方法,查询计算是否完成以及检索计算结果的方法。只有在计算完成后才能检索结果;如果计算还没有完成,{getcode}方法将会被阻塞。一旦计算完成,计算不能被重新启动或取消(除非计算是使用调用的runAndReset()。

​ 该类实现自RunableFuture接口,其中RunableFuture接口又继承自Runable和Future。所以可以理解为:FutureTask是一个可以计算Future结果的一个Future实现,

How to use

由于FutureTask间接或直接实现了Runable和Future接口,所以其具有如下特征:

可以像一个普通的任务一样,使用线程池提交一个任务并执行。

ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(new FutureTask(new Callable() {
    @Override
    public Integer call() throws Exception {
          return 100;
    }
}));

可以像一个普通的任务一样,使用Thread来执行,但可以异步获取结果。

 FutureTask futureTask = new FutureTask(new Callable() {
   @Override
   public Integer call() throws Exception {
      return 100;
   }
});
new Thread(futureTask).start();
futureTask.get();

When to use

考虑一种使用Cache的场景:一般情况下,对于热点数据我们都会使用cache保存数据,只有当cache失效了,才会进行耗时的网络调用或者数据库查询。但是当cache失效时,同时有多个该key的查询,那么在短时间内可能会有多个相同的耗时查询,瞬间对系统性能会有一定的损失,为了解决这种情况可以采取缓存FutureTask的方式解决:

思路借鉴:https://github.com/javacreed/...

//获取缓存的客户端
public class CacheClient {
    public static  T getCache(int id){
        return null;
    }
}
//Service层逻辑
public class CacheService {
    private static ConcurrentMap> cacheFuture = new ConcurrentHashMap<>();
    public User getUserInfo(int id) {
        Future future = createFutureIfAbsent(id);
        try {
            return future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return null;
    }
    private Future createFutureIfAbsent(final int id) {
        Future future = cacheFuture.get(id);
        if (future == null) {
            FutureTask futureTask = new FutureTask(new Callable() {
                @Override
                public User call() throws Exception {
                    return CacheClient.getCache(id);
                }
            });
            future = cacheFuture.putIfAbsent(id, futureTask);
            if (future == null) {
                future = futureTask;
                futureTask.run();
            }
        }
        return future;
    }
    public class User {
        private int id;
        private String name;
        private String age;
        。。。
    }
}
How to design
状态机

​ FutureTask作为一个可运行的Future,其运行过程中存在状态的迁移过程,FutureTask的运行状态有:

NEW:初始状态。

COMPLETING:结果正在被set过程中。

NORMAL:任务正常执行结束。

EXCEPTIONAL:任务执行过程中发生异常。

CANCELLED:任务执行过程中被取消。

INTERRUPTING:任务即将被中断。

INTERRUPTED:任务已经被中断。

状态跃迁:

正常结束:NEW->COMPLETING->NORMAL

出现异常:NEW->COMPLETING->EXCEPTIONAL

任务被取消且不响应中断:NEW->CANCELLED

任务被取消且响应中断:NEW->INTERRUPTING->INTERRUPTED

成员变量

state:指示当前任务执行的状态。

callback:需要被运行的任务。运行完成后将会清空。

outcome:保存任务执行之后的结果。

runner:持有任务执行过程中运行线程。

waiters:等待线程的堆栈[稍后将做详细分析]。

构造方法
public FutureTask(Callable callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
}

FutureTask有两个构造方法,虽然两个构造方法的入参略有不同,但是在底层执行时都是按照Callback任务来构建的。并在此过程初始化当前的任务状态为:NEW

核心方法

下面将从核心方法开始,逐渐分析FutureTask的原理:

run():任务执行

public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

该方法的逻辑很简单,主要完成了如下任务:

1.首先判断任务的有效性:1)该任务的状态是否为初始状态:NEW,2)把运行任务的线程设置给成员变量runner。

2.执行任务。

3.根据执行结果设置状态。

get()/get(long timeout, TimeUnit unit)

public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
 }```

该方法的逻辑更简单:首先判断当前的状态,然后就会调用awaitDone()方法等待结果,当等待超时就会抛出TimeOutException,否则调用report()将结果报告出去。下面看看等待结果是如何处理的:

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
          //首先计算出该任务的最终结束时间
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            //判断当前线程是否被中断
            if (Thread.interrupted()) {
                  //从等待队列中删除该线程的等待节点
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            //如果状态>COMPLETING,说明任务已经结束了,不管是否正常结束,都是可以返回的
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
              //如果当前状态还是COMPLETING,说明结果来没有返回呢,那就让出CPU
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            //如果当前任务还没有生成等待节点,那么就创建一个以当前线程的等待节点。
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
            //采用头插法构建等待队列
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                  //任务执行超时了,那么就删除等待队列
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                  //还没有超时,那么就将当前线程park
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
 

该方法虽然篇幅很大,但是完成的任务也是很简单的,主要可以总结如下:

首先判断在超时时间内,任务是否执行完成(失败)。

通过状态为判断任务是否执行完成或失败。

​ NOTE:为什么要使用这个waiter?[多带带文章分析:]

Conclusion

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68154.html

相关文章

  • 追踪解析 FutureTask 源码

    摘要:零前期准备文章异常啰嗦且绕弯。版本版本简介是中默认的实现类,常与结合进行多线程并发操作。所以方法的主体其实就是去唤醒被阻塞的线程。本文仅为个人的学习笔记,可能存在错误或者表述不清的地方,有缘补充 零 前期准备 0 FBI WARNING 文章异常啰嗦且绕弯。 1 版本 JDK 版本 : OpenJDK 11.0.1 IDE : idea 2018.3 2 ThreadLocal 简介 ...

    xcc3641 评论0 收藏0
  • 【Java并发】Runnable、Callable、Future、FutureTask

    摘要:声明了几种方法,其中有一个就是传入声明了对具体的或者任务执行进行取消查询结果获取等方法。事实上,是接口的一个唯一实现类。使用示例第一种方式是使用继承了的线程池中的方法,将直接提交创建。 创建线程的两种方式 直接继承 Thread 实现 Runnable 接口 这两种方式都有一个缺点:在执行完成任务之后,无法直接获取到最后的执行结果。如果需要获取执行结果,就必须通过共享变量或线程通...

    zhaot 评论0 收藏0
  • Java多线程-Callable和Future

    摘要:类提供了一些有用的方法在线程池中执行内的任务。在线程池提交任务后返回了一个对象,使用它可以知道任务的状态和得到返回的执行结果。 Callable和Future出现的原因 创建线程的2种方式,一种是直接继承Thread,另外一种就是实现Runnable接口。 这2种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。 如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达...

    seasonley 评论0 收藏0
  • CountDownLatch + Callbale+FutureTask 实现异步变同步调用

    摘要:背景通过接口实现调用发送数据,接口返回值为发送数据的对应结果。接口为同步阻塞,为异步回调方式。接收数据回调接收到数据后,通过闭锁释放阻塞的线程,同时设置结果返回给调用者 背景 通过HTTP接口实现调用MQTT Client发送数据,HTTP接口返回值为MQTT Client发送数据的对应结果。 HTTP接口为同步阻塞,MQTT Client 为异步回调方式。如何实现在HTTP接口中调用...

    张金宝 评论0 收藏0
  • java并发编程学习之FutureTask

    摘要:在并发编程学习之三种线程启动方式中有提过。是否执行结束,包括正常执行结束或异常结束。获取返回值,没有得到返回值前一直阻塞。运行结果如下由于任务被取消,所以抛出异常。注意的是,此时线程还在跑,和返回的是。并不能让任务真正的结束。 FutureTask 在java并发编程学习之三种线程启动方式中有提过。主要的方法如下: cancel(boolean mayInterruptIfRunni...

    BothEyes1993 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<