资讯专栏INFORMATION COLUMN

JUC 之Phaser

shaonbean / 3242人阅读

摘要:前言在前面的几篇文章中详述了框架的若干组分在相应的官方文档中总会不时地提起同样的也提到可以用于帮助运行在中的运行时保持有效的执行并行度其实特指其他都在等待一个的前进时熟悉的朋友都知道它的大概组成部分包含支持并发的容器同步器线程池阻塞队列原子

前言

在前面的几篇文章中详述了ForkJoin框架的若干组分,在相应的官方文档中总会不时地提起"Phaser",同样的,也提到Phaser可以用于帮助运行在ForkJoinPool中的ForkJoinTask运行时保持有效的执行并行度(其实特指其他task都在等待一个phase的前进时).

熟悉JUC的朋友都知道它的大概组成部分包含:Containers(支持并发的容器),Synchronizers(同步器),Executors(线程池),BlockingQueue(阻塞队列),Atomic(原子类),Lock and Condition(锁).而Phaser和CyclicBarrier,Semaphore等一样是一个同步器.

本文主要介绍Phaser的内部实现,粗略介绍使用,它的源码相比于线程池较为简单,但最好能对比其他同步器来了解,读者最好拥有juc其他同步器,原子类,部分ForkJoin框架的基础.

同时,本文也会再次提到ForkJoinPool::managedBlock(blocker),之前在ForkJoinPool一文提到了实现和接口,而在CompletableFuture中见到了一个blocker的实现.

Phaser源码

首先来看一些与Phaser状态有关的简单的常量.

//64位整数表示Phaser的状态.
private volatile long state;

private static final int  MAX_PARTIES     = 0xffff;//最大parties,后16位表示.
private static final int  MAX_PHASE       = Integer.MAX_VALUE;//最大phase,最大整数值.
private static final int  PARTIES_SHIFT   = 16;//取parties使用的移位数,16
private static final int  PHASE_SHIFT     = 32;//取phase的移位数,32
private static final int  UNARRIVED_MASK  = 0xffff;      //未到的,取后16位.
private static final long PARTIES_MASK    = 0xffff0000L; //参加者,17-32位.
private static final long COUNTS_MASK     = 0xffffffffL; //数量,后32位.
private static final long TERMINATION_BIT = 1L << 63;//终止态,首位.

// 特殊值.
private static final int  ONE_ARRIVAL     = 1;
private static final int  ONE_PARTY       = 1 << PARTIES_SHIFT;
private static final int  ONE_DEREGISTER  = ONE_ARRIVAL|ONE_PARTY;//第1位和17位.显然,它表示了一个ONE_ARRIVAL信息和PARTY信息.
private static final int  EMPTY           = 1;

//对一个state s计算unarrived的count,
private static int unarrivedOf(long s) {
    //直接取整数位,如果等于EMPTY(1)则返回0,否则取后16位.
    int counts = (int)s;
    return (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
}

//对一个state,取出parties信息,直接取state的17至32位.
private static int partiesOf(long s) {
    return (int)s >>> PARTIES_SHIFT;
}
//对于一个state,取出phase信息,直接取前32位.
private static int phaseOf(long s) {
    return (int)(s >>> PHASE_SHIFT);
}
//对于一个state,取出arrived信息
private static int arrivedOf(long s) {
    int counts = (int)s;
    //state的后32位等于1(EMPTY)返回0,否则返回parties(state的17至32位,参考上面的partiesOf方法)和UNARRIVED(state的后16位)的差.
    return (counts == EMPTY) ? 0 :
        (counts >>> PARTIES_SHIFT) - (counts & UNARRIVED_MASK);
}

上面都是一些常量,没什么可分析的,简单来个总结.

Phaser用一个long型的state保存状态信息.

state的前32位表示phase,后16位表示unarrivied,17至32位表示parties,parties减去unarrived即arrived.

下面我们看一些成员变量和有关函数.

//this的父,可以是null表示none
private final Phaser parent;

//phaser显然是个树的结果,root代表根,如果当前phaser不在树内,则root==this
private final Phaser root;


//偶数队列和奇数队列.它们存放等待线程栈的头,为了减少当添加线程与释放线程的竞态,
//这里使用了两个队列并互相切换,子phaser共享root的队列以加快释放.
private final AtomicReference evenQ;
private final AtomicReference oddQ;

//决定某个phase的等待线程队列.
private AtomicReference queueFor(int phase) {
    //选择队列的方法,如果参数phase是偶数,使用evenQ,否则oddQ.
    return ((phase & 1) == 0) ? evenQ : oddQ;
}

//出现arrive事件时的边界异常信息.
private String badArrive(long s) {
    return "Attempted arrival of unregistered party for " +
        stateToString(s);
}

//注册时的边界异常信息.
private String badRegister(long s) {
    return "Attempt to register more than " +
        MAX_PARTIES + " parties for " + stateToString(s);
}
//他们都用到的stateToString(s),计算参数s对应的phase,parties,arrived.
private String stateToString(long s) {
    return super.toString() +
        "[phase = " + phaseOf(s) +
        " parties = " + partiesOf(s) +
        " arrived = " + arrivedOf(s) + "]";
}

为了便于理解,先来看队列的实现.

//表示等待队列的QNode,实现了ManagedBlocker
static final class QNode implements ForkJoinPool.ManagedBlocker {
    //存放所属phaser
    final Phaser phaser;
    //所属phase
    final int phase;
    //是否可扰动
    final boolean interruptible;
    //是否定时
    final boolean timed;
    //是否已扰动
    boolean wasInterrupted;
    //计时相关
    long nanos;
    final long deadline;
    //关联线程,当是null时,取消等待.
    volatile Thread thread; 
    //下一个QNode
    QNode next;

    QNode(Phaser phaser, int phase, boolean interruptible,
          boolean timed, long nanos) {
        this.phaser = phaser;
        this.phase = phase;
        this.interruptible = interruptible;
        this.nanos = nanos;
        this.timed = timed;
        this.deadline = timed ? System.nanoTime() + nanos : 0L;
        //取当前线程.
        thread = Thread.currentThread();
    }
    //isReleasable方法
    public boolean isReleasable() {
        if (thread == null)
            //1.线程已置空(如2),返回true释放.
            return true;
        if (phaser.getPhase() != phase) {
            //2.发现phaser所处的phase不是构建QNode时的phase了,就置线程为空,返回true.
            thread = null;
            return true;
        }
        if (Thread.interrupted())
            //3.如果当前线程扰动了.
            wasInterrupted = true;
        if (wasInterrupted && interruptible) {
            //4.发现扰动标记,并且QNode配置为可扰动,则置线程null并返回true
            thread = null;
            return true;
        }
        if (timed) {
            //5.定时逻辑,还有nanos,计算新的时长.
            if (nanos > 0L) {
                nanos = deadline - System.nanoTime();
            }
            if (nanos <= 0L) {
                //已经到时间,返回true,线程置空.
                thread = null;
                return true;
            }
        }
        return false;
    }
    //block逻辑
    public boolean block() {
        if (isReleasable())
            return true;
        else if (!timed)
            //不定时的park
            LockSupport.park(this);
        else if (nanos > 0L)
            //定时的情况.
            LockSupport.parkNanos(this, nanos);
        //老规矩
        return isReleasable();
    }
}

前面介绍过CompletableFuture的Singnaller,以及ForkJoinPool中的managedBlock,这一块的逻辑显然驾轻就熟.

很明显,如果我们在ForkJoinPool中使用它作为blocker,并在相应的ForkJoinTask的exec或CountedCompleter的compute方法中使用ForkJoinPool::managedBlock(blocker),将每个ForkJoinWorkerThread在阻塞前构建一个QNode进入Phaser的等待队列(虽然还没有讲到相关内容,但是Phaser显然不用我们直接操作内部类QNode),那么它将依照上述逻辑进行补偿,保障有效的并行度.

前面完成了承前启后,预热到此结束,开始看Phaser的核心方法.

//doArrive方法
//它是arrive和arriveAndDeregister方法的主要实现.手动调用这些方法可以加速通过和最小化竞态窗口期.
//参数代表要从当前state中减去的调整数值,它的单位依托于业务,当为arrive时减去的单位为ONE_ARRIVAL,
//当为arriveAndDeregister时减去的单位为ONE_DEREGISTER.
private int doArrive(int adjust) {
    final Phaser root = this.root;
    for (;;) {
        //1.变量s初始化,取决于是否当前Phaser是root.不是root将试图从root同步滞后的state.
        long s = (root == this) ? state : reconcileState();
        //计算phase,前32位.
        int phase = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            //2.负数直接返回.说明原来的state首位就是1,前面的TERMINATE_BIT就是64位置1.
            return phase;
        //取count,后32位.
        int counts = (int)s;
        //计算unarrived,和前面一样的逻辑.
        int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
        if (unarrived <= 0)//2.1
            //没有unarrived了,说明不应该调用此方法,抛出异常,信息就是前面介绍过的badArrive
            throw new IllegalStateException(badArrive(s));
        //3.尝试将state减去adjust数.
        if (UNSAFE.compareAndSwapLong(this, stateOffset, s, s-=adjust)) {
            //3.1cas成功后,unarrived余1,则前进一个phase
            if (unarrived == 1) {
                //3.1.1取出parties作为下一个state的基础.
                long n = s & PARTIES_MASK;
                //3.1.2 下一个unarrived,数值上等于parties.
                int nextUnarrived = (int)n >>> PARTIES_SHIFT;
                if (root == this) {
                    //3.1.3当前Phaser是root,onAdvance返回true,则加上终止信号.
                    if (onAdvance(phase, nextUnarrived))
                        n |= TERMINATION_BIT;
                    else if (nextUnarrived == 0)
                        //3.1.4 onAdvance返回false,而计算得出的nextUnarrived是0,即没有parties,n加上一个empty(1)
                        n |= EMPTY;
                    else
                        //3.1.5nextUnArrived不是0,加到n上.
                        n |= nextUnarrived;
                    //3.1.6前面的流程完成了state的后32位(parties和unarrived),接下来处理前32位.
                    //限定在MAX_PHASE之内,对当前phase加1.
                    int nextPhase = (phase + 1) & MAX_PHASE;
                    //将nextPhase的值加到n的前32位.并用n去cas掉原来的state,因为有3处入口的cas,此处一定能成功
                    n |= (long)nextPhase << PHASE_SHIFT;
                    UNSAFE.compareAndSwapLong(this, stateOffset, s, n);
                    //更新到新的phase,唤醒等待的waiter.
                    releaseWaiters(phase);
                }
                //3.1.7当前Phaser不是root,当nextUnarrived计算得0时,像父传递解除注册,参数ONE_DEREGISTER
                //会同时减去一个unarrived和一个parties.下轮循环正常应进入3.1.8
                else if (nextUnarrived == 0) { 
                    phase = parent.doArrive(ONE_DEREGISTER);
                    //完成传递后,将自己的state置empty.
                    UNSAFE.compareAndSwapLong(this, stateOffset,
                                              s, s | EMPTY);
                }
                else
                    //3.1.8,当前Phaser不是root,计算的nextUnarrived非0,像父传递一个arrive事件,减去一个unarrived.
                    phase = parent.doArrive(ONE_ARRIVAL);
            }
            //3.2返回当前phase,可能是已进入3.1递增的.仅有此处可退出循环.
            return phase;
        }
    }
}

关于该方法的执行流程,我们结合几个周边方法一并分析,先来看注册方法和onAdvance勾子.

//注册和批量注册.参数代表parties和unarrived字段的增加数,它必须大于0.
private int doRegister(int registrations) {
    // 1.用参数计算一个adjust,同时包含parties和arrive.
    long adjust = ((long)registrations << PARTIES_SHIFT) | registrations;
    final Phaser parent = this.parent;
    int phase;
    //循环尝试更改.
    for (;;) {
        //2.存在parent,则用root的phase调整this的state.
        long s = (parent == null) ? state : reconcileState();
        //取出当前state中保存的counts,parties,unarrived信息.
        int counts = (int)s;
        int parties = counts >>> PARTIES_SHIFT;
        int unarrived = counts & UNARRIVED_MASK;
        if (registrations > MAX_PARTIES - parties)
            //要注册的数量大于了余量,抛出异常.
            throw new IllegalStateException(badRegister(s));
        //3.计算出phase
        phase = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            //phase为负说明state为负,即终止态,终止.
            break;
        //4.当前state表示的参与数非空的逻辑,当前注册非首次注册.
        if (counts != EMPTY) {           
            if (parent == null || reconcileState() == s) {
                //this是root或者从root同步的state不变,继续执行,否则重新循环.
                if (unarrived == 0)  
                    //4.1本轮循环通过原state计算的unarrived为0,说明应等待下一phase,使用root等待      
                    root.internalAwaitAdvance(phase, null);
                else if (UNSAFE.compareAndSwapLong(this, stateOffset,
                                                   s, s + adjust))
                    //4.2本轮循环未发现应等待下一phase,尝试原子更新,增加adjust到state上.
                    break;
            }
        }
        //5.当前不存在counts,且自身就是root,代表root的首次注册.
        else if (parent == null) { 
            //5.1计算下一个state,因为没有参与数,使用phase初始化前32位,并使用adjust做后32位.        
            long next = ((long)phase << PHASE_SHIFT) | adjust;
            if (UNSAFE.compareAndSwapLong(this, stateOffset, s, next))
                //5.2 cas成功,退出,不成功,下轮循环.
                break;
        }
        //6.是首次注册,但也不是root的逻辑.代表非root的Phaser的首次注册.
        else {
            //6.1对当前Phaser加锁并double check,避免同时调用.加锁失败的线程将在后续进入2的逻辑.
            synchronized (this) {  
                //double check state未发生改变.   
                if (state == s) {
                    //6.2首先向父Phaser注册1.
                    phase = parent.doRegister(1);
                    if (phase < 0)
                        //发现进入终止态,直接停止.
                        break;
                    
                    //6.3向父Phaser注册成功,循环尝试cas掉老的state,新state的算法同上,phase加adjust.
                    //在整个while循环中,不再考虑phase进入终止态的情况,因为这些操作处于同一个"事务"中,
                    //且因竞态等原因,若某次cas时计入了负数的phase,方法返回后也可以及时发现.
                    while (!UNSAFE.compareAndSwapLong
                           (this, stateOffset, s,
                            ((long)phase << PHASE_SHIFT) | adjust)) {
                        //如果cas不成功,则读取s为新的state,计算新的phase并重新循环.
                        s = state;
                        phase = (int)(root.state >>> PHASE_SHIFT);
                        // assert (int)s == EMPTY;
                    }
                    //6.4cas成功后退出循环.
                    break;
                }
                //如果if(state==s)判断失败,说明有别的线程有当前线程进入synchronized块前已经加锁并执行了内部的逻辑且稍后释放了锁,
                //这样当前线程加锁成功,但if判断失败,它会立即释放锁并返回到2.
            }
        }

    }
    return phase;
}


//使用root的phase调整this的state,更新滞后的结果.这一般发生在root前进了phase但是
//子phaser还没有做到这一步,这种情况下,子phaser必须完成这个前进的步骤,这一过程中,phase将
//被置为root的phase,unarrived则会重置为parties,若parties为0,则置为EMPTY.返回结果state.
private long reconcileState() {
    final Phaser root = this.root;
    long s = state;
    //不是root才进行下一步.
    if (root != this) {
        int phase, p;
        //cas,phase采用root,parties不变,unarrived重置为parties或EMPTY.
        while ((phase = (int)(root.state >>> PHASE_SHIFT)) !=
               (int)(s >>> PHASE_SHIFT) &&
                //phase滞后于root
                //尝试cas.
               !UNSAFE.compareAndSwapLong
               (this, stateOffset, s,
                //确定新state的前32位,使用root的phase.
                s = (((long)phase << PHASE_SHIFT) |
                    //新phase<0,后32位直接取this的state表示的counts.
                     ((phase < 0) ? (s & COUNTS_MASK) :
                    //phase有效,this的state表示的parties为0,则后32位使用empty
                      (((p = (int)s >>> PARTIES_SHIFT) == 0) ? EMPTY :
                        //否则,后32位使用parties.
                       ((s & PARTIES_MASK) | p))))))
            s = state;
    }
    return s;
}


//onAdvance勾子方法,参数为当前phase和注册的parties数.
//默认实现为parties数为0,方法返回true时,调用者会尝试终止Phaser.(参考前面的doArrive).随后调用isTerminated方法将返回true.
//执行此方法时抛出的运行时异常或Error将直接上抛给尝试advance相应的phase的线程,这种情况下不会发生phase的advance.
//方法的入参表示的是Phaser当前的state(未advance前),因此若在onAdvance方法中执行arrive,regist,waiting这三种操作的行为是不确定的也不可靠的.
//如果当前Phaser是一个级联的成员,那么onAdvance只会由root在每次advance时调用.
//方法的默认实现返回true的场景目前只能是经过数次arriveAndDeregister调用造成parties归零的结果.我们继承Phaser可以轻易地重写此行为,
//比如简单粗暴地返回false,那么将永远允许新的注册.
protected boolean onAdvance(int phase, int registeredParties) {
    return registeredParties == 0;
}

经过前面的代码分析,已经对Phaser的核心函数doRegister,doArrive有了全面的了解.

两者都会在一开始同步root的phase,且如果出现落后root的情况,同步了新的phase的同时,也会重新初始化unarrived,并且使用parties的值.

doArrive方法会每次调整unarrived数量(也可包含parties数量,如果使用了解除注册),当Phaser调用自身的arrive/arriveAndDeregister时,会做出相应的减少,并根据是否为root而决定向上递归.

Phaser减少自身unarrived信号(也可能同时有parties信号)后,若发现这已经是最后一个unarrived信号,则进行接下来的判断:

1.当前Phaser是root,advance并唤醒waiter.(重要的唤醒操作执行点,root一轮完成)

2.当前Phaser不是root,且它已经不具备继续下一轮的条件(计算nextUnarrived为0,即parties已经被arriveAndDeregister置0),则从父Phaser减少一个unarrived和parties.

3.当前Phaser不是root,但它仍具有parties,满足进行下一轮的条件(计算nextUnarrived不是0),则从父Phaser减少一个unarrived,但不减少parties.

显然,子Phaser的最后一个unarrived的消失一定会造成父的unarrived减少,子Phaser不能继续下一phase的register和arrive时,从父Phaser中卸载.

若不是本Phaser的最后一个unarrived信号,则直接结束,相当于只进行了上面的减少信号操作.

doRegister方法的逻辑大致相反,不同于doArrive,它的参数registrations同时作用于parties和unarrived,即两个位上同时加上registrations参数.它的大致逻辑:

1.当前注册并非首次注册,且出现unarrived==0,即本轮已经完成了arrive,那么本轮将不能注册,需要等待root更新到下轮.(这也是我们碰到的第一个阻塞)

2.当前注册并非首次注册,unarrived也不是0,则在本phase进行注册,增加相应的parties和unarrived.

3.当前注册是root的首次注册,给root的state加上相应的parties和unarrived.

4.当前注册是非root的首次注册,加锁(this),对自己的state加上相应的parties和unarrived(同上,以registrations为单位),而对parent加上一个parties和unarrived单位.

很明显,对于单Phaser的情况非常好理解,每次减少unarrived数量(先不考虑减少parties),则最终导致Phaser自身进入下一个phase,然后重新初始化unarrived到下一轮,unarrived的新值是前一轮剩下的parties数量.

当我们同时也尝试减少parties数量,即解除parties的注册,最终导致没有parties,那么Phaser将进入终止态.

整个过程中,只要Phaser没进入终止态,随时可以进行新的注册,并增加parties和unarrived的数量.每个arrive可以减少unarrived的数量为任何正整数,不一定是1.

对于多Phaser的情况,有两个特殊点:

1.对任意Phaser树中的某一个Phaser调用注册操作,会令自身加上相应参数个parties和unarrived单位,仅会在该Phaser第一次注册时增加父Phaser(极端可能,仅从一个叶子节点第一个注册的情况下可一直递归到root)的parties数和unarrived数各1单位(不论参数是多少).

2.对任意Phaser树中的某一个Phaser调用arrive操作,会令自身减去相应的参数个parties和unarrived单位,同时仅当本Phaser此时是最后一个unarrived时,会减去父Phaser的一个unarrived单位(当前子Phaser仍旧有parties可以构建下一phase),或减去父Phaser一个Parties和unarrived单位.(极端情况下,每一级都是最后一个unarrived时,减少叶子节点的最后一个unarrived会递归到root).

每新增一个子Phaser,父Phaser就会增加一个要完成触发phase的advance前必须要等到arrive的单位;每一个子Phaser中所有的arrive完成,父Phaser都将减少一个要等待advance所必需触发的arrive.

目前没有看到await方法,但可以提前说明,等待操作完全依赖于root是否完成本轮.也就是所有子Phaser都完成了同一轮(arrive打满),才能让父Phaser本身减去一个所有arrive单位,再触发父Phaser本轮的完成,此时对任何已完成的Phaser进入注册,都会进入上述的root.internalAwaitAdvance(phase, null)方法等待root进入下一phase.如果对已经完成所有arrive的Phaser继续进行arrive操作,因为unarrived已经是0,则会抛出异常.

所以对于使用子Phaser的场景,如果发生很巧妙的情况,Phaser树上当前子Phaser的arrive结束条件满足了,使得新来的注册只能等待下一轮次,而其他分支的子Phaser又偏偏不能完成本轮次,那么新的phaser.doRegister方法将阻塞在此.

好在我们使用Phaser可能会类似CyclicBarrier的使用方式,可对每一轮(phase)进行注册并等待(也许只等一轮,那么arrive就要带上deregister),每一轮最后一个线程arrive了,就会停止所有线程的等待,让所有线程继续执行,同时开启了下一轮次,这些线程此时又可以不经注册直接在新的轮次中进行等待,直到最后一个arrive了,再次唤醒所有线程并继续执行,同时Phaser再前进一轮,如此往复.中间使用arrive并deregister的线程会从本轮起减少一个unarrive数量(因为parties也减少了,所以再下一轮初始化unarrive数量时也会减少一次).我们可以让这些线程参与任意的轮次,但要注意的是,如果有线程中途不参加了,一定要解除注册,否则因为每轮初始化时,要等待arrive的数量都是上一轮剩下的parties数量,有线程停止了执行,却不减少parties数,那么下轮所有等待的线程将永远等不到phaser满足唤醒的条件.

上述的过程中可以明显的看出,目前已介绍的两个重要核心函数:注册和arrive并没有直接记录和操作线程的操作,相应的操作在等待方法和唤醒方法中(前面提到过release),我们稍后介绍.

现在假设一个特殊的使用场景,也可以区别于CyclicBarrie和CountDownLatch的使用.还是上面的例子,但是我们准备的线程数与Phaser的parties数/unarrived数不同(一般前者要多些),会发生什么事?

首先创建了Phaser,不指定最初parties数,并用每个线程去注册(我甚至可以用一个线程去重复注册,每次的参数registrations还可以不同,注册的作用并不是将当前线程压入队列,而是为本phase设置一个unarrive数量,以控制到达下个phase前必须有多少次arrive的发生),则parties数和unarrived的初值完全与此有关,是一个依托于我们随意注册而产生的随意值.那么假定我们的线程数量大于这个parties数(假定调用注册方法的线程和arrive及等待的线程无关),并令有的线程执行arrive(完全可以一次arrive减去多个信号量,甚至一个线程多次arrive),有的线程执行await等待信号advance到下一个phase(一个线程在一个周期只能调用一次),有的线程执行了arrive也等待phase前进(这种情况一个线程一周期也只能一次.其实这些分别对应了还未介绍的arrive,waitAdvance,arriveAndWaitAdvance等方法),多带带进行await操作的线程可以是任意数量,执行arrive方法的线程加上执行arrive并wait的操作的线程和必须超过unarrived,这才能唤醒等待线程.

目前这些还比较抽象,等到我们看过相应的几个方法便了然了.

onAdvance的方法默认实现就是判断本阶段注册的parties数量,如果已经是0则说明没有parties了,Phaser应该结束.但是我们其实可以重新实现,比如参数中同时传入了当前的phase,我可以规定上面的例子中phase最多只有3轮次,那么不论什么时候arrive,发现了当前phase已进入3轮,Phaser就被终止.当然,这一过程是由root执行的,但是子Phaser的phase会在每次注册和arrive发生时同步root,因此本例中对于phase数的判断可以粗放到所有Phaser,对于parties数则只能作用于root(事实上调用onAdvance的一定是root).

接下来看全量构造方法和若干和上面有关的公有函数.

//初始化一个Phaser,指定parent,指定未到来的参与者数(unarrived parties),但这只是一个初值,
//当我们在任何时候调用注册方法时,还会相应的增加.
public Phaser(Phaser parent, int parties) {
    if (parties >>> PARTIES_SHIFT != 0)
        //太大了,超过了后16位能表示的整数.
        throw new IllegalArgumentException("Illegal number of parties");
    //初始phase为0.
    int phase = 0;
    this.parent = parent;
    if (parent != null) {
        //1.有parent的情况,共享parent的root,队列,并向parent中注册1个parties和unarrived,
        //同时同步一次phase(表面上是同步了parent的,实际上前面已经看过,会同步root).
        final Phaser root = parent.root;
        this.root = root;
        this.evenQ = root.evenQ;
        this.oddQ = root.oddQ;
        if (parties != 0)
            phase = parent.doRegister(1);
    }
    else {
        //2.无parent的情况,root就是this,并初始化奇偶等待队列.它使用原子引用的形式存放一个QNode,而QNode我们前面已介绍.
        this.root = this;
        this.evenQ = new AtomicReference();
        this.oddQ = new AtomicReference();
    }
    //统一初始化state,后32位的决定依托于parties,如果parties是0则给予EMPTY,直接不管高32位.
    //不为0则给予phase设置为前32位,parties设置parties位和unarrived位.
    this.state = (parties == 0) ? (long)EMPTY :
        ((long)phase << PHASE_SHIFT) |
        ((long)parties << PARTIES_SHIFT) |
        ((long)parties);
}


//注册方法,就是调用doRegister,参数1.
//它会向this添加一个unarrived的party,如果正巧root正在进行advance,它需要等待下个phase.
//如果this有parent,且它之前没有过注册的parties,则首次注册会触发自身向parent的注册.
//如果this已经终止了,那么尝试注册将会无效并返回负值.如果注册的数量大于了最大支持parties(后16位整数),
//会抛出IllegalStateException
public int register() {
    return doRegister(1);
}


//批量注册指定的信号量,并返回最新的phase.规则基本同上.
public int bulkRegister(int parties) {
    if (parties < 0)
        throw new IllegalArgumentException();
    if (parties == 0)
        //参数0直接查询最新的phase返回
        return getPhase();
    return doRegister(parties);
}


//arrive一个信号,不等待其他arrive事件,返回最新phase(终止态为负).
//当前Phaser的arrive事件已满,则对parent来说也会触发一个arrive.(如果有parent)
public int arrive() {
    return doArrive(ONE_ARRIVAL);
}


//arrive并解除一个注册parties,也不阻塞等待其他arrive.如果当前Phaser的解除注册操作
//将parties减至0,且this有parent,这将导致parent也减少一个parties(本phaser解除在parent的注册).
public int arriveAndDeregister() {
    return doArrive(ONE_DEREGISTER);
}

接下来要看上面已经做足了铺垫的等待方法了,并结合前面的队列一块看.

//令当前线程"到达"此phaser并等待其他parties,它等效于awaitAdvance(arrive()).
//注意,按照道格的注释,如果你在一个未进行注册(调用register)的线程里调用此方法其实是一个使用错误,
//但是从本方法和前面以及后面有关的方法来看,所有记录线程的方法均只与arrive和等待有关,与注册无关.
//因此Phaser本身无法规避这种使用错误,我们完全可以使用另一个线程去注册,而当前线程去arrive,将两个动作分开.
//方法会返回arrive时最新的phase号.终止时会是负值.
public int arriveAndAwaitAdvance() {
    //记录root,开始循环.
    final Phaser root = this.root;
    for (;;) {
        //1.预计算,首先同步state
        long s = (root == this) ? state : reconcileState();
        //计算phase
        int phase = (int)(s >>> PHASE_SHIFT);
        if (phase < 0)
            //已终结直接返回最终phase.
            return phase;
        //计算counts,unarrived
        int counts = (int)s;
        int unarrived = (counts == EMPTY) ? 0 : (counts & UNARRIVED_MASK);
        if (unarrived <= 0)
            //已经没有空余的unarrived信号了,不能再调用arrive,抛出异常.
            throw new IllegalStateException(badArrive(s));
        //2.减余arrive的有关逻辑.尝试cas减去一个arrive
        if (UNSAFE.compareAndSwapLong(this, stateOffset, s,
                                      s -= ONE_ARRIVAL)) {
            if (unarrived > 1)
                //2.1当前要减的信号不是本Phaser的最后一个信号量,调用root的等待方法.参数2是node,传空.
                return root.internalAwaitAdvance(phase, null);
            if (root != this)
                //2.2当前要减的信号量是非root的Phaser的最后一个,递归给parent(虽然用了return,但是parent也可能在进入2.1后阻塞).
                return parent.arriveAndAwaitAdvance();
            //2.3当前要减的信号量是root的最后一个.
            //2.3.1准备计算下一个状态,先取出state的parties信息.
            long n = s & PARTIES_MASK; 
            //计算nextUnarrived,它是现在的parties.
            int nextUnarrived = (int)n >>> PARTIES_SHIFT;
            //2.3.2前进phase逻辑.
            if (onAdvance(phase, nextUnarrived))
                //需要终止,给新state的计算基石n加上终止标记.
                n |= TERMINATION_BIT;
            else if (nextUnarrived == 0)
                //计算的nextUnarrived是0,即没有parties,加上空标记位.
                n |= EMPTY;
            else
                //下一轮能正常进行,加上nextUnarrived位.
                n |= nextUnarrived;
            //2.3.3给n加上下一个phase.
            int nextPhase = (phase + 1) & MAX_PHASE;
            n |= (long)nextPhase << PHASE_SHIFT;
            if (!UNSAFE.compareAndSwapLong(this, stateOffset, s, n))
                //用n进行cas不成功,将新的phase返回.
                //说明一下,因为方法执行到此前已经执行过2的入口cas,减去了最后一个unarrived,因此在2到此的过程中若有新的注册,
                //它内部会读到0个unarrived,就会等待下一个phase(参考前面介绍过的注册方法),因此cas失败不会是因为2之后有新的注册.
                //在整个arrive系列的方法中,最后一次arrive发生后,本Phaser不可能有其他线程再去执行类似2处的减余的情况.
                //故出现这种情况的原因目前来看有二,一是还未介绍的强制关闭Phaser的方法,此时也会突兀地改掉state造成cas恰巧失败,二是
                //出现一些用户做出的奇葩行为,比如重写了其他公有方法.我们自然忽略第二种情况,doug大神也是简单注释了一个"terminated".
                return (int)(state >>> PHASE_SHIFT); // terminated
            //cas成功,释放等待队列中的线程,返回下一个phase(因为在此过程中的register会等到advance,此时的phase已经是nextPhase了).
            releaseWaiters(phase);
            return nextPhase;
        }
        //3.减余失败说明出现竞态,直接开启下轮循环重新减余.
    }
}


//等待当前Phaser从给定的phase前进结束,如果当前phase不等于给定的phase,或者Phaser已终止立即返回.
//1.传入phase为负,返回它本身.
//2.传入的phase不是最新的phase,返回最新的.
//3.传入了最新的phase,等待到advance并返回advance后的phase.
public int awaitAdvance(int phase) {
    final Phaser root = this.root;
    long s = (root == this) ? state : reconcileState();
    int p = (int)(s >>> PHASE_SHIFT);
    if (phase < 0)
        return phase;
    if (p == phase)
        //匹配成功,等root前进.参数node为null
        return root.internalAwaitAdvance(phase, null);
    return p;
}


//参考前面的几个方法,区别是可扰动.
public int awaitAdvanceInterruptibly(int phase)
    throws InterruptedException {
    final Phaser root = this.root;
    long s = (root == this) ? state : reconcileState();
    int p = (int)(s >>> PHASE_SHIFT);
    if (phase < 0)
        //1.参数phase小于0直接返回它本身.
        return phase;
    if (p == phase) {
        //2.参数phase匹配,回忆一个前面介绍的QNode,匹配当前Phaser和phase,配置为可扰动且不计时.
        QNode node = new QNode(this, phase, true, false, 0L);
        //3.放入root的等待队列阻塞.
        p = root.internalAwaitAdvance(phase, node);
        if (node.wasInterrupted)
            //4.等待结束,判断是否是扰动造成的结束,前面介绍过QNode的相关逻辑,
            //它实现了ForkJoinPool.ManagedBlocker,因此在managedBlock方法进行时,
            //会循环调用问询是否能release,当我们配置了可扰动且扰动了,就会标记这个wasInterrupted,释放线程引用并返回.
            //发现此种情况抛出异常.
            //同时,当发现等待成功,也会结束,释放线程引用并返回,但不带有扰动标记.
            throw new InterruptedException();
    }
    //5.返回1处之前读取的phase或3处得到的最新phase值.
    return p;
}

//同上方法,但带有计时.
public int awaitAdvanceInterruptibly(int phase,
                                     long timeout, TimeUnit unit)
    throws InterruptedException, TimeoutException {
    long nanos = unit.toNanos(timeout);
    final Phaser root = this.root;
    long s = (root == this) ? state : reconcileState();
    int p = (int)(s >>> PHASE_SHIFT);
    if (phase < 0)
        return phase;
    if (p == phase) {
        //不同于上面方法的地方,建立的QNode带有计时和等待时长.
        QNode node = new QNode(this, phase, true, true, nanos);
        p = root.internalAwaitAdvance(phase, node);
        if (node.wasInterrupted)
            //被扰动的情况.
            throw new InterruptedException();
        else if (p == phase)
            //时间到了phase没有前进,超时.
            throw new TimeoutException();
    }
    return p;
}

前面的几个核心方法粗略过完,补充一些重要内容.

首先在前面曾分析过有线程阻塞等待下一个phase的情况,并没有加上定时等待的考虑.在超时的情况下,阻塞的线程可能会收到异常并退出.

建立QNode可以限定是否定时和可扰动,这取决于我们使用哪个方法去await.

除最后一个线程arrive外,所有线程调用这些方法都会减少一个arrive并加入等待队列,直到(1)配置了定时且超时,(2)当前是可扰动等待且使用了Thread.interrupt(),(3)最后一个线程使用上述方法或arrive方法,使得Phaser前进了一个轮次,internalWaitAdvance结束.其中(1)(2)均会迁成arrive线程抛出异常,只有(3)才是正常的情况.

QNode前面已介绍,它是一个blocker,需要调用ForkJoinPool::managedBlock才会起作用(显然root的internalAwaitAdvance必然与此方法有关联).当然这个作用与任务是否运行在ForkJoinPool无关,如果等待phaser前进的线程是运行在ForkJoinPool中的ForkJoinWorkerThread,显然会在internalAwaitAdvance期间进行补偿.这一块可参考前面的"CompletableFuture与响应式编程"和"ForkJoin框架之ForkJoinPool"两篇文章.

另外,这些代码也再次说明了root的作用: (1)对一切非root的Phaser进行等待都会用root的internalAwaitAdvance;(2)每次注册或arrive一定会同步root的最新phase.

其中(1)也间接说明了为什么构建Phaser时只有root创建等待队列,所有子Phaser共享.

上面还保留了一个疑问,提到了"强制关闭Phaser"造成arriveAndAwaitAdvance出现cas失败的问题,doug大神直接注释了一个terminated,我们马上来看这一块,以及一些周边的公共函数,加深理解,然后再来解决关于等待队列最后的一些问题.

//强制关闭Phaser,让Phaser进入终止态,但是这个过程不影响它已注册的parties,如果此Phaser是
//一个Phaser树中的成员,那么所有phaser集中的Phaser都会关闭,如果它已经关闭,此方法无效.此方法可以
//用于若干任务出现意料之外异常的情况下的协调恢复.
public void forceTermination() {
    // Only need to change root state
    final Phaser root = this.root;
    long s;
    //已是终止态直接忽略.
    while ((s = root.state) >= 0) {
        //直接尝试给root的state加上终止位.显然加上了它,子Phaser在注册和arrive等方法同步回新的phase就是个负数,
        //因此更改root的phase为负相当于判了所有Phaser的死刑.唯一需要解决的是已经阻塞在root.internalAwaitAdvandce的线程.
        if (UNSAFE.compareAndSwapLong(root, stateOffset,
                                      s, s | TERMINATION_BIT)) {
            // 加上终止位成功,先后唤醒偶数等待队列和奇数等待队列.
            releaseWaiters(0); // Waiters on evenQ
            releaseWaiters(1); // Waiters on oddQ
            //返回
            return;
        }
    }
}


//返回当前phase,直接用root的state去取.
public final int getPhase() {
    return (int)(root.state >>> PHASE_SHIFT);
}

//查询注册的parties数量.调用前面介绍过的partiesOf
public int getRegisteredParties() {
    return partiesOf(state);
}

//查询已经arrived的parties数量.调用介绍过的arriveOf

public int getArrivedParties() {
    return arrivedOf(reconcileState());
}

//查询未arrive的parties数量,调用前面介绍过的unarrivedOf
public int getUnarrivedParties() {
    return unarrivedOf(reconcileState());
}

//返回parent
public Phaser getParent() {
    return parent;
}

//返回root
public Phaser getRoot() {
    return root;
}

//判断当前Phaser是否终止,直接取root的state是否为负,可见,终止态完全取决于root.
public boolean isTerminated() {
    return root.state < 0L;
}

这些方法都比较简单,只有forceTermination需要再强调一翻,前面介绍arrayAndAwaitAdvance时曾提过在减去最后一个unarrived信号后去cas到下一个phase失败的情况,doug大神简单注释了一句terminated,直接返回了当前的phase(显然只能是负),在周边方法重重加锁的前提下,那一次cas的失败唯一一处就是强制关闭,因为它只改关闭标记位,相当于动了phase,而没有动unarrived标记位和parties标记位.所以重写Phaser的方法要谨慎,很可能不小心打破了这个封装.

从上面的有关方法可以看出,子Phaser的终止态严重依赖于root,目前可以确定的是root的phase一旦表现出终止态,所有新来的注册,arrive,arrive并await将会立即返回,唯一需要关注的就是root被设置了终止标记后,正陷入等待的线程怎么办的问题.

我们下面就来看Phaser的等待机制,这里面又能见到道格大神非常有趣的玩法.

//工具方法,移除某个phase的等待者.
private void releaseWaiters(int phase) {
    QNode q;  //保存队列中的队首
    Thread t;  // 保存线程引用.
    //取队列,用phase的奇偶决定,phase是偶数就取偶数队列,否则取奇数队列.而这个phase其实只用来取队列了,后续的操作与它无关.
    AtomicReference head = (phase & 1) == 0 ? evenQ : oddQ;
    //循环,找出所有phase不等于root的phase的(其实root是最大的,所以就是找出非最新phase加入进来的waiter QNode)
    while ((q = head.get()) != null &&
           q.phase != (int)(root.state >>> PHASE_SHIFT)) {
        //找出了,利用原子引用将head指向next.
        if (head.compareAndSet(q, q.next) &&
            (t = q.thread) != null) {
            //发现阻塞者,唤醒线程.回忆下前面实现blocker方法中的isReleaseble和block方法都有将线程置空的操作.(三种情况,唤醒扰动超时都会置空)
            //但是那些方法并没有将代表该阻塞线程的QNode移除队列,因此可能会发现thread已经是null(代表无阻塞者)的情况,只需要移除队列即可.
            q.thread = null;
            LockSupport.unpark(t);
        }
    }
}


//上面releaseWaiters方法的一个变种,但它只会处理遍历过程中位于头部的元素,出现正常的等待节点就会立即返回.
//此方法在这一块可以有效的减少内存的占用.退出时返回当前的phase.
private int abortWait(int phase) {
    //同样,参数phase只是用来选择要处理的队列.
    AtomicReference head = (phase & 1) == 0 ? evenQ : oddQ;
    for (;;) {
        Thread t;
        QNode q = head.get();
        //计算最新phase的值p
        int p = (int)(root.state >>> PHASE_SHIFT);
        if (q == null || ((t = q.thread) != null && q.phase == p))
            //1.出现q为null代表整队列元素已出队,直接返回p;
            //或者在出队过程中head(q)记录的线程引用还在,说明未超时或扰动,且是本phase的等待节点,终止循环并返回最新phase.
            return p;
        if (head.compareAndSet(q, q.next) && t != null) {
            //进入条件,参考1的条件,因为1会直接返回.故进入2的条件其实是q非空且处于旧的phase.只有这种情况才可以出队.
            //2.将q出队,置空线程引用,释放线程.
            q.thread = null;
            LockSupport.unpark(t);
        }
    }
}

//计算有效cpu,控制自旋.
private static final int NCPU = Runtime.getRuntime().availableProcessors();

//常量,每轮arrive等待的字旋数,取决于NCPU,小于2则取1,不小于2取2的8次幂.
static final int SPINS_PER_ARRIVAL = (NCPU < 2) ? 1 : 1 << 8;


//珊珊来迟的内部等待方法.它可能会一直阻塞到phase的advance发生(除非取消了等待).
//此方法仅限root调用.参数phase表示当前的phase,参数node表示等待节点,用于追踪节点的扰动或超时.
//如果是null,表示是一次不可扰动的等待.返回值为当前最新的phase.
private int internalAwaitAdvance(int phase, QNode node) {
    // 1.调用releaseWaiters,传入参数phase的前一个phase,显然这只是决定释放哪一个队列.参数绝对实时准确的情况下会先将老的队列释放掉.
    releaseWaiters(phase-1); 
    //节点入队标记,入队了就会变为true
    boolean queued = false;   
    //记录每一轮循环的unarrived数量,用于决定是否扩增自旋等待次数.
    int lastUnarrived = 0; 
    //自旋数,参考上面的计算逻辑.
    int spins = SPINS_PER_ARRIVAL;
    long s;
    int p;
    //开启循环,直到phase前进为止或内部判断已取消等待.
    while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
        //2.传入node是null,即非可扰动的模式逻辑.只有非可扰动模式才有自旋.
        if (node == null) {  
            //2.1每轮自读进入都会尝试计算新的unarrived,如果发现出现了变动(变大或者变小),
            //会将它保存到前面的lastUnarrived.        
            int unarrived = (int)s & UNARRIVED_MASK;
            if (unarrived != lastUnarrived &&
                (lastUnarrived = unarrived) < NCPU)
                //发现新变化的unarrived head = (phase & 1) == 0 ? evenQ : oddQ;
            QNode q = node.next = head.get();
            //这一行不起眼的if条件代码真的是一个悄无声息解决了一个大暗坑的地方,后面说.
            if ((q == null || q.phase == phase) &&
                (int)(state >>> PHASE_SHIFT) == phase) 
                //double check避免脏入队,入队条件是(1)无头,(2)或者头元素的phase等于参数phase(因为相邻的两个phase绝对不会入同一个队).
                //满足(1)(2)的同时,还要满足(3),参数phase就是当前的state表示的phase(因为此方法只能root使用,故为root表示的最新phase).
                //条件满足,入队,取代原来的head,原来head代表的node成为node的next.而条件不满足进入下一循环,很可能while条件就不满足了退出循环.
                queued = head.compareAndSet(q, node);
        }
        //5.已经在某一轮循环入队了,使用ForkJoinPool的managedBlock管理block,其间可能会释放线程引用.
        else {
            try {
                //5.1它内部也有循环,且会调用前面看到过的isReleasable和block实现,显然它一旦结束(包含扰动),一定会造成下轮外循环终止于3处.
                ForkJoinPool.managedBlock(node);
            } catch (InterruptedException ie) {
                //5.2出现扰动异常catch住,并保存.下轮循环也会终止在3处.
                node.wasInterrupted = true;
            }
        }
    }
    //6.走出上面的while循环,可能是root已经advance到下一个phase(2前的循环),也可能是传入node的情况下出现了扰动或超时(5)造成(3)满足
    if (node != null) {
        //6.1node存在代表可能已经压入队列,结果要么是已出现扰动或超时(方法结束后会抛出异常),要么是已正常完成.
        //显然,代码执行到此处就要返回了,阻塞的线程会抛出异常结束(超时或扰动)或继续执行(正常advance),
        //没有必要去尝试唤醒能执行出前面while循环到达6马上要返回的线程.
        if (node.thread != null)
            //6.2取消node中的线程引用,避免外面的线程尝试唤醒.
            node.thread = null;       // avoid need for unpark()
        if (node.wasInterrupted && !node.interruptible)
            //6.3如果node本身设置了不可被扰动,但5.2处判断线程本身抛出了扰动异常,却被catch住了,此处扰动本线程.
            Thread.currentThread().interrupt();
        if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
            //6.4发现phase并未前进.还是参数传入的pahse,说明一定是扰动或超时的结果,abortWait对本phase使用的队列进行清理,
            //而清理的目标前面已论述过,是本队列头部开始的早于本phase的元素.(发现一个不满足条件的就停止了清理).
            return abortWait(phase); // possibly clean up on abort
    }
    //7.退出上面的while循环一定会到此帮助释放早于最新阶段的waiter.注意,是早于最新phase的,参数phase只是决定了选哪个队列(奇偶).
    //如果是6.4代表的那种扰动超时情况,此处其实释放的是旧的结果.被唤醒的线程其实一般是执行在5.1处阻塞的.当前线程能运行到此绝对不需要唤醒.
    releaseWaiters(phase);
    return p;
}

到此Phaser的代码解析已完毕,我们来分析关于队列,等待和唤醒的问题.

1.Phaser维护了两个"队列",不论加入等待队列还是弹出等待队列,都是从头部进行,新加入的成员会成功队列的新头,原来的头会成为它的next,弹出时next成为新头.所以相当于一个对头部的"后进先出",考虑官方起名和注释,我们依旧保持队列这个称呼.

2.唤醒时,会从队列的头部依次弹出node 的phase早于root的最新phase的node,.

3.等待时,入队的node成为新的头.

4.当轮次增加时,会使用和本轮不同的队列增加元素,同时也会唤醒本轮中等待的node.

因为唤醒和等待同时进行,且各自操作各自的队列(不同的phase),因此彼此之间没有竞态(尽管一个是头入一个是头出),可以说设计巧妙,下面我们来脑洞大开,思考一个极端情况.

我们假设一种极端的phase切换场景,奇数phase大量等待入队,偶数phase则迅速完成.假设当前phase对应的队列是奇数对列,轮次提升完成后,它去释放当前的队列元素,结果未等这个释放操作执行完毕,偶数队列的轮次很快执行完,奇数队列中积压了成千上万个node未能释放,轮次却又切回到了奇数队列,会出现什么事?

显然奇数队列如果一直保持这种极端场景,它会越来越庞大,逼近撑爆内存的同时,大量线程也会得不到释放,甚至于老一轮的线程需要等待新一轮的线程去释放.为什么老一轮的线程会去等待新一轮的线程释放呢?

releaseWaiter的方法我们已经看出,它只会释放phase早于最新的node,此时最新压入的元素属于当前最新的phase,显然不满足条件,那么会造成奇数队列中两轮前压入的元素不能得到清除,两轮前就在释放当时积压node的线程(那一轮最后一个arrive)发现不符合清理条件,就直接return并终止了,只能等待本轮最后一个arrive出现后继续进行释放.如果本轮最后一个arrive出现很晚,在下一轮依旧保持如此极端,往返数轮,确实会导致奇数队列中积压大量node,且第一轮就在等待该轮次结束的线程早就满足了释放条件(升到了2轮),事实上可能是第n轮才得到释放,这还符合Phaser的定义吗?我们使用它,就是要保证每一轮多带带使用,每一轮次达到条件,线程释放并执行,下一轮次是下一轮次.

然而doug的代码就是这个样子,想遍各种极端,觉得可能找到了bug,那么就需要仔细思考了.作者来简述一下这个趟坑的分析过程.

这个问题确实已经得到了极大的规避了,毕竟是个极端情况.

1.线程的唤醒真的很快,尽管此处除了唤醒还包含了原子引用的更新(每次出队都要cas).

2.如果没有注册,显然就没有arrive相关的情况,尽管可以多带带调用,但必须保证在arrive时传入的数量此时已经注册了,因此每一轮次(phase)中可能积压等待唤醒的线程的操作一定是在注册之后,但是我们回忆一下,注册方法的第一步就是要等待完成advance,而且传给internalAwaitAdvance的node会是null,即不能扰动和超时,所以当本轮次阻塞了一定数量的线程后,如果不去arrive,也不考虑超时和扰动的情况,那么线程将一直阻塞.我们不可能在轮次advance前进行注册,也就不可能在advance之前进行新一phase的arrive.

3.当本轮次的最后一个arrive线程触发了轮次的更新后,才可以开启注册以及新轮次的arrive,但是此时使用了另一个等待队列,而触发了轮次更新的上一轮的arrive线程将会立即进行前一个队列中积压的线程的唤醒操作.只有该唤醒操作足够慢,且新的轮次极快就完成了的情况,才可能造成在原arrive线程未能及时释放奇数队列的情况下,新一轮次再次向其中添加元素.

4.最重要的还在上面的internalAwaitAdvance方法,那一段被作者标上了入队条件的注释处,要想入队,必须if ((q == null || q.phase == phase) &&加上后面的条件,而这两个条件的限定已经很明显,要想入队,必须满足该等待队列没有元素或者队首是本轮的元素,而该方法又是下一轮首次注册时必须等待完成的,下一轮的arrive又必须发生在下一轮的首次注册之后,因此根本不会出现本轮wait的线程还要等下一轮甚至下N轮的线程去释放的极端情况,哪怕真的去做一个极端测试:让奇数轮大量积压线程,让偶数轮快速切换,然后测试第一轮压入的线程到底是不是本轮释放的.(作者差点就要写单元测试去做这个极端测试了!)

这一段不经意的if,一个小小的条件,如果不注意真的忽略了,小代码大功效,谁能想到,这么深的暗坑就这样被规避了.

总结

前面已经详述了Phaser的源码以及若干趟坑辛路.其实已经没什么好总结的了,就在此顺便对比常见同步器CyclicBarrier,CountDownLatch,Semaphore的特征和实现.

从使用特征上看:

1.CountDownLatch是一次性的,只能初始化决定parties数量,等待者可以是多个,每次释放都会减少一个信号量,直到归0时为止,最后一个释放者将唤醒其他等待的线程.它也不能继续使用.

2.CyclicBarrier是可重用的,分代的,每一代之间彼此独立,但是每一代的初始parties是相同的,不可在运行期内动态调整,每一代最后一个线程会去开启一下代,并可以在此时运行一个用户指定的action,与此同时唤醒其他线程继续执行.它可以在运行完一代后继续被使用.并且它还支持重置.

3.Semaphore是一个资源量的典型,如果说CountDownLatch和CyclicBarrier或者Phaser都是等到"人够了"再放行,Semaphore却是起到限流的作用,它控制了有限的令牌数,这个数量不可以动态地更改,在不能acquire到足够的令牌数时,线程将阻塞,直到其他线程释放了足量的令牌数并唤醒它为止.每一个持有了令牌的线程都可以唤醒阻塞等待获取的线程.

4.Phaser的功能上不同很明显,首先它的参与者数量几乎时刻可变(除了正在进入下一phase期间),随时可以增加减少parties数量,每一phase等待者可以是多个,每一phase中,每个能从internalAwaitAdvance方法中走出循环的线程都可以帮助唤醒,当然最终能进入唤醒操作还是要归功于最后一个arrive的线程(尽管它arrive后其他线程醒来后也会帮助唤醒).Phaser的唤醒者不一定是参与者.

从实现来看:

1.CountDownLatch借助了aqs来实现parties的释放,它使用cas+park的方式,不使用Lock.

2.CyclicBarrier需要借助重入锁和condition,每一个await的线程都要全局加锁,阻塞时await在condition上.

3.Semaphore在实现上类似CountDownLatch,也是基于aqs,只不过它允许获取和释放,对state有增有减,总量不变.也是cas+park的方式阻塞,也不使用Lock

4.Phaser因为功能的要求,不基于AQS(它不能有构建时就固定的state,尽管可以初始化一个state,但它必须支持改变),它依托于原子引用实现了一个内部的队列,相应的等待/入队/唤醒等操作通过cas自旋+park的方式,同样不使用Lock.并利用双队列的方式规避了前一轮的释放和后一轮的响醒的阻塞.

此外还有两点结合前面的推理和自测验证的结论:

1.Phaser中的每一个phase是保证了可见性的,经作者自测,在任何使用Phaser的代码中await前后,不会出现串phase读出的乱序情况(侧面说明每个phase不会依赖后一个或几个phase的释放).

2.Phaser需要对await的线程进行阻塞时,是将它打包成一个node(blocker),利用ForkJoinPool来block的.如果使用Phaser同步的任务是运行在ForkJoinPool中的,它将会利用到相应的补偿机制,经作者自测,这将保证Phaser中block的每一个任务必然得到执行,每一个阻塞的线程必然得到释放.

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/75190.html

相关文章

  • Java多线程进阶(一)—— J.U.C并发包概述

    摘要:整个包,按照功能可以大致划分如下锁框架原子类框架同步器框架集合框架执行器框架本系列将按上述顺序分析,分析所基于的源码为。后,根据一系列常见的多线程设计模式,设计了并发包,其中包下提供了一系列基础的锁工具,用以对等进行补充增强。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首发于一世流云专栏:https...

    anonymoussf 评论0 收藏0
  • (十一)java多线程Phaser

    摘要:本人邮箱欢迎转载转载请注明网址代码已经全部托管有需要的同学自行下载引言讲完了和今天讲一个跟这两个类有点类似的移相器中引入了一种新的可重复使用的同步屏障称为移相器拥有与和类似的功劳但是这个类提供了更加灵活的应用和都是只适用于固定数量的参与者 本人邮箱: 欢迎转载,转载请注明网址 http://blog.csdn.net/tianshi_kcogithub: https://github....

    eccozhou 评论0 收藏0
  • Java多线程进阶(二二)—— J.U.Csynchronizer框架:Phaser

    摘要:分层支持分层一种树形结构,通过构造函数可以指定当前待构造的对象的父结点。当一个的参与者数量变成时,如果有该有父结点,就会将它从父结点中溢移除。当首次将某个结点链接到树中时,会同时向该结点的父结点注册一个参与者。 showImg(https://segmentfault.com/img/remote/1460000016010947); 本文首发于一世流云专栏:https://segme...

    Mr_zhang 评论0 收藏0
  • Java多线程编程同步器

    摘要:倒计时锁,线程中调用使进程进入阻塞状态,当达成指定次数后通过继续执行每个线程中剩余的内容。实现分阶段的的功能测试代码拿客网站群三产创建于年月日。 同步器 为每种特定的同步问题提供了解决方案 Semaphore Semaphore【信号标;旗语】,通过计数器控制对共享资源的访问。 测试类: package concurrent; import concurrent.th...

    liangdas 评论0 收藏0
  • [phaser3学习]使用phaser3做一款飞刀小游戏

    摘要:前言作为一款流行的游戏动画框架受到很多开发者的青睐最近笔者在逛意大利开发者论坛的时候发现了这款小游戏所以就照着说明做了一下在这里记录下来开发准备插件脚本飞刀和靶子的图像或者这个项目里面有的脚本和需要的图像文件开始制作搭建基本的项目创建一个 前言 phaser作为一款流行的游戏/动画框架,受到很多web开发者的青睐,最近笔者在逛意大利开发者:emanueleferonato论坛的时候发现...

    BothEyes1993 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<