摘要:前言在前面的三篇文章中先后介绍了框架的任务组件体系体系源码并简单介绍了目前的并行流应用场景框架本质上是对的扩展它依旧支持经典的使用方式即任务池的配合向池中提交任务并异步地等待结果毫无疑问前面的文章已经解释了框架的新颖性初步了解了工作窃取
前言
在前面的三篇文章中先后介绍了ForkJoin框架的任务组件(ForkJoinTask体系,CountedCompleter体系)源码,并简单介绍了目前的并行流应用场景.ForkJoin框架本质上是对Executor-Runnable/Callable-Future/FutureTask的扩展,它依旧支持经典的Executor使用方式,即任务+池的配合,向池中提交任务,并异步地等待结果.
毫无疑问,前面的文章已经解释了ForkJoin框架的新颖性,初步了解了工作窃取依托的数据结构,ForkJoinTask/CountedCompleter在执行期的行为,也提到它们一定要在ForkJoinPool中进行运行和调度,这也是本文力求解决的问题.
ForkJoinPool源码ForkJoinPool源码是ForkJoin框架中最复杂,最难理解的部分,且因为交叉依赖ForkJoinTask,CountedCompleter,ForkJoinWorkerThread,作者在前面多带带用两篇文章分析了它们,以前两篇文章为基础,重复部分本文不再详述.
首先看类签名.
//禁止伪共享 @sun.misc.Contended //继承自AbstractExecutorService public class ForkJoinPool extends AbstractExecutorService
前面的几篇文章不止一次强调过ForkJoin框架的"轻量线程,轻量任务"等概念,也提到少量线程-多数计算,资源空闲时窃取任务.并介绍了基于status状态的调度(ForkJoinTask系列),不基于status而由子任务触发完成的调度(CountedCompleter系列),显然它们的共性就是让线程在正常调度的前提下尽量少的空闲,最大幅度利用cpu资源,伪共享/缓存行的问题在ForkJoin框架中显然会是一个更大的性能大杀器.在1.8之前,一般通过补位的方式解决伪共享问题,1.8之后,官方使用@Contended注解,令虚拟机尽量注解标注的字段(字段的情况)或成员字段放置在不同的缓存行,从而规避了伪共享问题.
建立ForkJoinPool可以直接new,也可以使用Executors的入口方法.
//Executors方法,显然ForkJoinPool被称作工作窃取线程池.参数指定了并行度. public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, //默认线程工厂,前文中已提过默认的ForkJoinWorkerThread ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } //不提供并行度. public static ExecutorService newWorkStealingPool() { return new ForkJoinPool //使用所有可用的处理器 (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } //对应的,ForkJoinPool的构造器们. //不指定任何参数. public ForkJoinPool() { //并行度取MAX_CAP和可用处理器数的最小值. this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()), //默认的线程工厂.无异常处理器,非异步模式. defaultForkJoinWorkerThreadFactory, null, false); } //同上,只是使用参数中的并行度. public ForkJoinPool(int parallelism) { this(parallelism, defaultForkJoinWorkerThreadFactory, null, false); } public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, boolean asyncMode) { //并行度需要校验 this(checkParallelism(parallelism), //校验线程工厂 checkFactory(factory), //参数指定的未捕获异常处理器. handler, //前面的几处代码asyncMode都是false,会选用LIFO队列,是true是会选用FIFO队列,后面详述. asyncMode ? FIFO_QUEUE : LIFO_QUEUE, //线程名前缀 "ForkJoinPool-" + nextPoolId() + "-worker-"); //检查许可,不关心. checkPermission(); } //检查方法很简单. //并行度不能大于MAX_CAP不能不大于0. private static int checkParallelism(int parallelism) { if (parallelism <= 0 || parallelism > MAX_CAP) throw new IllegalArgumentException(); return parallelism; } //线程工厂非空即可. private static ForkJoinWorkerThreadFactory checkFactory (ForkJoinWorkerThreadFactory factory) { if (factory == null) throw new NullPointerException(); return factory; } //最终构造器,私有.待介绍完一些基础字段后再述. private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode, String workerNamePrefix) { this.workerNamePrefix = workerNamePrefix; this.factory = factory; this.ueh = handler; //config初始化值,用并行度与mode取或,显然mode是FIFO时,将有一个第17位的1. this.config = (parallelism & SMASK) | mode; //np保存并行度(正数)的相反数(补码). long np = (long)(-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); }
了解其他线程池源码的朋友可以去回忆其他线程池的构建,不论是调度线程池还是普通的线程池或者缓存池,他们其实都设置了核心线程数和最大线程数.当然这要看定义"线程池分类"的视角,以Executors入口的api分类,或许可以分类成固定线程池,缓冲池,单线程池,调度池,工作窃取池;但以真正的实现分类,其实只有ThreadPoolExecutor系列(固定线程池,单线程池都直接是ThreadPoolExecutor,调度池是它的子类,缓冲池也是ThreadPoolExecutor,只是阻塞队列限定为SynchronizedQueue)和ForkJoinPool系列(工作窃取池).
作者更倾向于用实现的方式区分,也间接参照Executors的api使用用途的区分方式.如果不使用Executors的入口api,不论哪种ThreadPoolExecutor系列,我们都可以提供线程池的大小配置,阻塞队列,线程空闲存活时间及单位,池满拒绝策略,线程工厂等,而所谓的缓存池和固定池的区别只是队列的区别.
调度池的构造参数与ThreadPoolExecutor无异,只是内限了阻塞队列的类型,它虽然是ThreadPoolExecutor的扩展,却不仅没有拓充参数,反而减少了两个参数:阻塞队列和最大线程数.阻塞队列被默认设置为内部类DelayQueue,它实现了BlockingQueue,最大线程数则为整数上限,同时新增的对任务的延时或重试等属性则是依托于内部维护的一个FutureTask的扩展,并未增加到构造参数.
而到了ForkJoinPool,我们看到的是截然不同于ThreadPoolExecutor系列的构建方式.首先根本没有提供核心线程和最大线程数,线程空闲存活时间的参数和阻塞队列以及池满拒绝策略;线程工厂也仅能提供生产ForkJoinWorkerThread的工厂bean;还具备一些ThreadPoolExecutor没有的参数,如未捕获异常处理器,同步异步模式,工作线程前缀(其实别的类型的线程工厂也可以提供线程前缀,默认就是常见的pool-前缀)等.
显然从参数看便可猜测出若干不同于其他线程池的功能.但我们更关心其中的一些参数设置.
一般的参数都能见名知义,仅有config和ctl难以理解,此处也不详细介绍,只说他们的初值的初始化.
config是并行度与SMASK取与运算再与mode取或,这里并行度最大是15位整数(MAX_CAP=0x7FFF),而SMASK作用于整数后16位,mode在FIFO为1<<16,LIFO是0.很好计算.
ctl其实是一个控制信号,我们后面会在具体源码就地解释,它的计算先通过了一个局部变量np.
np的计算方法是将并行度的相反数(补码)转换为长整型.前面简单分析,并行度不会大于MAX_CAP,因此np至少前49位全部是1.
计算ctl时,将np左移AC_SHIFT即为取后16位,将np左移TC_SHIFT即取它的后32位,分别与AC_MASK和TC_SHIFT,表示取np的后16位分别放置于ctl的前16位和33至48位.而ctl的后32位初值为0.
因为生成的ctl前16位和后16位相等,如果仔细用数学验证,可以发现,对前16位和后16位的末位同时加1,当添加了parallel次后,ctl将归0.这也是添加worker限制的重要数理依据.
前面列举了获取ForkJoinPool实例的几种方法,初步展示了构造一个ForkJoinPool的属性,也暴露了一些实现细节,而这些细节依赖于一些字段和成员函数,我们先从它们开始.
//ForkJoinWorkerThread的线程工厂. public static interface ForkJoinWorkerThreadFactory { //创建新线程要实现的方法. public ForkJoinWorkerThread newThread(ForkJoinPool pool); } //前面看到的默认线程工厂. static final class DefaultForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { return new ForkJoinWorkerThread(pool); } } //创建InnocuousForkJoinWorkerThread的线程工厂,上一文已经介绍过. static final class InnocuousForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory { private static final AccessControlContext innocuousAcc; static { Permissions innocuousPerms = new Permissions(); innocuousPerms.add(modifyThreadPermission); innocuousPerms.add(new RuntimePermission( "enableContextClassLoaderOverride")); innocuousPerms.add(new RuntimePermission( "modifyThreadGroup")); innocuousAcc = new AccessControlContext(new ProtectionDomain[] { new ProtectionDomain(null, innocuousPerms) }); } public final ForkJoinWorkerThread newThread(ForkJoinPool pool) { return (ForkJoinWorkerThread.InnocuousForkJoinWorkerThread) java.security.AccessController.doPrivileged( new java.security.PrivilegedAction() { public ForkJoinWorkerThread run() { return new ForkJoinWorkerThread. InnocuousForkJoinWorkerThread(pool); }}, innocuousAcc); } } //空任务 static final class EmptyTask extends ForkJoinTask { private static final long serialVersionUID = -7721805057305804111L; EmptyTask() { status = ForkJoinTask.NORMAL; } //状态直接是已正常完成. public final Void getRawResult() { return null; } public final void setRawResult(Void x) {} public final boolean exec() { return true; } }
以上是线程工厂和一个默认的EmptyTask.接下来看一些跨池和工作队列的公用常量.
// 与边界有关的常量 static final int SMASK = 0xffff; // 后16位. static final int MAX_CAP = 0x7fff; // 前面在定并行度时参考的最大容量. static final int EVENMASK = 0xfffe; // 后16位验偶数 static final int SQMASK = 0x007e; // 最大64个偶数槽,从第2位至7位共6位,2的6次方. // 与WorkQueue有关 static final int SCANNING = 1; // 对WorkQueue正在运行任务的标记 static final int INACTIVE = 1 << 31; // 标记负数 static final int SS_SEQ = 1 << 16; // 版本号使用,第17位1 // ForkJoinPool和WorkQueue的config有关常量. static final int MODE_MASK = 0xffff << 16; // 能滤取前16位. static final int LIFO_QUEUE = 0;//前面提到过的,非async模式(false),值取0. static final int FIFO_QUEUE = 1 << 16;//async模式(true),值取1. static final int SHARED_QUEUE = 1 << 31; // 共享队列标识,符号位表示负.
以上的字段含义只是粗略的描述,先有一个印象,后面看到时自然理解其含义.
接下来看核心的WorkQueue内部类.
//前面的文章说过,它是一个支持工作窃取和外部提交任务的队列.显然,它的实例对内存部局十分敏感, //WorkQueue本身的实例,或者内部数组元素均应避免共享同一缓存行. @sun.misc.Contended static final class WorkQueue { //队列内部数组的初始容量,默认是2的12次方,它必须是2的几次方,且不能小于4. //但它应该设置一个较大的值来减少队列间的缓存行共享. //在前面的java运行时和54篇java官方文档术语中曾提到,jvm通常会将 //数组放在能够共享gc标记(如卡片标记)的位置,这样每一次写都会造成严重内存竞态. static final int INITIAL_QUEUE_CAPACITY = 1 << 13; //最大内部数组容量,默认64M,也必须是2的平方,但不大于1<<(31-数组元素项宽度), //根据官方注释,这可以确保无需计算索引概括,但定义一个略小于此的值有助于用户在 //系统饱合前捕获失控的程序. static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M // unsafe机制有关的字段. private static final sun.misc.Unsafe U; private static final int ABASE; private static final int ASHIFT; private static final long QTOP; private static final long QLOCK; private static final long QCURRENTSTEAL; static { try { U = sun.misc.Unsafe.getUnsafe(); Class> wk = WorkQueue.class; Class> ak = ForkJoinTask[].class; //top字段的句柄. QTOP = U.objectFieldOffset (wk.getDeclaredField("top")); //qlock字段的句柄. QLOCK = U.objectFieldOffset (wk.getDeclaredField("qlock")); //currentSteal的句柄 QCURRENTSTEAL = U.objectFieldOffset (wk.getDeclaredField("currentSteal")); //ABASE是ForkJoinTask数组的首地址. ABASE = U.arrayBaseOffset(ak); //scale代表数组元素的索引大小.它必须是2的平方. int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two"); //计算ASHIFT,它是31与scale的高位0位数量的差值.因为上一步约定了scale一定是一个正的2的几次方, //ASHIFT的结果一定会大于1.可以理解ASHIFT是数组索引大小的有效位数. ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); } } //插曲,在Integer类的numberOfLeadingZeros方法,果然一流的程序是数学. public static int numberOfLeadingZeros(int i) { // HD, Figure 5-6 if (i == 0) //i本身已是0,毫无疑问地返回32.本例中i是2起,所以不会. return 32; //先将n初始化1.最后会减掉首位1. int n = 1; //i的前16位不存在非零值,则将n加上16并移除i的前16位.将i转换为一个以原i后16位开头的新值. if (i >>> 16 == 0) { n += 16; i <<= 16; } //不论前一步结果如何,若此时i的前8位不存在非零值,则n加上8,i移除前8位.将i转换为原i的后24位开头的新值. if (i >>> 24 == 0) { n += 8; i <<= 8; } //不论前一步结果如何,若此时i的前4位不存在非零值,则n加上4,i移除前4位.将i转换为原i的后28位开头的新值. if (i >>> 28 == 0) { n += 4; i <<= 4; } //不论前一步结果如何,若此时i的前2位不存在非零值,则n加上2,i移除前2位.将i转换为原i的后30位开头的新值. if (i >>> 30 == 0) { n += 2; i <<= 2; } //经过前面的运算,i的前30位的非零值数量已经记入n, //在前一步的基础上,此时i的前1位若存在非零值,则n-1,否则n保留原值. n -= i >>> 31; return n; } //回到WorkQueue // 实例字段 volatile int scanState; // 版本号,小于0代表不活跃,注释解释奇数代表正在扫描,但从代码语义上看正好相反. int stackPred; // 前一个池栈控制信号(ctl),它保有前一个栈顶记录. int nsteals; // 偷盗的任务数 int hint; // 一个随机数,用于决定偷取任务的索引. int config; // 配置,表示池的索引和模式 volatile int qlock; // 队列锁,1表示锁了,小于0表示终止,其他情况是0. volatile int base; // 底,表示下一个poll操作的插槽索引 int top; // 顶,表示下一个push操作的插槽索引 ForkJoinTask>[] array; // 存放任务元素的数组,初始不分配,首扩容会分配. final ForkJoinPool pool; // 包含该队列的池,可能在某些时刻是null. final ForkJoinWorkerThread owner; // 持有该队列的线程,如果队列是共享的,owner是null. volatile Thread parker; // 在调用park阻塞的owner,非阻塞时为null volatile ForkJoinTask> currentJoin; // 被在awaitJoin中join的task. volatile ForkJoinTask> currentSteal; // 字面意思当前偷的任务,主要用来helpStealer方法使用. //工作队列构造器,只初始化线程池,owner等字段. WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) { this.pool = pool; this.owner = owner; // Place indices in the center of array (that is not yet allocated) //base和top初始均为INITIAL_QUEUE_CAPACITY的一半,也就是2的11次方. base = top = INITIAL_QUEUE_CAPACITY >>> 1; } //返回本队列在池中的索引,使用config的2至4位表示.因为config的最后一位是奇偶位,忽略. final int getPoolIndex() { return (config & 0xffff) >>> 1; } //返回队列中的任务数. final int queueSize() { //非owner的调用者必须先读base,用base-top,得到的结果小于0则取相反数,否则取0. //忽略即时的负数,它并不严格准确. int n = base - top; return (n >= 0) ? 0 : -n; } //判断队列是否为空队.本方法较为精确,对于近空队列,要检查是否有至少一个未被占有的任务. final boolean isEmpty() { ForkJoinTask>[] a; int n, m, s; //base大于等于top,说明空了. return ((n = base - (s = top)) >= 0 || //有容量,且恰好计算为1,可能只有一个任务. (n == -1 && //计算为1,再验数组是不是空的. ((a = array) == null || (m = a.length - 1) < 0 || //取该位置元素的值判空,空则说明isEmpty. //取值的方式是取ForkJoinTask.class首地址加上偏移量(数组长度减一(最后一个元素位置,经典案例32-1)与运算top减一左移ASHIFT(索引大小有效位数)位)的值. U.getObject (a, (long)((m & (s - 1)) << ASHIFT) + ABASE) == null))); } //将一个任务压入队列,前文提过的fork最终就会压队.但此方法只能由非共享队列的持有者调用. //当使用线程池的"外部压入"externalPush方法时,压入共享队列. final void push(ForkJoinTask> task) { ForkJoinTask>[] a; ForkJoinPool p; //保存当时的base top. int b = base, s = top, n; //如果数组被移除则忽略. if ((a = array) != null) { //数组最后一个下标.如长度32,则m取31这个质数.此时保存一个m,对于保存后其他push操作相当于打了屏障. int m = a.length - 1; //向数组中的指定位置压入该任务.位置包含上面的m和s进行与运算(数组中的位置),结果左移索引有效长度位(索引长度),再加上数组首索引偏移量(起始地址). U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); //将top加1. U.putOrderedInt(this, QTOP, s + 1); if ((n = s - b) <= 1) { //计算旧的任务数量,发现不大于1个,说明原来很可能工作线程正在阻塞等待新的任务.需要唤醒它. if ((p = pool) != null) //signalWork会根据情况,添加新的工作线程或唤醒等待任务的线程. p.signalWork(p.workQueues, this); } else if (n >= m)//2. //任务数量超出了,对数组扩容. growArray(); } } //添加任务过程主流程无锁,包括可能出现的growArray.当原队列为空时,它会初始化一个数组,否则扩容一倍. //持有者调用时,不需要加锁,但当其他线程调用时,需要持有锁.在resize过程中,base可以移动,但top不然. final ForkJoinTask>[] growArray() { //记录老数组. ForkJoinTask>[] oldA = array; //根据老数组决定新容量,老数组空则INITIAL_QUEUE_CAPACITY否则国倍. int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY; if (size > MAXIMUM_QUEUE_CAPACITY) //新大小大于最大数组大小则拒绝. throw new RejectedExecutionException("Queue capacity exceeded"); int oldMask, t, b; //直接将原来的数组引用替换成新的. ForkJoinTask>[] a = array = new ForkJoinTask>[size]; //如果是初次分配,就此打住返回a,是扩容,且老数组非空则进入下面的循环拷贝. if (oldA != null && (oldMask = oldA.length - 1) >= 0 && (t = top) - (b = base) > 0) { //根据前面的运算,size一定是2的幂,减一用来哈希,这是经典处理办法. int mask = size - 1; do { ForkJoinTask> x; //老数组base自增过若干次的得到b,它代表的元素对应的索引. int oldj = ((b & oldMask) << ASHIFT) + ABASE; //用b在新数组中找出索引. int j = ((b & mask) << ASHIFT) + ABASE; //老数组中用索引取出元素. x = (ForkJoinTask>)U.getObjectVolatile(oldA, oldj); if (x != null && //老数组置空,放入新数组. U.compareAndSwapObject(oldA, oldj, x, null)) U.putObjectVolatile(a, j, x); //每处理完一个task,就将base自增1,直到top为止. } while (++b != t); } //返回新数组. return a; } //存在下一个任务,弹出,顺序是后进先出.此方法仅限非共享队列的owner调用. final ForkJoinTask> pop() { ForkJoinTask>[] a; ForkJoinTask> t; int m; //还有元素. if ((a = array) != null && (m = a.length - 1) >= 0) { //1.top至少比base大一.注意,每次循环都会读出新的top,它是volatile修饰的. for (int s; (s = top - 1) - base >= 0;) { //top对应的索引. long j = ((m & s) << ASHIFT) + ABASE; //2.该索引没有元素,break,返回null.而且就代表这个位置的确是null,与竞态无关. //因为此方法仅owner线程使用,不会出现另一个线程计算了同样的j,且先执行了3的情况. //出现这种情况,则是此位置的任务当先被执行并出栈,或者就从未设置过任务,后续分析这种极端情况. //故如果出现某个任务在数组的中间,提前被执行并置空(非pop或poll方式),那么再对WorkQueue进行pop时将会中断, //留下一部分null之后的任务不能出栈,所以可以允许任务非pop或poll方式查出并执行,但为了能pop出所有任务,不能中间置null. if ((t = (ForkJoinTask>)U.getObject(a, j)) == null) break; //3.有元素,将该索引位置置null.若cas失败,说明元素被取出了, //但下次循环即使在2处break并返回null,也不是因为竞态,因为每次循环到1都会读取新的top, //也就有新的j. if (U.compareAndSwapObject(a, j, t, null)) { //数组位置置null的同时top减1. U.putOrderedInt(this, QTOP, s); return t; } } } //循环退出,说明top位置没有元素,也相当于说明数组为空.显然此方法的另一个作用是将队列压缩,空队列会将top先降到base+1,再循环最后一次将top降到base. return null; } //如果b是base,使用FIFO的次序尝试无竞态取底部的任务.它会在ForkJoinPool的scan和helpStealer中使用. final ForkJoinTask> pollAt(int b) { ForkJoinTask> t; ForkJoinTask>[] a; if ((a = array) != null) { //和前面一样的的方式计算b对应的索引j int j = (((a.length - 1) & b) << ASHIFT) + ABASE; if ((t = (ForkJoinTask>)U.getObjectVolatile(a, j)) != null && //j对应位置有task且当前base==b,尝试将task出队. base == b && U.compareAndSwapObject(a, j, t, null)) { //出队成功base增1.不需要额外的同步,因为两个线程不可能同时在上面的cas成功. //当一切条件匹配(b就是base且j位置有元素),pollAt同一个b只会有一个返回非空的t. //如果多个线程传入的b不相等,在同一时刻只有一个会等于base. base = b + 1; return t; } } return null; } //用FIFO的次序取下一个任务. final ForkJoinTask> poll() { ForkJoinTask>[] a; int b; ForkJoinTask> t; //1.循环从base取任务,当base增长到top或其他操作重置array为null则终止循环. while ((b = base) - top < 0 && (a = array) != null) { //前面已叙述过取索引的逻辑,使用一个top到base间的数与数组长度-1与运算并左移索引长度位再加上数组基准偏移量.后面不再缀述. int j = (((a.length - 1) & b) << ASHIFT) + ABASE; //取出task t = (ForkJoinTask>)U.getObjectVolatile(a, j); //2.如果发生竞态,base已经不是b,直接开启下一轮循环把新的base读给b. if (base == b) { if (t != null) { //3.当前t是base任务,用cas置空,base+1,返回t. //如果此处发生竞态,则只有一个线程可以成功返回t并重置base(4). //不成功的线程会开启下一轮循环,此时成功线程可能未来的及执行4更新base, //也可能已经更新base,则导致先前失败的线程在2处通过,经5种或判队列空返回,或非空再次循环,而 //在当前成功线程执行4成功后,所有前面失败的线程可以在1处读到新的base,这些线程 //在下一次循环中依旧只会有一个成功弹出t并重置base,直到所有线程执行完毕. if (U.compareAndSwapObject(a, j, t, null)) { //4重置加返回 base = b + 1; return t; } } //5.t取出的是空,发现此时临时变量b(其他成功线程在此轮循环前置的base)已增至top-1,且当前线程又没能成功的弹出t,说明一定会有一个线程 //将t弹出并更新base到top的值,当前线程没必要再开下一个循环了,直接break并返回null. //t取出的是空,但是没到top,说明只是被提前执行并置空了,那么继续读取新的base并循环,且若没有其他线程去更改base,array的长度,或者把top降到 //base,则当前线程就永远死循环下去了,因为每次循环都是125且每个变量都不变.因此为避免循环,每个任务可以提前执行,但一定不能提前离队(置null). //也就是说:只能用poll或pop方式弹出任务,其他方式获得任务并执行是允许的,但不能在执行后置null,留待后续源码验证一下. else if (b + 1 == top) // now empty break; } } //从循环退出来有两种情况,可能是在5处满足退出条件,或者在2处发现b已经是脏数据,下轮循环不满足循环条件所致.两种都应该返回null. return null; } //根据mode来取下一个本队列元素.根据模式. final ForkJoinTask> nextLocalTask() { //当前WorkQueue的配置了FIFO,则poll,否则pop. //尽管还未看到注册worker的源码,在此提前透露下,ForkJoinPool也有一个config(前面讲构造函数提过) //该config保存了mode信息,并原样赋给了WorkQueue的mode.注意,相应的任务会出队. return (config & FIFO_QUEUE) == 0 ? pop() : poll(); } //根据模式取出下一个任务,但是不出队. final ForkJoinTask> peek() { ForkJoinTask>[] a = array; int m; //空队,返回null. if (a == null || (m = a.length - 1) < 0) return null; //根据mode定位要取的索引j. int i = (config & FIFO_QUEUE) == 0 ? top - 1 : base; int j = ((i & m) << ASHIFT) + ABASE; //返回读出的值,不出队. return (ForkJoinTask>)U.getObjectVolatile(a, j); } //如果参数t是当前队的top,则弹出. final boolean tryUnpush(ForkJoinTask> t) { ForkJoinTask>[] a; int s; if ((a = array) != null && (s = top) != base && //1.满足非空条件.尝试用t去当当作计算出的索引位置的原任务的值并cas为null来出队. U.compareAndSwapObject (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) { //cas成功,说明t确实是top,将top减一返回true. U.putOrderedInt(this, QTOP, s); return true; } //2.cas失败或不满足1的条件,返回false. return false; } //移除并取消队列中所有已知的任务,忽略异常. final void cancelAll() { ForkJoinTask> t; if ((t = currentJoin) != null) { //有currentJoin,引用置空,取消并忽略异常. currentJoin = null; ForkJoinTask.cancelIgnoringExceptions(t); } if ((t = currentSteal) != null) { //有currentSteal,引用置空,取消并忽略异常. currentSteal = null; ForkJoinTask.cancelIgnoringExceptions(t); } //除了上面两个,就只剩下数组中的任务了.按LILO的顺序弹出并依次取消,忽略所有异常. while ((t = poll()) != null) ForkJoinTask.cancelIgnoringExceptions(t); } // 以下是执行方法. //按FIFO顺序从队首弹出任务并执行所有非空任务. final void pollAndExecAll() { for (ForkJoinTask> t; (t = poll()) != null;) //很明显,如果未按严格顺序执行,先执行中间的一个任务, //再调用本方法,则会半路中止. t.doExec(); } //移除并执行完所有本队列的任务,如果是先进先出,则执行前面的pollAndExecAll方法. //否则pop循环执行到空为止.按前面的分析,只要坚持只能pop或poll弹出,其他方式执行任务但不能置空的原则, //可以保证pop或poll出现空的情况只能是竞态发生的情况. final void execLocalTasks() { int b = base, m, s; ForkJoinTask>[] a = array; //初始满足条件,top至少比base大1.队列非空. if (b - (s = top - 1) <= 0 && a != null && (m = a.length - 1) >= 0) { //不是FIFO模式. if ((config & FIFO_QUEUE) == 0) { for (ForkJoinTask> t;;) { //原子getAndSet,查出并弹出原本的task if ((t = (ForkJoinTask>)U.getAndSetObject (a, ((m & s) << ASHIFT) + ABASE, null)) == null) //弹出的task是空,break.说明整个工作流程中,如果未保证严格有序, //如先从中间的某个任务开始执行并且出队了,再调用execLocalTasks,会导致中间停顿. //只执行不出队,则至少不会中断.出现t是null的情况只能是竞态或末尾. break; //top减一,执行任务. U.putOrderedInt(this, QTOP, s); t.doExec(); //如果base大于等于top,则中止. if (base - (s = top - 1) > 0) break; } } //是FIFO模式,pollAndExecAll. else pollAndExecAll(); } } //重点入口方法来了,前面留下诸多关于执行任务是否出队的讨论,下面来分析入口方法. //该方法的入口是每个工作线程的run方法,因此只有一个线程. final void runTask(ForkJoinTask> task) { //传入task是空直接不理会. if (task != null) { //标记成忙.scanState是WorkQueue的成员变量,每个WorkQueue只有一个值, //前面说过,一般情况下,每个线程会有一个WorkQueue,所以某种情况来讲也可以标记为 //当前ForkJoinWorkerThread繁忙. //SCANNING常量值是1,这个操作实质上就是将scanState变量的个位置0,也就是变成了偶数并标记它要忙了. //显然偶数才表示忙碌,这也是为什么前面觉得官方注释scanState是奇数表示"正在扫描"很奇怪. scanState &= ~SCANNING; //将currentSteal设置为传入的任务,并运行该任务,若该任务内部进行了分叉,则进入相应的入队逻辑. (currentSteal = task).doExec(); //执行完该任务后,将currentSteal置空.将该task释放掉,帮助gc. U.putOrderedObject(this, QCURRENTSTEAL, null); //调用前面提到的,根据mode选择依次pop或poll的方式将自己的工作队列内的任务出队并执行的方法. execLocalTasks(); //到此,自己队列中的所有任务都已经完成.包含偷来的任务fork后又入队到自己队列的子任务. //取出owner线程.处理偷取任务有关的一些信息. ForkJoinWorkerThread thread = owner; if (++nsteals < 0) //发现当前WorkQueue偷来的任务数即将溢出了,将它转到线程池. transferStealCount(pool); //取消忙碌标记. scanState |= SCANNING; if (thread != null) //执行afterTopLevelExec勾子方法,上一节中介绍ForkJoinWorkerThread时已介绍. thread.afterTopLevelExec(); } //方法结束,注意,没有任何操作将task从所在的数组中移除,不论这个task是哪个WorkQueue中的元素. //同时,此方法原则上讲可以多次调用(尽管事实上就一次调用),入口处和出口处分别用忙碌标记来标记scanState,但重复标记显然不影响执行. } //如果线程池中已经初始化了用于记录的stealCounter,则用它加上当前WorkQueue的nsteals/或最大整数(发生溢出时). //并初始化当前WorkQueue的nsteals. final void transferStealCount(ForkJoinPool p) { AtomicLong sc; if (p != null && (sc = p.stealCounter) != null) { //线程池中存放了stealCounter,它是一个原子整数. int s = nsteals; nsteals = 0; //恢复0. //若nsteals是负,增加最大整数,否则增加nsteal sc.getAndAdd((long)(s < 0 ? Integer.MAX_VALUE : s)); } } //如果task存在,则将它从队列中移除并执行,发现有位于顶部的取消任务,则移除之,只用于awaitJoin. //如果队列空并且任务不知道完成了,则返回true. final boolean tryRemoveAndExec(ForkJoinTask> task) { ForkJoinTask>[] a; int m, s, b, n; //进入if的条件,存在非空任务数组,参数task非空. if ((a = array) != null && (m = a.length - 1) >= 0 && task != null) { //循环条件,队列非空.从s开始遍历到b,也就是从顶到底.后进先出. while ((n = (s = top) - (b = base)) > 0) { //1.内层循环. for (ForkJoinTask> t;;) { //2.从顶开始的索引j,每次向下找一个. long j = ((--s & m) << ASHIFT) + ABASE; if ((t = (ForkJoinTask>)U.getObject(a, j)) == null) //3.取出的是空,返回值取决于top是不是内层循环是第一次运行,外循环每次会将s更新为新top, //内循环则会每次将s减一.内循环只跑了一次的情况,显然会返回true. //显然这种情况下top也没有被其他线程更新,内循环又是第一次跑,那么将足以说明当前队列为空,该为false. //true的情况,向下遍历了几个元素打到了底,未进入46 10这三种要重开启一轮外循环的情况,也没找到task. //不管怎样,发现空任务就返回. return s + 1 == top;// 比预期短,第一个或第n个出现了空值,但循环条件未false else if (t == task) { //找到的任务t不是空,且是目标任务. boolean removed = false; if (s + 1 == top) { //4.发现是首轮内循环,s+1==top成立,进行pop操作,将task弹出并将top减一. //显然,task是最顶任务,可以用pop方式,将它置空. if (U.compareAndSwapObject(a, j, task, null)) { U.putOrderedInt(this, QTOP, s); //5.置removed为true. removed = true; } } //6.不是首轮循环,而且base没有在处理期间发生改变. else if (base == b) //7.尝试将task替换成一个EmptyTask实例.成功则removed是true, //这样虽然该任务出了队,但在队上还有一个空的任务,而不会出现前面担心的中间null //的情况,也不改变top或base的值. removed = U.compareAndSwapObject( a, j, task, new EmptyTask()); if (removed) //8.只要任务成功出队(不论是4还是7,则执行. task.doExec(); //9.只要找到任务,退出内循环,回到外循环重置相应的条件. break; } //10.本轮内循环没找到匹配task的任务. else if (t.status < 0 && s + 1 == top) {//官方注释是取消. //11.若t是完成的任务且是首轮内循环且top未变动,将该任务出队并令top减一. if (U.compareAndSwapObject(a, j, t, null)) U.putOrderedInt(this, QTOP, s); //12.只要进入此分支就退出内循环. break; } if (--n == 0) //13.内循环每执行到此一次,就说明有一次没找到目标任务,减少n(开始时的base top差值).达0时返回false停止循环. //即每个内循环都只能执行n次,进入外循环时重置n. return false; } //14.结束了任何一轮内循环时,发现目标task已经完成,则停止外循环返回false. if (task.status < 0) return false; } } //15.task参数传空,或者当前WorkQueue没有任务,直接返回true. return true; } //简单梳理一下tryRemoveAndExec的执行流程和生命周期. //a.显然,一上来就判队列的空和参数的空,如果第一个if都进不去,按约定返回true. //b.经过1初始化一个内层循环,并初始化了n,它决定内循环最多跑n次,如果内循环一直不break(9找到任务或12发现顶部任务是完成态),也假定一般碰不到14(发现目标任务完成了) //也没有出现几种return(3查出null,14某轮内循环目标task发现被完成了),那么最终只会耗尽次数,遍历到底,在13处return false(确定此轮循环task不在队列) //c.如果出现了几种break(9,12),9其实代表查到任务,12代表顶部任务已完成(官方说取消),那就会停止内循环,重新开启一轮外循环,初始化n,继续从新的top到base遍历(b). //但此时,可能找不到task了(它已经在上一轮内循环出队或被替换成代理),但也可能实际上未出队(该task不是top,即4,base也发生了改变造成7未执行),那么可能在本轮循环 //找到任务,在b中进入相应的break,并且成功移除并会进入d,也可能没进入break而是再重复一次b. //d.如果某一次break成功删除了任务,那么外循环更新了n,base,top,重启了一次内循环,但是所有找到task的分支不会再有了,如果接下来不再碰到被完成(取消)的顶部任务11-12, //同样也没发现目标task完成了(不进14),那么最终的结果就是n次内循环后n降低到0,直接return false. //e.从b-d任何一次内循环在最后发现了task结束,立即返回false.否则,它可能在某一次内循环中弹出并执行了该任务,却可能一直在等待它完成,因此这个机制可以让等待task完成前, //帮助当前WorkQueue清理顶部无效任务等操作. //此方法适用于不论共享或者独有的模式,只在helpComplete时使用. //它会弹出和task相同的CountedCompleter,在前一节讲解CountedCompleter时已介绍过此方法. //父Completer仅能在栈链上找到它的父和祖先completer并帮助减挂起任务数或完成root,但在此处 //它可以帮助栈链上的前置(子任务),前提是要popCC弹出. final CountedCompleter> popCC(CountedCompleter> task, int mode) { int s; ForkJoinTask>[] a; Object o; //当前队列有元素. if (base - (s = top) < 0 && (a = array) != null) { //老逻辑从顶部确定j. long j = (((a.length - 1) & (s - 1)) << ASHIFT) + ABASE; if ((o = U.getObjectVolatile(a, j)) != null && (o instanceof CountedCompleter)) { //当前队列中存在类型为CountedCompleter的元素.对该completer栈链开启一个循环. CountedCompleter> t = (CountedCompleter>)o; for (CountedCompleter> r = t;;) { //对该CountedCompleter及它的completer栈元素进行遍历,每一个遍历到的临时存放r. //找到r==task,说明有一个completer位于task的执行路径. if (r == task) { //mode小于0,这个mode其实有误解性,它的调用者其实是将一个WorkQueue的config传给了这个mode. //而config只有两处初始化,一是将线程注册到池的时候,初始化WorkQueue, //二是外部提交的任务,使用externalSubmit时新建的WorkQueue,config会是负值且没有owner. //它也说明是共享队列,需要有锁定机制.. if (mode < 0) { //另一个字段qlock派上了用场,将它置为1表示加锁. if (U.compareAndSwapInt(this, QLOCK, 0, 1)) { //加锁成功,在top和array这过程中未发生变动的情况下,尝试 //将t出队,此时t是栈顶上的元素,它的completer栈链前方有task. if (top == s && array == a && U.compareAndSwapObject(a, j, t, null)) { U.putOrderedInt(this, QTOP, s - 1); U.putOrderedInt(this, QLOCK, 0); return t; } //不论出队成功还是失败,解锁. U.compareAndSwapInt(this, QLOCK, 1, 0); } } //非共享队列,直接将t出列. else if (U.compareAndSwapObject(a, j, t, null)) { U.putOrderedInt(this, QTOP, s - 1); return t; } //只要找到,哪怕两处cas出现不成功的情况,也是竞态失败,break终止循环. break; } //r不等于task,找出r的父并开始下轮循环,直到root或找到task为止. else if ((r = r.completer) == null) // try parent break; } } } //空队列,顶部不是Completer或者不是task的子任务,返回null. return null; } //尝试在无竞态下偷取此WorkQueue中与给定task处于同一个completer栈链上的任务并运行它, //若不成功,返回一个校验合/控制信号给调用它的helpComplete方法. //返回规则,成功偷取则返回1;返回2代表可重试(被其他小偷击败),如果队列非空但未找到匹配task,返回-1, //其他情况返回一个强制负的基准索引. final int pollAndExecCC(CountedCompleter> task) { int b, h; ForkJoinTask>[] a; Object o; if ((b = base) - top >= 0 || (a = array) == null) //空队列,与最小整数(负值)取或作为信号h h = b | Integer.MIN_VALUE; // to sense movement on re-poll else { //从底部取索引j long j = (((a.length - 1) & b) << ASHIFT) + ABASE; //用该索引取task取出null,说明被捷足先登了,信号置为可重试. if ((o = U.getObjectVolatile(a, j)) == null) h = 2; // retryable //取出的非空任务类型不是CountedCompleter.说明不匹配,信号-1 else if (!(o instanceof CountedCompleter)) h = -1; // unmatchable else { //是CountedCompleter类型 CountedCompleter> t = (CountedCompleter>)o; for (CountedCompleter> r = t;;) { //基本同上个方法的逻辑,只是上个方法t取的是top,这里取base. //r从t开始找它的父,直到它本身或它的父等于task.将它从底端出队. if (r == task) { if (base == b && U.compareAndSwapObject(a, j, t, null)) { //出队成功,因为我们找的是base,且竞态成功,直接更新base即可. base = b + 1; //出队后执行该出队的任务.返回1代表成功. t.doExec(); h = 1; // success } //base被其他线程修改了,或者cas竞态失败.(其实是一个情况),信号2,可以从新的base开始重试. else h = 2; // lost CAS //只要找到task的子任务就break,返回竞态成功或可重试的信号. break; } //迭代函数,当前r不是task,将r指向它的父,直到某一个r的父是task或者是null进入else if. else if ((r = r.completer) == null) { //能够进来,说明r已经指向了root,却没有找到整条链上有这个task,返回信号为未匹配到. h = -1; // unmatched break; } } } } return h; } //如果当前线程拥有此队列且明显未被锁定,返回true. final boolean isApparentlyUnblocked() { Thread wt; Thread.State s; //前面提过的scanState会在一上来runTask时和1的反码取与运算,直到运行完任务才会反向运算. //这个过程,scanState的最后一位会置0,但这与此判断条件关系不大. //前面对scanState有所注释,小于0代表不活跃. return (scanState >= 0 && //队列处于活跃态且当前线程的状态不是阻塞,不是等待,不是定时等待,则返回true. (wt = owner) != null && (s = wt.getState()) != Thread.State.BLOCKED && s != Thread.State.WAITING && s != Thread.State.TIMED_WAITING); } }
到此终于WorkQueue内部类的代码告一段落.
这一段介绍了WorkQueue的内部实现机制,以及与上一节有关的提到的CountedCompleter在帮助子任务时处于WorkQueue的实现细节(似乎默认情况下即asnycMode传true时只会从当前工作线程队列取顶部元素,从其他随机队列的底部开取,有可能可以重复取,具体细节到ForkJoinPool的helpComplete相关源码再说),以及构建好的WorkQueue会有哪些可能的状态和相应的字段,以及若干模式(同步异步或者LIFO,FIFO等),出队入队的操作,还提出了队列中元素为什么中间不能为空,如果出现要将中间元素出队怎么办?别忘了答案是换成一个EmptyTask.
不妨小结一下WorkQueue的大致结构.
1.它规避了伪共享.
2.它用scanState表示运行状态,版本号,小于0代表不活跃维护了忙碌标记,也用scanState在runTask入口开始运行任务时标记为忙碌(偶数),结束后再取消忙碌状态(奇数).注释解释奇数代表正在扫描,但从代码语义上看正好相反
3.它维护了一个可以扩容的数组,也维护了足够大的top和base,[base,top)或许可以形象地表示它的集合,pop是从top-1开始,poll从base开始,当任务压入队成功后,检查若top-base达到了数组长度,也就是集合[base,top)的元素数达到或者超过了队列数组长度,将对数组进行扩容,因使用数组长度-1与哈希值的方式,扩容前后原数组元素索引不变,新压入队列的元素将在此基础上无缝添加,因此扩容也规避了出现中间任务null的情况.初始容量在runWorker时分配.
4.它维护了偷取任务的记录和个数,并在溢出等情况及时累加给池.它也维护了阻塞者线程和主人线程.
5.它可能没有主人线程(共享队列),或有主人线程(非共享,注册入池时生成)
6.它维护了队列锁qlock,但目前仅在popCC且当前为共享队列情况下使用,保证争抢的同步.
7.其他一些字段如config,currentJoin,hint,parker等,需要在后续的线程池自身代码中结合前面的源码继续了解,包含stackPred据说保持前一个池栈的运行信号.
WorkQueue本质也是一个内部类,它虽然定义了一系列实现,但这些实现方法的调度还是由ForkJoinPool来实现,所以我们还是要回归到ForkJoinPool自身的方法和公有api上,遇到使用上面WorkQueue定义好的工具方法时,我们再来回顾.
前面已经看了一些影响WorkQueue的位于ForkJoinPool的常量,再来继续看其他的ForkJoinPool中的一些常量.
//默认线程工厂.前面提过两个实现 public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory; //是否允许启动者在方法中杀死线程的许可,我们忽略这方面的内容. private static final RuntimePermission modifyThreadPermission; //静态的common池 static final ForkJoinPool common; common池的并行度. static final int commonParallelism; //tryComensate方法中对构造备用线程的创造. private static int commonMaxSpares; //池顺序号,创建工作线程会拼接在名称上. private static int poolNumberSequence; //同步方法同,递增的池id. private static final synchronized int nextPoolId() { return ++poolNumberSequence; } // 以下为一些静态配置常量. //IDLE_TIMEOUT代表了一个初始的纳秒单位的超时时间,默认为2s,它用于线程触发静止停顿以等待新的任务. //一旦超过了这个 时长,线程将会尝试收缩worker数量.为了避免某些如长gc等停顿的影响,这个值应该足够大 private static final long IDLE_TIMEOUT = 2000L * 1000L * 1000L; // 2sec //为应对定时器下冲设置的空闲超时容忍度. private static final long TIMEOUT_SLOP = 20L * 1000L * 1000L; // 20ms //它是commonMaxSpares静态初始化时的初值,这个值远超普通的需要,但距离 //MAX_CAP和一般的操作系统线程限制要差很远,这也使得jvm能够在资源耗尽前 //捕获资源的滥用. private static final int DEFAULT_COMMON_MAX_SPARES = 256; //在block之前自旋等待的次数,它在awaitRunStateLock方法和awaitWork方法中使用, //但它事实上是0,因此这两个方法其实在用随机的自旋次数,设置为0也减少了cpu的使用. //如果将它的值改为大于0的值,那么必须设置为2的幂,至少4.这个值设置达到2048已经可以 //耗费一般上下文切换时间的一小部分. private static final int SPINS = 0; //种子生成器的默认增量.注册新worker时详述. private static final int SEED_INCREMENT = 0x9e3779b9;
上面都是一些常量的声明定义,下面看一些与线程池config和ctl有关的常量,以及前面构造器提过的变量.
// 高低位 private static final long SP_MASK = 0xffffffffL;//long型低32位. private static final long UC_MASK = ~SP_MASK;//long型高32位. // 活跃数. private static final int AC_SHIFT = 48;//移位偏移量,如左移到49位开始. private static final long AC_UNIT = 0x0001L << AC_SHIFT;//1<<48代表一个活跃数单位. private static final long AC_MASK = 0xffffL << AC_SHIFT;//long型高16位(49-64) // 总数量 private static final int TC_SHIFT = 32;//移位偏移量,33位开始. private static final long TC_UNIT = 0x0001L << TC_SHIFT;//1<<32代表一个总数量 private static final long TC_MASK = 0xffffL << TC_SHIFT;//33-48位 private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); //第48位 //与运行状态有关的位,显然后面的runState是个int型,这些移位数也明显是int的范围. //SHUTDOWN显然一定是负值,其他值也都是2的幂. private static final int RSLOCK = 1;//run state锁,简单来说就是奇偶位. private static final int RSIGNAL = 1 << 1;//2 运行状态的唤醒. private static final int STARTED = 1 << 2;//4,启动 private static final int STOP = 1 << 29;//30位,代表停. private static final int TERMINATED = 1 << 30;//31位代表终止. private static final int SHUTDOWN = 1 << 31;//32位代表关闭. //实例字段. volatile long ctl; // 代表池的主要控制信号,long型 volatile int runState; // 可以锁的运行状态 final int config; // 同时保存了并行度和模式(开篇的构造函数) int indexSeed; // 索引种子,生成worker的索引 volatile WorkQueue[] workQueues; // 工作队列的注册数组. final ForkJoinWorkerThreadFactory factory;//线程工厂 final UncaughtExceptionHandler ueh; // 每一个worker线程的未捕获异常处理器. final String workerNamePrefix; // 工作线程名称前缀. volatile AtomicLong stealCounter; // 代表偷取任务数量,前面提过,官方注释说也用作同步监视器
仅仅看这些字段的简单描述是无法彻底搞清楚它们的含义的,还是要到应用的代码来看,我们继续向下看ForkJoinPool中的一些方法.
//尝试对当前的runState加锁标志位,并返回一个runState,这个runState可能是原值(无竞态)或新值(竞态且成功). //不太准确的语言可以说是"锁住"runState这个字段,其实不是,从代码上下文看, //该标志位被设置为1的期间,尝试去lock的线程可以去更改runState的其他位,比如信号位. //而lockRunState成功的线程则是紧接着去更改ctl控制信号,工作队列等运行时数据,故可以称runState在锁标志这一块 //可以理解为运行状态锁. private int lockRunState() { int rs; //runState已经是奇数,表示已经锁上了,awaitRunState return ((((rs = runState) & RSLOCK) != 0 || //发现原本没锁,尝试将原rs置为rs+1,即变为奇数. !U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ? //原来锁了或者尝试竞态加锁不成功,等待加锁成功,否则直接返回rs. awaitRunStateLock() : rs); } //自旋或阻塞等待runstate锁可用,这与上面的runState字段有关.也是上一个方法的自旋+阻塞实现. private int awaitRunStateLock() { Object lock; boolean wasInterrupted = false; for (int spins = SPINS, r = 0, rs, ns;;) { //每轮循环重读rs. if (((rs = runState) & RSLOCK) == 0) { //1.发现rs还是偶数,尝试将它置为奇数.(锁) if (U.compareAndSwapInt(this, RUNSTATE, rs, ns = rs | RSLOCK)) { //2,锁成功后发现扰动了,则扰动当前线程,catch住不符合安全策略的情况. if (wasInterrupted) { try { //2.1扰动.它将影响到后面awaitWork方法的使用. Thread.currentThread().interrupt(); } catch (SecurityException ignore) { } } //2.2返回的是新的runStatus,相当于原+1,是个奇数. //注意,此方法中只有此处一个出口,也就是说必须要锁到结果. return ns; } } //在1中发现被锁了或者2处争锁竞态失败. else if (r == 0) //3.所有循环中只会执行一次,如果简单去看,nextSecondarySeed是一个生成 //伪随机数的代码,它不会返回0值.r的初值是0. r = ThreadLocalRandom.nextSecondarySeed(); else if (spins > 0) { //4.有自旋次数,则将r的值进行一些转换并开启下轮循环.默认spins是0,不会有自旋次数. //从源码来看,自旋的唯一作用就是改变r的值,使之可能重新进入3,也会根据r的结果决定是否减 //少一次自旋. //r的算法,将当前r的后6位保留,用r的后26位与前26位异或被保存为r的前26位(a). //再将(a)的结果处理,r的前21位保持不变,后11位与前11位异或并保存为r的后11位(b). //再将(b)的结果处理,r的后7位保持不变,用前25位与后25位异或并保存为r的前25位(c) //个中数学原理,有兴趣的研究一下吧. //显然,自旋次数并不是循环次数,它只能决定进入6中锁代码块前要运行至少几轮循环. r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift if (r >= 0) //经过上面的折腾r还不小于0的,减少一个自旋次数. //自旋次数不是每次循环都减一,但减到0之后不代表方法停止循环,而是进入2(成功)或者6(阻塞). --spins; } //某一次循环,r不为0,不能进入3,自旋次数也不剩余,不能进入4.则到此. else if ((rs & STARTED) == 0 || (lock = stealCounter) == null) //5.线程池的runState表示还未开启,或者还未初始化偷锁(stealCounter),说明 //还没完成初始化,此处是初始化时的竞态,直接让出当前线程的执行权.等到重新获取执行权时, //重新循环,读取新的runState并进行. Thread.yield(); // initialization race else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) {//可重入 //6.没能对runState加锁,也不是5中的初始化时竞态的情况,尝试加上信号位,以stealCounter进行加锁. //显然,这种加信号位的加法不会因为信号位而失败,而会因为runState的其他字段比如锁标识位失败,这时 //重新开始循环即可. synchronized (lock) { //明显的double check if ((runState & RSIGNAL) != 0) { //6.1当前pool的runState有信号位的值,说明没有线程去释放信号位. try { //6.2runState期间没有被去除信号位,等待. lock.wait(); } catch (InterruptedException ie) { //6.3等待过程中发生异常,且不是记录一个标记,在2处会因它中断当前线程. if (!(Thread.currentThread() instanceof ForkJoinWorkerThread)) wasInterrupted = true; } } else //6.4当前runState没有信号位的值,说明被释放了,顺便唤醒等待同步块的线程.让他们继续转圈. lock.notifyAll(); } } } } //解锁runState,前面解释过,这个锁可以理解为对runState的锁标志位进行设定,而设定成功的结果就是可以改信号量ctl. //它会解锁runState,并会用新的runState替换. private void unlockRunState(int oldRunState, int newRunState) { //首先尝试cas.cas成功可能会导致上一个方法中进入同步块的线程改走6.4唤醒阻塞线程. if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) { //cas不成功,直接强制更改. Object lock = stealCounter; runState = newRunState;// 这一步可能清除掉信号位.使上一个方法中已进入同步块的线程改走6.4 if (lock != null) //强制更换为新的运行状态后,唤醒所有等待lock的线程. synchronized (lock) { lock.notifyAll(); } } }
上面的几个方法是对runState字段进行操作的,并利用了信号位,锁标识位,运行状态位.
显然,虽然可以不精确地说加锁解锁是对runState的锁标识位进行设置,严格来说,这却是为ctl/工作队列等运行时数据服务的(后面再述),显然精确说是对运行时数据的修改权限加锁.
同样的,加锁过程采用自旋+阻塞的方式,整个循环中同时兼容了线程池还在初始化(处理方式让出执行权),设定了自旋次数(处理方式,随机数判断要不要减少自旋次数,自旋次数降0前不会阻塞)这两种情况,也顺便在阻塞被扰动的情况下暂时忽略扰动,只在成功设置锁标识位后顺手负责扰动当前线程.
简单剥离这三种情况,加锁过程是一轮轮的循环,会尝试设置锁标识位,成功则返回新标识,不成功则去设置信号位(可能已经有其他线程设
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/75118.html
摘要:前言在前面的文章框架之中梳理了框架的简要运行格架和异常处理流程显然要理解框架的调度包含工作窃取等思想需要去中了解而对于的拓展和使用则需要了解它的一些子类前文中偶尔会提到的一个子类直译为计数的完成器前文也说过的并行流其实就是基于了框架实现因此 前言 在前面的文章ForkJoin框架之ForkJoinTask中梳理了ForkJoin框架的简要运行格架和异常处理流程,显然要理解ForkJoi...
摘要:这减轻了手动重复执行相同基准测试的痛苦,并简化了获取结果的流程。处理项目的代码并从标有注释的方法处生成基准测试程序。用和运行该基准测试得到以下结果。同时,和的基线测试结果也有略微的不同。 Java 8 已经发布一段时间了,许多开发者已经开始使用 Java 8。本文也将讨论最新发布在 JDK 中的并发功能更新。事实上,JDK 中已经有多处java.util.concurrent 改动,但...
摘要:前言在前面的文章和响应式编程中提到了和后者毫无疑问是一个线程池前者则是一个类似经典定义的概念官方有一个非常无语的解释就是运行在的一个任务抽象就是运行的线程池框架包含和若干的子类它的核心在于分治和工作窍取最大程度利用线程池中的工作线程避免忙的 前言 在前面的文章CompletableFuture和响应式编程中提到了ForkJoinTask和ForkJoinPool,后者毫无疑问是一个线程...
摘要:使用方式要把任务提交到线程池,必须创建的一个子类,其中是并行化任务产生的结果如果没有结果使用类型。对一个子任务调用的话,可以使一个子任务重用当前线程,避免线程池中多分配一个任务带来的开销。 【概念 分支和并框架的目的是以递归的方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体的结果,它是ExecutorService的一个实现,它把子任务分配给线程池(Fork...
摘要:对于任务的分割,要求各个子任务之间相互独立,能够并行独立地执行任务,互相之间不影响。是叉子分叉的意思,即将大任务分解成并行的小任务,是连接结合的意思,即将所有并行的小任务的执行结果汇总起来。使用方法会阻塞并等待子任务执行完并得到其结果。 Fork/Join是什么? Fork/Join框架是Java7提供的并行执行任务框架,思想是将大任务分解成小任务,然后小任务又可以继续分解,然后每个小...
阅读 2557·2021-10-25 09:45
阅读 1220·2021-10-14 09:43
阅读 2275·2021-09-22 15:23
阅读 1502·2021-09-22 14:58
阅读 1919·2019-08-30 15:54
阅读 3526·2019-08-30 13:00
阅读 1324·2019-08-29 18:44
阅读 1554·2019-08-29 16:59