摘要:所以,在并发量适中的情况下,一般具有较好的性能。字段指向队列头,指向队列尾,通过来操作字段值以及对象的字段值。单线程的情况下,元素入队比较好理解,直接线性地在队首插入元素即可。
本文首发于一世流云专栏:https://segmentfault.com/blog...一、ConcurrentLinkedQueue简介
ConcurrentLinkedQueue是JDK1.5时随着J.U.C一起引入的一个支持并发环境的队列。从名字就可以看出来,ConcurrentLinkedQueue底层是基于链表实现的。
Doug Lea在实现ConcurrentLinkedQueue时,并没有利用锁或底层同步原语,而是完全基于自旋+CAS的方式实现了该队列。回想一下AQS,AQS内部的CLH等待队列也是利用了这种方式。
由于是完全基于无锁算法实现的,所以当出现多个线程同时进行修改队列的操作(比如同时入队),很可能出现CAS修改失败的情况,那么失败的线程会进入下一次自旋,再尝试入队操作,直到成功。所以,在并发量适中的情况下,ConcurrentLinkedQueue一般具有较好的性能。
二、ConcurrentLinkedQueue原理 队列结构我们来看下ConcurrentLinkedQueue的内部结构,:
public class ConcurrentLinkedQueueextends AbstractQueue implements Queue , java.io.Serializable { /** * 队列头指针 */ private transient volatile Node head; /** * 队列尾指针. */ private transient volatile Node tail; // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; private static final long tailOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class> k = ConcurrentLinkedQueue.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); tailOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("tail")); } catch (Exception e) { throw new Error(e); } } /** * 队列结点定义 */ private static class Node { volatile E item; // 元素值 volatile Node next; // 后驱指针 Node(E item) { UNSAFE.putObject(this, itemOffset, item); } boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } void lazySetNext(Node val) { UNSAFE.putOrderedObject(this, nextOffset, val); } boolean casNext(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class> k = Node.class; itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } } //... }
可以看到,ConcurrentLinkedQueue内部就是一个简单的单链表结构,每入队一个元素就是插入一个Node类型的结点。字段head指向队列头,tail指向队列尾,通过Unsafe来CAS操作字段值以及Node对象的字段值。
构造器定义ConcurrentLinkedQueue包含两种构造器:
/** * 构建一个空队列(head,tail均指向一个占位结点). */ public ConcurrentLinkedQueue() { head = tail = new Node(null); }
/** * 根据已有集合,构造队列 */ public ConcurrentLinkedQueue(Collection extends E> c) { Nodeh = null, t = null; for (E e : c) { checkNotNull(e); Node newNode = new Node (e); if (h == null) h = t = newNode; else { t.lazySetNext(newNode); t = newNode; } } if (h == null) h = t = new Node (null); head = h; tail = t; }
我们重点看下空构造器,通过空构造器建立的ConcurrentLinkedQueue对象,其head和tail指针并非指向null,而是指向一个item值为null的Node结点——哨兵结点,如下图:
元素的入队是在队尾插入元素,关于队列的操作,如果读者不熟悉,可以参考《Algorithms 4th》或我的这篇博文:https://www.jianshu.com/p/f9b...
ConcurrentLinkedQueue的入队代码很简单,却非常精妙:
/** * 入队一个元素. * * @throws NullPointerException 元素不能为null */ public boolean add(E e) { return offer(e); }
/** * 在队尾入队元素e, 直到成功 */ public boolean offer(E e) { checkNotNull(e); final NodenewNode = new Node (e); for (Node t = tail, p = t; ; ) { // 自旋, 直到插入结点成功 Node q = p.next; if (q == null) { // CASE1: 正常情况下, 新结点直接插入到队尾 if (p.casNext(null, newNode)) { // CAS竞争插入成功 if (p != t) // CAS竞争失败的线程会在下一次自旋中进入该逻辑 casTail(t, newNode); // 重新设置队尾指针tail return true; } // CAS竞争插入失败,则进入下一次自旋 } else if (p == q) // CASE2: 发生了出队操作 p = (t != (t = tail)) ? t : head; else // 将p重新指向队尾结点 p = (p != t && t != (t = tail)) ? t : q; } }
我们来分析下上面offer方法的实现。单线程的情况下,元素入队比较好理解,直接线性地在队首插入元素即可。现在我们假设有两个线程ThreadA和ThreadB同时进行入队操作:
①ThreadA先多带带入队两个元素9、2
此时队列的结构如下:
②ThreadA入队元素“10”,ThreadB入队元素“25”
此时ThreadA和ThreadB若并发执行,我们看下会发生什么:
1、ThreadA和ThreadB同时进入自旋中的以下代码块:
if (q == null) { // CASE1: 正常情况下, 新结点直接插入到队尾 if (p.casNext(null, newNode)) { // CAS竞争插入成功 if (p != t) // CAS竞争失败的线程会在下一次自旋中进入该逻辑 casTail(t, newNode); // 重新设置队尾指针tail return true; } // CAS竞争插入失败,则进入下一次自旋 }
2、ThreadA执行cas操作(p.casNext)成功,插入新结点“10”
ThreadA执行完成后,直接返回true,队列结构如下:
3、ThreadB执行cas操作(p.casNext)失败
由于CAS操作同时修改队尾元素,导致ThreadB操作失败,则ThreadB进入下一次自旋;
在下一次自旋中,进入以下代码块:
else // 将p重新指向队尾结点 p = (p != t && t != (t = tail)) ? t : q;
上述分支的作用就是让p指针重新定位到队尾结点,此时队列结构如下:
然后ThreadB会继续下一次自旋,并再次进入以下代码块:
if (q == null) { // CASE1: 正常情况下, 新结点直接插入到队尾 if (p.casNext(null, newNode)) { // CAS竞争插入成功 if (p != t) // CAS竞争失败的线程会在下一次自旋中进入该逻辑 casTail(t, newNode); // 重新设置队尾指针tail return true; } // CAS竞争插入失败,则进入下一次自旋 }
此时,CAS操作成功,队列结构如下:
由于此时p!=t ,所以会调用casTail方法重新设置队尾指针:
casTail(t, newNode); // 重新设置队尾指针tail
最终队列如下:
从上面的分析过程可以看到,由于入队元素一定是要链接到队尾的,但并发情况下队尾结点可能随时变化,所以就需要指针定位最新的队尾结点,并在入队时判断队尾结点是否改变了,如果改变了,就需要重新设置定位指针,然后在下一次自旋中继续尝试入队操作。
上面整个执行步骤有一段分支还没有覆盖到:
else if (p == q) // CASE2: 发生了出队操作 p = (t != (t = tail)) ? t : head;
这个分支只有在元素入队的同时,针对该元素也发生了“出队”操作才会执行,我们后面会分析元素的“出队”,理解了“出队”操作再回头来看这个分支就容易理解很多了。
出队操作队列中元素的“出队”是从队首移除元素,我们来看下ConcurrentLinkedQueue是如何实现出队的:
/** * 在队首出队元素, 直到成功 */ public E poll() { restartFromHead: for (; ; ) { for (Nodeh = head, p = h, q; ; ) { E item = p.item; if (item != null && p.casItem(item, null)) { // CASE2: 队首是非哨兵结点(item!=null) if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) { // CASE1: 队首是一个哨兵结点(item==null) updateHead(h, p); return null; } else if (p == q) continue restartFromHead; else p = q; } } }
还是通过示例来看,假设初始的队列结构如下:
①ThreadA先多带带进行出队操作
由于head所指的是item==null的结点,所以ThreadA会执行以下分支:
else p = q;
然后进入下一次自旋,在自旋中执行以下分支,如果CAS操作成功,则移除首个有效元素,并重新设置头指针:
if (item != null && p.casItem(item, null)) { // CASE2: 队首是非哨兵结点(item!=null) if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; }
此时队列的结构如下:
如果ThreadA的CAS操作失败呢?
CAS操作失败则会进入以下分支,并重新开始自旋:
else if (p == q) continue restartFromHead;
最终前面两个null结点会被GC回收,队列结构如下:
②ThreadA继续进行出队操作
ThreadA继续执行“出队”操作,还是执行以下分支:
if (item != null && p.casItem(item, null)) { // CASE2: 队首是非哨兵结点(item!=null) if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; }
但是此时p==h,所以仅将头结点置null,这其实是一种“懒删除”的策略。
出队元素“2”:
出队元素“10”:
最终队列结果如下:
③ThreadA进行出队,其它线程进行入队
这是最特殊的一种情况,当队列中只剩下一个元素时,如果同时发生出队和入队操作,会导致队列出现下面这种结构:(假设ThreadA进行出队元素“25”,ThreadB进行入队元素“11”)
此时tail.next=tail自身,所以ThreadB在执行入队时,会进入到offer方法的以下分支:
else if (p == q) // CASE2: 发生了出队操作 p = (t != (t = tail)) ? t : head;三、总结
ConcurrentLinkedQueue使用了自旋+CAS的非阻塞算法来保证线程并发访问时的数据一致性。由于队列本身是一种链表结构,所以虽然算法看起来很简单,但其实需要考虑各种并发的情况,实现复杂度较高,并且ConcurrentLinkedQueue不具备实时的数据一致性,实际运用中,队列一般在生产者-消费者的场景下使用得较多,所以ConcurrentLinkedQueue的使用场景并不如阻塞队列那么多。
另外,关于ConcurrentLinkedQueue还有以下需要注意的几点:
ConcurrentLinkedQueue的迭代器是弱一致性的,这在并发容器中是比较普遍的现象,主要是指在一个线程在遍历队列结点而另一个线程尝试对某个队列结点进行修改的话不会抛出ConcurrentModificationException,这也就造成在遍历某个尚未被修改的结点时,在next方法返回时可以看到该结点的修改,但在遍历后再对该结点修改时就看不到这种变化。
size方法需要遍历链表,所以在并发情况下,其结果不一定是准确的,只能供参考。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/76960.html
摘要:在之前,除了类外,并没有其它适合并发环境的栈数据结构。作为双端队列,可以当作栈来使用,并且高效地支持并发环境。 showImg(https://segmentfault.com/img/bVbguF7?w=1280&h=853); 本文首发于一世流云专栏:https://segmentfault.com/blog... 一、引言 在开始讲ConcurrentLinkedDeque之前...
摘要:整个包,按照功能可以大致划分如下锁框架原子类框架同步器框架集合框架执行器框架本系列将按上述顺序分析,分析所基于的源码为。后,根据一系列常见的多线程设计模式,设计了并发包,其中包下提供了一系列基础的锁工具,用以对等进行补充增强。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首发于一世流云专栏:https...
摘要:我们来看下的类继承图可以看到,实现了接口,在多线程进阶二五之框架中,我们提到过实现了接口,以提供和排序相关的功能,维持元素的有序性,所以就是一种为并发环境设计的有序工具类。唯一的区别是针对的仅仅是键值,针对键值对进行操作。 showImg(https://segmentfault.com/img/bVbggic?w=960&h=600); 本文首发于一世流云专栏:https://seg...
摘要:仅仅当有多个线程同时进行写操作时,才会进行同步。可以看到,上述方法返回一个迭代器对象,的迭代是在旧数组上进行的,当创建迭代器的那一刻就确定了,所以迭代过程中不会抛出并发修改异常。另外,迭代器对象也不支持修改方法,全部会抛出异常。 showImg(https://segmentfault.com/img/bVbggij?w=960&h=600); 本文首发于一世流云专栏:https://...
摘要:我们之前已经介绍过了,底层基于跳表实现,其操作平均时间复杂度均为。事实上,内部引用了一个对象,以组合方式,委托对象实现了所有功能。线程安全内存的使用较多迭代是对快照进行的,不会抛出,且迭代过程中不支持修改操作。 showImg(https://segmentfault.com/img/bVbggjf?w=600&h=377); 本文首发于一世流云专栏:https://segmentfa...
阅读 2664·2021-11-24 09:38
阅读 1979·2019-08-30 15:53
阅读 1234·2019-08-30 15:44
阅读 3229·2019-08-30 14:10
阅读 3579·2019-08-29 16:29
阅读 1799·2019-08-29 16:23
阅读 1099·2019-08-29 16:20
阅读 1471·2019-08-29 11:13