摘要:主要的实现实际上运行还是一个,它对做了一个封装,让开发人员可以从其中获取返回值是有状态的共种状态,四种状态变换的可能和的区别通过方法调用有返回值可以抛异常结果的实现原理判断状态非状态则直接进入返回结果处于状态,则进入等待流程获
主要的实现FutureTask
# FutureTask实际上运行还是一个runnable,它对callable做了一个封装,让开发人员可以从其中获取返回值; FutrueTask是有状态的 共7种状态,四种状态变换的可能 NEW -> COMPLETING -> EXCEPTIONAL NEW -> CANCELLED NEW -> COMPLETING -> NORMAL NEW -> INTERRUPTING -> INTERRUPTEDCallable和runnable的区别
0. 通过call方法调用; 1. 有返回值 2. 可以抛异常get结果的实现原理
1. 判断状态; 2. 非NEW,COMPLETING状态则直接 进入report返回结果; 3. 处于NEW,COMPLETING状态,则进入等待awaitDone();3.x awaitDone 流程
3.1. 获取等待的超时时间deadline; 3.2. 进入自旋 3.3. 判断线程是否被中断:如果被中断则移出等待waiters队列;并抛出异常; 3.4. 判断FutrueTask状态:如果">COMPLETING",代表执行完成,进入report; 3.5. 判断FutrueTask状态:如果"=COMPLETING",让出CPU执行Thread.yield(); 3.6. 为当前线程创建一个node节点; 3.7. 将当前线程WaitNode加入等待队列waiters中; 3.8. 判断是否超时; 3.9. 通过LockSupport.park挂起线程,等待运行许可; 4. report返回执行结果:如果一切正常就返回执行结果,否则返回Exception;run具体执行原理如下:
1. 判断状态是否正常,避免重复执行; 2. 调用callable的call()方法; 3. 修改执行状态;保存执行结果;并通知正在等待get的线程; ## 3.x通知机制finishCompletion 3.1. 获取所有waiters的集合; 3.2. 通过cas 拿到执行权; 3.3. 循环遍历所有等待的线程,通过LockSupport.unpark 唤醒其执行;Callable和Future的实现原理(JDK8源码分析) 1. cancel 取消执行
public boolean cancel(boolean mayInterruptIfRunning) { // 判断状态:只有刚创建的情况下才能取消 // mayInterruptIfRunning:是否中断当前正在运行这个FutureTask的线程; if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception // 如果要中断当前线程,则对runner发布interrupt信号; if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state // 修改状态为:已经通知线程进行中断 UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // 通知其他在等待结果的线程 finishCompletion(); } return true; }2. run
public void run() { // 判断状态及设置futuretask归属的线程 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable3. getc = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 执行Callable result = c.call(); // 标记为执行成功 ran = true; } catch (Throwable ex) { result = null; // 标记为执行不成功 ran = false; // 设置为异常状态,并通知其他在等待结果的线程 setException(ex); } // 如果执行成功,修改状态为正常,并通知其他在等待结果的线程 if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; // 如果状态为准备发起中断信号或者已经发出中断信号,则让出CPU(Thread.yield()) if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
public V get() throws InterruptedException, ExecutionException { int s = state; // 如果还没执行完,则等待 if (s <= COMPLETING) s = awaitDone(false, 0L); // 通过report取结果 return report(s); }3.1 report 取执行结果
private V report(int s) throws ExecutionException { Object x = outcome; // 如果一切正常,则返回x(x是callable执行的结果outcome) if (s == NORMAL) return (V)x; // 如果被取消,则抛出已取消异常 if (s >= CANCELLED) throw new CancellationException(); // 否则抛出执行异常 throw new ExecutionException((Throwable)x); }3.2 awaitDone 等待FutureTask执行结束
private int awaitDone(boolean timed, long nanos) throws InterruptedException { // 记录等待超时的时间 final long deadline = timed ? System.nanoTime() + nanos : 0L; // 多个在等待结果的线程,通过一个链表进行保存,waitNode就是每个线程在链表中的节点; WaitNode q = null; boolean queued = false; // 死循环...也可以说是自旋锁同步 for (;;) { // 判断当前这个调用get的线程是否被中断 if (Thread.interrupted()) { // 将当前线程移出队列 removeWaiter(q); throw new InterruptedException(); } int s = state; // 如果状态非初创或执行完毕了,则跳出循环,通过report()取执行结果 if (s > COMPLETING) { if (q != null) q.thread = null; return s; } // 如果状态等于已执行,让出CPU执行,等待状态变为正常结束 else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 如果当前线程还没有创建对象的waitNode节点,则创建一个 else if (q == null) q = new WaitNode(); // 如果当前线程对应的waitNode还没有加入到等待链表中,则加入进去; else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 如果有设置等待超时时间,则通过parkNanos挂起当前线程,等待继续执行的信号 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } // 通过park挂起当前线程,等待task执行结束后给它发一个继续执行的信号(unpark) else LockSupport.park(this); } }4. finishCompletion 通知所有在等待结果的线程
private void finishCompletion() { // assert state > COMPLETING; // 遍历所有正在等待执行结果的线程 for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; // unpark,发布一个让它继续执行的“许可” LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/69216.html
摘要:表示一个异步任务的结果,就是向线程池提交一个任务后,它会返回对应的对象。它们分别提供两个重要的功能阻塞当前线程等待一段时间直到完成或者异常终止取消任务。此时,线程从中返回,然后检查当前的状态已经被改变,随后退出循环。 0 引言 前段时间需要把一个C++的项目port到Java中,因此时隔三年后重新熟悉了下Java。由于需要一个通用的线程池,自然而然就想到了Executors。 用了...
摘要:零前期准备文章异常啰嗦且绕弯。版本版本简介是中默认的实现类,常与结合进行多线程并发操作。所以方法的主体其实就是去唤醒被阻塞的线程。本文仅为个人的学习笔记,可能存在错误或者表述不清的地方,有缘补充 零 前期准备 0 FBI WARNING 文章异常啰嗦且绕弯。 1 版本 JDK 版本 : OpenJDK 11.0.1 IDE : idea 2018.3 2 ThreadLocal 简介 ...
摘要:本文的源码基于。人如其名,包含了和两部分。而将一个任务的状态设置成终止态只有三种方法我们将在下文的源码解析中分析这三个方法。将栈中所有挂起的线程都唤醒后,下面就是执行方法这个方法是一个空方 前言 系列文章目录 有了上一篇对预备知识的了解之后,分析源码就容易多了,本篇我们就直接来看看FutureTask的源码。 本文的源码基于JDK1.8。 Future和Task 在深入分析源码之前,我...
摘要:从而可以启动和取消异步计算任务查询异步计算任务是否完成和获取异步计算任务的返回结果。原理分析在分析中我们没有看它的父类,其中有一个方法,返回一个,说明该方法可以获取异步任务的返回结果。 FutureTask介绍 FutureTask是一种可取消的异步计算任务。它实现了Future接口,代表了异步任务的返回结果。从而FutureTask可以启动和取消异步计算任务、查询异步计算任务是否完成...
阅读 2922·2021-11-17 09:33
阅读 1640·2021-10-12 10:13
阅读 2466·2021-09-22 15:48
阅读 2341·2019-08-29 17:19
阅读 2596·2019-08-26 11:50
阅读 1573·2019-08-26 10:37
阅读 1738·2019-08-23 16:54
阅读 2926·2019-08-23 14:14