资讯专栏INFORMATION COLUMN

Java多线程进阶(三五)—— J.U.C之collections框架:SynchronousQue

missonce / 1442人阅读

摘要:三总结主要用于线程之间的数据交换,由于采用无锁算法,其性能一般比单纯的其它阻塞队列要高。它的最大特点时不存储实际元素,而是在内部通过栈或队列结构保存阻塞线程。

本文首发于一世流云专栏:https://segmentfault.com/blog...
一、SynchronousQueue简介

SynchronousQueue是JDK1.5时,随着J.U.C包一起引入的一种阻塞队列,它实现了BlockingQueue接口,底层基于队列实现:

没有看错,SynchronousQueue的底层实现包含两种数据结构——队列。这是一种非常特殊的阻塞队列,它的特点简要概括如下:

入队线程和出队线程必须一一匹配,否则任意先到达的线程会阻塞。比如ThreadA进行入队操作,在有其它线程执行出队操作之前,ThreadA会一直等待,反之亦然;

SynchronousQueue内部不保存任何元素,也就是说它的容量为0,数据直接在配对的生产者和消费者线程之间传递,不会将数据缓冲到队列中。

SynchronousQueue支持公平/非公平策略。其中非公平模式,基于内部数据结构——“栈”来实现,公平模式,基于内部数据结构——“队列”来实现;

SynchronousQueue基于一种名为“Dual stack and Dual queue”的无锁算法实现。

注意:上述的特点1,和我们之前介绍的Exchanger其实非常相似,可以类比Exchanger的功能来理解。
二、SynchronousQueue原理 构造

之前提到,SynchronousQueue根据公平/非公平访问策略的不同,内部使用了两种不同的数据结构:栈和队列。我们先来看下对象的构造,SynchronousQueue只有2种构造器:

/**
 * 默认构造器.
 * 默认使用非公平策略.
 */
public SynchronousQueue() {
    this(false);
}
/**
 * 指定策略的构造器.
 */
public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue() : new TransferStack();
}

可以看到,对于公平策略,内部构造了一个TransferQueue对象,而非公平策略则是构造了TransferStack对象。这两个类都继承了内部类Transferer,SynchronousQueue中的所有方法,其实都是委托调用了TransferQueue/TransferStack的方法:

public class SynchronousQueue extends AbstractQueue
        implements BlockingQueue, java.io.Serializable {
 
    /**
     * tranferer对象, 构造时根据策略类型确定.
     */
    private transient volatile Transferer transferer;
 
    /**
     * Shared internal API for dual stacks and queues.
     */
    abstract static class Transferer {
        /**
         * Performs a put or take.
         *
         * @param e 非null表示 生产者 -> 消费者;
         *          null表示, 消费者 -> 生产者.
         * @return 非null表示传递的数据; null表示传递失败(超时或中断).
         */
        abstract E transfer(E e, boolean timed, long nanos);
    }
 
    /**
     * Dual stack(双栈结构).
     * 非公平策略时使用.
     */
    static final class TransferStack extends Transferer {
        // ...
    }
 
    /**
     * Dual Queue(双端队列).
     * 公平策略时使用.
     */
    static final class TransferQueue extends Transferer {
        // ...
    }
 
    // ...
}
栈结构

非公平策略由TransferStack类实现,既然TransferStack是栈,那就有结点。TransferStack内部定义了名为SNode的结点:

static final class SNode {
    volatile SNode next;
    volatile SNode match;       // 与当前结点配对的结点
    volatile Thread waiter;     // 当前结点对应的线程
    Object item;                // 实际数据或null
    int mode;                   // 结点类型
 
    SNode(Object item) {
        this.item = item;
    }
  
    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long matchOffset;
    private static final long nextOffset;
 
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class k = SNode.class;
            matchOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("match"));
            nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }

    // ...

}

上述SNode结点的定义中有个mode字段,表示结点的类型。TransferStack一共定义了三种结点类型,任何线程对TransferStack的操作都会创建下述三种类型的某种结点:

REQUEST:表示未配对的消费者(当线程进行出队操作时,会创建一个mode值为REQUEST的SNode结点 )

DATA:表示未配对的生产者(当线程进行入队操作时,会创建一个mode值为DATA的SNode结点 )

FULFILLING:表示配对成功的消费者/生产者

static final class TransferStack extends Transferer {
 
    /**
     * 未配对的消费者
     */
    static final int REQUEST = 0;
    /**
     * 未配对的生产者
     */
    static final int DATA = 1;
    /**
     * 配对成功的消费者/生产者
     */
    static final int FULFILLING = 2;
 
     volatile SNode head;
 
    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;
 
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class k = TransferStack.class;
            headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
 
    // ...
}
核心操作——put/take

SynchronousQueue的入队操作调用了put方法:

/**
 * 入队指定元素e.
 * 如果没有另一个线程进行出队操作, 则阻塞该入队线程.
 */
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}

SynchronousQueue的出队操作调用了take方法:

/**
 * 出队一个元素.
 * 如果没有另一个线程进行出队操作, 则阻塞该入队线程.
 */
public E take() throws InterruptedException {
    E e = transferer.transfer(null, false, 0);
    if (e != null)
        return e;
    Thread.interrupted();
    throw new InterruptedException();
}

可以看到,SynchronousQueue一样不支持null元素,实际的入队/出队操作都是委托给了transfer方法,该方法返回null表示出/入队失败(通常是线程被中断或超时):

/**
 * 入队/出队一个元素.
 */
E transfer(E e, boolean timed, long nanos) {
    SNode s = null; // s表示新创建的结点
    // 入参e==null, 说明当前是出队线程(消费者), 否则是入队线程(生产者)
    // 入队线程创建一个DATA结点, 出队线程创建一个REQUEST结点
    int mode = (e == null) ? REQUEST : DATA;

    for (; ; ) {    // 自旋
        SNode h = head;
        if (h == null || h.mode == mode) {          // CASE1: 栈为空 或 栈顶结点类型与当前mode相同
            if (timed && nanos <= 0) {              // case1.1: 限时等待的情况
                if (h != null && h.isCancelled())
                    casHead(h, h.next);
                else
                    return null;
            } else if (casHead(h, s = snode(s, e, h, mode))) {  // case1.2 将当前结点压入栈
                SNode m = awaitFulfill(s, timed, nanos);        // 阻塞当前调用线程
                if (m == s) {                                   // 阻塞过程中被中断
                    clean(s);
                    return null;
                }

                // 此时m为配对结点
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);

                // 入队线程null, 出队线程返回配对结点的值
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
            // 执行到此处说明入栈失败(多个线程同时入栈导致CAS操作head失败),则进入下一次自旋继续执行

        } else if (!isFulfilling(h.mode)) {          // CASE2: 栈顶结点还未配对成功
            if (h.isCancelled())                     // case2.1: 元素取消情况(因中断或超时)的处理
                casHead(h, h.next);
            else if (casHead(h, s = snode(s, e,
                h, FULFILLING | mode))) {      // case2.2: 将当前结点压入栈中
                for (; ; ) {
                    SNode m = s.next;       // s.next指向原栈顶结点(也就是与当前结点匹配的结点)
                    if (m == null) {        // m==null说明被其它线程抢先匹配了, 则跳出循环, 重新下一次自旋
                        casHead(s, null);
                        s = null;
                        break;
                    }

                    SNode mn = m.next;
                    if (m.tryMatch(s)) {    // 进行结点匹配
                        casHead(s, mn);     // 匹配成功, 将匹配的两个结点全部弹出栈
                        return (E) ((mode == REQUEST) ? m.item : s.item);   // 返回匹配值
                    } else                  // 匹配失败
                        s.casNext(m, mn);   // 移除原待匹配结点
                }
            }
        } else {                            // CASE3: 其它线程正在匹配
            SNode m = h.next;
            if (m == null)                  // 栈顶的next==null, 则直接弹出, 重新进入下一次自旋
                casHead(h, null);
            else {                          // 尝试和其它线程竞争匹配
                SNode mn = m.next;
                if (m.tryMatch(h))
                    casHead(h, mn);         // 匹配成功
                else
                    h.casNext(m, mn);       // 匹配失败(被其它线程抢先匹配成功了)
            }
        }
    }
}

整个transfer方法考虑了限时等待的情况,且入队/出队其实都是调用了同一个方法,其主干逻辑就是在一个自旋中完成以下三种情况之一的操作,直到成功,或者被中断或超时取消:

栈为空,或栈顶结点类型与当前入队结点相同。这种情况,调用线程会阻塞;

栈顶结点还未配对成功,且与当前入队结点可以配对。这种情况,直接进行配对操作;

栈顶结点正在配对中。这种情况,直接进行下一个结点的配对。

出/入队示例讲解

为了便于理解,我们来看下面这个调用示例(假设不考虑限时等待的情况),假设一共有三个线程ThreadA、ThreadB、ThreadC:

①初始栈结构

初始栈为空,head为栈顶指针,始终指向栈顶结点:

②ThreadA(生产者)执行入队操作

由于此时栈为空,所以ThreadA会进入CASE1,创建一个类型为DATA的结点:

if (h == null || h.mode == mode) {          // CASE1: 栈为空 或 栈顶结点类型与当前mode相同
    if (timed && nanos <= 0) {              // case1.1: 限时等待的情况
        if (h != null && h.isCancelled())
            casHead(h, h.next);
        else
            return null;
    } else if (casHead(h, s = snode(s, e, h, mode))) {  // case1.2 将当前结点压入栈
        SNode m = awaitFulfill(s, timed, nanos);        // 阻塞当前调用线程
        if (m == s) {                                   // 阻塞过程中被中断
            clean(s);
            return null;
        }

        // 此时m为配对结点
        if ((h = head) != null && h.next == s)
            casHead(h, s.next);

        // 入队线程null, 出队线程返回配对结点的值
        return (E) ((mode == REQUEST) ? m.item : s.item);
    }
    // 执行到此处说明入栈失败(多个线程同时入栈导致CAS操作head失败),则进入下一次自旋继续执行
}

CASE1分支中,将结点压入栈后,会调用awaitFulfill方法,该方法会阻塞调用线程:

/**
 * 阻塞当前调用线程, 并将线程信息记录在s.waiter字段上.
 *
 * @param s 等待的结点
 * @return 返回配对的结点 或 当前结点(说明线程被中断了)
 */
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();

    // 性能优化操作(计算自旋次数)
    int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (; ; ) {
        if (w.isInterrupted())
            s.tryCancel();
        /**
         * s.match保存当前结点的匹配结点.
         * s.match==null说明还没有匹配结点
         * s.match==s说明当前结点s对应的线程被中断了
         */
        SNode m = s.match;
        if (m != null)
            return m;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        if (spins > 0)
            spins = shouldSpin(s) ? (spins - 1) : 0;
        else if (s.waiter == null)  // 还没有匹配结点, 则保存当前线程
            s.waiter = w;           // s.waiter保存当前阻塞线程
        else if (!timed)
            LockSupport.park(this); // 阻塞当前线程
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}      

此时栈结构如下,结点的waiter字段保存着创建该结点的线程ThreadA,ThreadA等待着被配对消费者线程唤醒:

③ThreadB(生产者)执行入队操作

此时栈顶结点的类型和ThreadB创建的结点相同(都是DATA类型的结点),所以依然走CASE1分支,直接将结点压入栈:

④ThreadC(消费者)执行出队操作

此时栈顶结点的类型和ThreadC创建的结点匹配(栈顶DATA类型,ThreadC创建的是REQUEST类型),所以走CASE2分支,该分支会将匹配的两个结点弹出栈:

else if (!isFulfilling(h.mode)) {          // CASE2: 栈顶结点还未配对成功
    if (h.isCancelled())                     // case2.1: 元素取消情况(因中断或超时)的处理
        casHead(h, h.next);
    else if (casHead(h, s = snode(s, e,
        h, FULFILLING | mode))) {      // case2.2: 将当前结点压入栈中
        for (; ; ) {
            SNode m = s.next;       // s.next指向原栈顶结点(也就是与当前结点匹配的结点)
            if (m == null) {        // m==null说明被其它线程抢先匹配了, 则跳出循环, 重新下一次自旋
                casHead(s, null);
                s = null;
                break;
            }

            SNode mn = m.next;
            if (m.tryMatch(s)) {    // 进行结点匹配
                casHead(s, mn);     // 匹配成功, 将匹配的两个结点全部弹出栈
                return (E) ((mode == REQUEST) ? m.item : s.item);   // 返回匹配值
            } else                  // 匹配失败
                s.casNext(m, mn);   // 移除原待匹配结点
        }
    }
} 

上述isFulfilling方法就是判断结点是否匹配:

/**
 * 判断m是否已经配对成功.
 */
static boolean isFulfilling(int m) {
    return (m & FULFILLING) != 0;
}

ThreadC创建结点并压入栈后,栈的结构如下:

此时,ThreadC会调用tryMatch方法进行匹配,该方法的主要作用有两点:

将待结点的match字段置为与当前配对的结点(如上图中,结点m是待配对结点,最终m.math == s

唤醒待配对结点中的线程(如上图中,唤醒结点m中ThreadB线程)

/**
 * 尝试将当前结点和s结点配对.
 */
boolean tryMatch(SNode s) {
    if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
        Thread w = waiter;
        if (w != null) {    // 唤醒当前结点对应的线程
            waiter = null;
            LockSupport.unpark(w);
        }
        return true;
    }
    return match == s;      // 配对成功返回true
}

匹配完成后,会将匹配的两个结点弹出栈,并返回匹配值:

if (m.tryMatch(s)) {    // 进行结点匹配
    casHead(s, mn);     // 匹配成功, 将匹配的两个结点全部弹出栈
    return (E) ((mode == REQUEST) ? m.item : s.item);   // 返回匹配值
}

最终,ThreadC拿到了等待配对结点中的数据并返回,此时栈的结构如下:

注意: CASE2分支中ThreadC创建的结点的mode值并不是REQUEST,其mode值为FULFILLING | modeFULFILLING | mode的主要作用就是给栈顶结点置一个标识(二进制为11或10),表示当前有线程正在对栈顶匹配,这时如果有其它线程进入自旋(并发情况),则CASE2一定失败,因为isFulfilling的结果必然为true,所以会进入CASE3分支——跳过栈顶结点进行匹配。
casHead(h, s = snode(s, e, h, FULFILLING | mode))

⑤ThreadB(生产者)唤醒后继续执行

ThreadB被唤醒后,会从原阻塞处继续执行,并进入下一次自旋,在下一次自旋中,由于结点的match字段已经有了匹配结点,所以直接返回配对结点:

/**
 * 阻塞当前调用线程, 并将线程信息记录在s.waiter字段上.
 *
 * @param s 等待的结点
 * @return 返回配对的结点 或 当前结点(说明线程被中断了)
 */
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();

    // 性能优化操作(计算自旋次数)
    int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (; ; ) {
        if (w.isInterrupted())
            s.tryCancel();
        /**
         * s.match保存当前结点的匹配结点.
         * s.match==null说明还没有匹配结点
         * s.match==s说明当前结点s对应的线程被中断了
         */
        SNode m = s.match;
        if (m != null)
            return m;
        if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel();
                continue;
            }
        }
        if (spins > 0)
            spins = shouldSpin(s) ? (spins - 1) : 0;
        else if (s.waiter == null)  // 还没有匹配结点, 则保存当前线程
            s.waiter = w;           // s.waiter保存当前阻塞线程
        else if (!timed)
            LockSupport.park(this); // 阻塞当前线程
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos);
    }
}

最终,在下面分支中返回:

else if (casHead(h, s = snode(s, e, h, mode))) {  // case1.2 将当前结点压入栈
    SNode m = awaitFulfill(s, timed, nanos);        // 阻塞当前调用线程
    if (m == s) {                                   // 阻塞过程中被中断
        clean(s);
        return null;
    }

    // 此时m为配对结点
    if ((h = head) != null && h.next == s)
        casHead(h, s.next);

    // 入队线程null, 出队线程返回配对结点的值
    return (E) ((mode == REQUEST) ? m.item : s.item);
}
注意:对于入队线程(生产者),返回的是它入队时携带的原有元素值。
队列结构

SynchronousQueue的公平策略由TransferQueue类实现,TransferQueue内部定义了名为QNode的结点,一个head队首指针,一个tail队尾指针:

/**
 * Dual Queue(双端队列).
 * 公平策略时使用.
 */
static final class TransferQueue extends Transferer {

    /**
     * Head of queue
     */
    transient volatile QNode head;
    /**
     * Tail of queue
     */
    transient volatile QNode tail;
    /**
     * Reference to a cancelled node that might not yet have been
     * unlinked from queue because it was the last inserted node
     * when it was cancelled.
     */
    transient volatile QNode cleanMe;

    /**
     * 队列结点定义.
     */
    static final class QNode {
        volatile QNode next;          // next node in queue
        volatile Object item;         // CAS"ed to or from null
        volatile Thread waiter;       // to control park/unpark
        final boolean isData;
        // ...
    }
    
    // ...
}
关于TransferQueue的transfer方法就不再赘述了,其思路和TransferStack大致相同,总之就是入队/出队必须一一匹配,否则任意一方就会加入队列并等待匹配线程唤醒。读者可以自行阅读TransferQueued的源码。
三、总结

TransferQueue主要用于线程之间的数据交换,由于采用无锁算法,其性能一般比单纯的其它阻塞队列要高。它的最大特点时不存储实际元素,而是在内部通过栈或队列结构保存阻塞线程。后面我们讲JUC线程池框架的时候,还会再次看到它的身影。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/77078.html

相关文章

  • Java线程进阶(一)—— J.U.C并发包概述

    摘要:整个包,按照功能可以大致划分如下锁框架原子类框架同步器框架集合框架执行器框架本系列将按上述顺序分析,分析所基于的源码为。后,根据一系列常见的多线程设计模式,设计了并发包,其中包下提供了一系列基础的锁工具,用以对等进行补充增强。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首发于一世流云专栏:https...

    anonymoussf 评论0 收藏0
  • Java线程进阶(二六)—— J.U.Ccollections框架:ConcurrentSkip

    摘要:我们来看下的类继承图可以看到,实现了接口,在多线程进阶二五之框架中,我们提到过实现了接口,以提供和排序相关的功能,维持元素的有序性,所以就是一种为并发环境设计的有序工具类。唯一的区别是针对的仅仅是键值,针对键值对进行操作。 showImg(https://segmentfault.com/img/bVbggic?w=960&h=600); 本文首发于一世流云专栏:https://seg...

    levius 评论0 收藏0
  • Java线程进阶(二七)—— J.U.Ccollections框架:CopyOnWriteArr

    摘要:仅仅当有多个线程同时进行写操作时,才会进行同步。可以看到,上述方法返回一个迭代器对象,的迭代是在旧数组上进行的,当创建迭代器的那一刻就确定了,所以迭代过程中不会抛出并发修改异常。另外,迭代器对象也不支持修改方法,全部会抛出异常。 showImg(https://segmentfault.com/img/bVbggij?w=960&h=600); 本文首发于一世流云专栏:https://...

    garfileo 评论0 收藏0
  • Java线程进阶(二八)—— J.U.Ccollections框架:CopyOnWriteArr

    摘要:我们之前已经介绍过了,底层基于跳表实现,其操作平均时间复杂度均为。事实上,内部引用了一个对象,以组合方式,委托对象实现了所有功能。线程安全内存的使用较多迭代是对快照进行的,不会抛出,且迭代过程中不支持修改操作。 showImg(https://segmentfault.com/img/bVbggjf?w=600&h=377); 本文首发于一世流云专栏:https://segmentfa...

    NeverSayNever 评论0 收藏0
  • Java线程进阶(三七)—— J.U.Ccollections框架:LinkedBlocking

    摘要:接口截止目前为止,我们介绍的阻塞队列都是实现了接口。该类在构造时一般需要指定容量,如果不指定,则最大容量为。另外,由于内部通过来保证线程安全,所以的整体实现时比较简单的。另外,双端队列相比普通队列,主要是多了队尾出队元素队首入队元素的功能。 showImg(https://segmentfault.com/img/bVbgZ7j?w=770&h=514); 本文首发于一世流云专栏:ht...

    light 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<