资讯专栏INFORMATION COLUMN

非阻塞队列ConcurrentLinkedQueue与CAS算法应用分析

Ali_ / 874人阅读

摘要:是无阻塞队列的一种实现,依赖与算法实现。这样比从往后找更有效率出队规则定义补充一项,也表示节点已经被删除参考方法。

ConcurrentLinkedQueue是无阻塞队列的一种实现, 依赖与CAS算法实现。

入队offer

if(q==null)当前是尾节点 -> CAS赋值tail.next = newNode, 成功就跳出循环

elseif(p == q)尾节点被移除 -> 从tail或head重新往后找

else不是尾节点 -> 往next找

规则定义:

当一个节点的next指向自身时, 表示节点已经被移除, 注释中还会强调这一点。

完整代码(JDK8):
/**
 * Inserts the specified element at the tail of this queue.
 * As the queue is unbounded, this method will never return {@code false}.
 *
 * @return {@code true} (as specified by {@link Queue#offer})
 * @throws NullPointerException if the specified element is null
 */
/*
* 变量说明:
*   成员变量: 
*       head: 首节点
*       tail: 尾节点, 不一定指向末尾, 两次入队才更新一次
*   局部变量
*         t= tail; //保存循环开始时, 当时的tail值
*         p= t; // 每次查找的起始位置, 可能指向首节点head或者临时尾节点t
*         q= p.next; // 每次循环下一个节点
*        newNode= new Node; // 新节点
*
*
* 重要概念:
*     当p = p.next时, 表示节点已经被移除
*/
public boolean offer(E e) {
    checkNotNull(e);
    final Node newNode = new Node(e);

    for (Node t = tail, p = t;;) {
        Node q = p.next;
        if (q == null) {    // 情况1:  p是尾节点
            // p is last node
            // p是尾节点就直接将新节点放入末尾
            if (p.casNext(null, newNode)) {
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                if (p != t) // hop two nodes at a time // 一次跳两个节点, 即插入两次, tail更新一次
                    casTail(t, newNode);  // Failure is OK. // 失败也无妨, 说明被别的线程更新了
                return true;  // 退出循环
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)  // 情况2:  p节点被删除了
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            // 保存的节点p、t都已经失效了,这时需要重新检索,重新检索的起始位置有两种情况
            //     1.1. 如果tail==t,表示tail也是失效的, 那么从head开始找
            //     1.2. 否则tail就是被其他线程更新了, 可以又试着从tail找
            p = (t != (t = tail)) ? t : head;
        else             // 情况3:   沿着p往下找
            // Check for tail updates after two hops.
            // 这段简单看作p = q就好理解了,  这么写是为了提高效率:
            //     1. 情况二中p可能指向了head(由于tail节点失效导致的)
            //     2. 现在tail可能被其他线程更新,也许重新指向了队尾
            //     3. 如果是, 尝试则从队尾开始找, 以减少迭代次数
            p = (p != t && t != (t = tail)) ? t : q;
    }
}
这两段代码看了很久, 特别记录下:

情况2中的p = (t != (t = tail)) ? t : head;
(t != (t = tail))可以分三步来看

  1.1. 首先取出t
  1.2. 将tail赋值给t
  1.3. 将先前取出的t与更新后的t比较

情况3中p = (p != t && t != (t = tail)) ? t : q;

首先:  p != t: 这种情况只有可能发生在执行了情况2后  
现状: 这时p指向head或者中间的元素, t指向一个被删除了的节点
那么如果tail被其他线程更新了, 我们可以将t重新指向tail, p指向t, 就像刚进循环一样, 从尾节点开始检索。
这样比从head往后找更有效率

出队poll 规则定义:

补充一项, item==null,也表示节点已经被删除(参考remove方法)。

/**
* updateHead
*   
*/
public E poll() {
    restartFromHead:
    for (;;) {
        for (Node h = head, p = h, q;;) {
            E item = p.item;

            if (item != null && p.casItem(item, null)) {
                // Successful CAS is the linearization point
                // for item to be removed from this queue.
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            else if (p == q)
                continue restartFromHead;
            else
                p = q;
        }
    }
}

/**
 * Tries to CAS head to p. If successful, repoint old head to itself
 * as sentinel for succ(), below.
 */
final void updateHead(Node h, Node p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}
出队设值操作:

先更新head, 再将旧head的next指向自己

Note: CAS算法实现依靠Unsafe.compareAndSwapObject实现

UNSAFE.compareAndSwapObject(对象, 字段偏移量, 当前值, 新值)

可以为对象中的某个字段实现CAS操作
lazySet依赖UNSAFE.putOrderedObject实现

UNSAFE.putOrderedObject(对象, 字段偏移量, 新值)

这个只能用在volatile字段上  
个人理解: volatile的设值会导致本地缓存失效, 那么需要重新从主存读取, 使用这个方法可以使寄存器缓存依旧有效, 不必急于从主存取值。
使用目的: 移除节点时, 需要更新节点的next指向自身, 但现在next指向的数据实际是有效的; 高并发情况下,如果offser方法已经缓存了这个next值, 直接设置next会导致缓存行失效, CPU需要重新读取next; 而使用putOrderedObject可以让offser从这个next继续检索

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/72604.html

相关文章

  • 阻塞同步算法实战(一):ConcurrentLinkedQueue

    摘要:注意这里指的不是当次而是之后,所以如果我们使用队列的方法返回,就知道队列是否为空,但是不知道之后是否为空,并且,当关注的操作发生时,在插入或取出操作的返回值里告知此信息,来指导是否继续注册写操作。 前言 本文写给对ConcurrentLinkedQueue的实现和非阻塞同步算法的实现原理有一定了解,但缺少实践经验的朋友,文中包括了实战中的尝试、所走的弯路,经验和教训。 背景介绍 ...

    EscapedDog 评论0 收藏0
  • 通俗易懂,JDK 并发容器总结

    摘要:线程安全的线程安全的,在读多写少的场合性能非常好,远远好于高效的并发队列,使用链表实现。这样带来的好处是在高并发的情况下,你会需要一个全局锁来保证整个平衡树的线程安全。 该文已加入开源项目:JavaGuide(一份涵盖大部分Java程序员所需要掌握的核心知识的文档类项目,Star 数接近 14 k)。地址:https://github.com/Snailclimb... 一 JDK ...

    curlyCheng 评论0 收藏0
  • 阻塞同步算法实战(二):BoundlessCyclicBarrier

    摘要:如果停止了版本更新,可使用方法来解除所有因而阻塞的线程,包括指定版本号的。如果自己维护版本号,则应该保证递增。 前言 相比上一篇而言,本文不需要太多的准备知识,但技巧性更强一些。因为分析、设计的过程比较复杂繁琐,也限于篇幅,所以,主要展示如何解决这些需求,和讲解代码。另外,所讲的内容也是后一篇实战中需要用到的一个工具类。 需求介绍 我需要编写一个同步工具,它需要提供这样几个方法:...

    yintaolaowanzi 评论0 收藏0
  • Java并发

    摘要:对象改变条件对象当前线程要等待线程终止之后才能从返回。如果线程在上的操作中被中断,通道会被关闭,线程的中断状态会被设置,并得到一个。清除线程的中断状态。非公平性锁虽然可能造成饥饿,但极少的线程切换,保证其更大的吞吐量。 声明:Java并发的内容是自己阅读《Java并发编程实战》和《Java并发编程的艺术》整理来的。 showImg(https://segmentfault.com/im...

    SKYZACK 评论0 收藏0
  • ConcurrentLinkedQueue使用实例

    摘要:序是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。的模型,默认的是用这个来实现的。 序 ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元...

    Raaabbit 评论0 收藏0

发表评论

0条评论

Ali_

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<