摘要:是无阻塞队列的一种实现,依赖与算法实现。这样比从往后找更有效率出队规则定义补充一项,也表示节点已经被删除参考方法。
ConcurrentLinkedQueue是无阻塞队列的一种实现, 依赖与CAS算法实现。
入队offerif(q==null)当前是尾节点 -> CAS赋值tail.next = newNode, 成功就跳出循环
elseif(p == q)尾节点被移除 -> 从tail或head重新往后找
else不是尾节点 -> 往next找
规则定义:当一个节点的next指向自身时, 表示节点已经被移除, 注释中还会强调这一点。
完整代码(JDK8):/** * Inserts the specified element at the tail of this queue. * As the queue is unbounded, this method will never return {@code false}. * * @return {@code true} (as specified by {@link Queue#offer}) * @throws NullPointerException if the specified element is null */ /* * 变量说明: * 成员变量: * head: 首节点 * tail: 尾节点, 不一定指向末尾, 两次入队才更新一次 * 局部变量 * t= tail; //保存循环开始时, 当时的tail值 * p= t; // 每次查找的起始位置, 可能指向首节点head或者临时尾节点t * q= p.next; // 每次循环下一个节点 * newNode= new Node; // 新节点 * * * 重要概念: * 当p = p.next时, 表示节点已经被移除 */ public boolean offer(E e) { checkNotNull(e); final Node这两段代码看了很久, 特别记录下:newNode = new Node (e); for (Node t = tail, p = t;;) { Node q = p.next; if (q == null) { // 情况1: p是尾节点 // p is last node // p是尾节点就直接将新节点放入末尾 if (p.casNext(null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this queue, // and for newNode to become "live". if (p != t) // hop two nodes at a time // 一次跳两个节点, 即插入两次, tail更新一次 casTail(t, newNode); // Failure is OK. // 失败也无妨, 说明被别的线程更新了 return true; // 退出循环 } // Lost CAS race to another thread; re-read next } else if (p == q) // 情况2: p节点被删除了 // We have fallen off list. If tail is unchanged, it // will also be off-list, in which case we need to // jump to head, from which all live nodes are always // reachable. Else the new tail is a better bet. // 保存的节点p、t都已经失效了,这时需要重新检索,重新检索的起始位置有两种情况 // 1.1. 如果tail==t,表示tail也是失效的, 那么从head开始找 // 1.2. 否则tail就是被其他线程更新了, 可以又试着从tail找 p = (t != (t = tail)) ? t : head; else // 情况3: 沿着p往下找 // Check for tail updates after two hops. // 这段简单看作p = q就好理解了, 这么写是为了提高效率: // 1. 情况二中p可能指向了head(由于tail节点失效导致的) // 2. 现在tail可能被其他线程更新,也许重新指向了队尾 // 3. 如果是, 尝试则从队尾开始找, 以减少迭代次数 p = (p != t && t != (t = tail)) ? t : q; } }
情况2中的p = (t != (t = tail)) ? t : head;
(t != (t = tail))可以分三步来看
1.1. 首先取出t 1.2. 将tail赋值给t 1.3. 将先前取出的t与更新后的t比较
情况3中p = (p != t && t != (t = tail)) ? t : q;
首先: p != t: 这种情况只有可能发生在执行了情况2后出队poll 规则定义:
现状: 这时p指向head或者中间的元素, t指向一个被删除了的节点
那么如果tail被其他线程更新了, 我们可以将t重新指向tail, p指向t, 就像刚进循环一样, 从尾节点开始检索。
这样比从head往后找更有效率
补充一项, item==null,也表示节点已经被删除(参考remove方法)。
/** * updateHead * */ public E poll() { restartFromHead: for (;;) { for (Node出队设值操作:h = head, p = h, q;;) { E item = p.item; if (item != null && p.casItem(item, null)) { // Successful CAS is the linearization point // for item to be removed from this queue. if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) { updateHead(h, p); return null; } else if (p == q) continue restartFromHead; else p = q; } } } /** * Tries to CAS head to p. If successful, repoint old head to itself * as sentinel for succ(), below. */ final void updateHead(Node h, Node p) { if (h != p && casHead(h, p)) h.lazySetNext(h); }
先更新head, 再将旧head的next指向自己
Note: CAS算法实现依靠Unsafe.compareAndSwapObject实现UNSAFE.compareAndSwapObject(对象, 字段偏移量, 当前值, 新值)
可以为对象中的某个字段实现CAS操作lazySet依赖UNSAFE.putOrderedObject实现
UNSAFE.putOrderedObject(对象, 字段偏移量, 新值)
这个只能用在volatile字段上
个人理解: volatile的设值会导致本地缓存失效, 那么需要重新从主存读取, 使用这个方法可以使寄存器缓存依旧有效, 不必急于从主存取值。
使用目的: 移除节点时, 需要更新节点的next指向自身, 但现在next指向的数据实际是有效的; 高并发情况下,如果offser方法已经缓存了这个next值, 直接设置next会导致缓存行失效, CPU需要重新读取next; 而使用putOrderedObject可以让offser从这个next继续检索
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/72604.html
摘要:注意这里指的不是当次而是之后,所以如果我们使用队列的方法返回,就知道队列是否为空,但是不知道之后是否为空,并且,当关注的操作发生时,在插入或取出操作的返回值里告知此信息,来指导是否继续注册写操作。 前言 本文写给对ConcurrentLinkedQueue的实现和非阻塞同步算法的实现原理有一定了解,但缺少实践经验的朋友,文中包括了实战中的尝试、所走的弯路,经验和教训。 背景介绍 ...
摘要:线程安全的线程安全的,在读多写少的场合性能非常好,远远好于高效的并发队列,使用链表实现。这样带来的好处是在高并发的情况下,你会需要一个全局锁来保证整个平衡树的线程安全。 该文已加入开源项目:JavaGuide(一份涵盖大部分Java程序员所需要掌握的核心知识的文档类项目,Star 数接近 14 k)。地址:https://github.com/Snailclimb... 一 JDK ...
摘要:如果停止了版本更新,可使用方法来解除所有因而阻塞的线程,包括指定版本号的。如果自己维护版本号,则应该保证递增。 前言 相比上一篇而言,本文不需要太多的准备知识,但技巧性更强一些。因为分析、设计的过程比较复杂繁琐,也限于篇幅,所以,主要展示如何解决这些需求,和讲解代码。另外,所讲的内容也是后一篇实战中需要用到的一个工具类。 需求介绍 我需要编写一个同步工具,它需要提供这样几个方法:...
摘要:序是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元素时,它会返回队列头部的元素。的模型,默认的是用这个来实现的。 序 ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部,当我们获取一个元...
阅读 1786·2021-11-11 16:55
阅读 1436·2019-08-30 15:54
阅读 739·2019-08-29 15:34
阅读 2216·2019-08-29 13:11
阅读 2890·2019-08-26 13:28
阅读 1864·2019-08-26 10:49
阅读 965·2019-08-26 10:40
阅读 2535·2019-08-23 18:21