Java JUC Study Notes - ConcurrentLinkedDeque in Detail

0x00 Preface
0x01 ConcurrentLinkedDeque Overview

ConcurrentLinkedDeque is a non-blocking, thread-safe linked list (a double-ended queue) available since Java SE 7/JDK 7 in the java.util.concurrent package. Its class declaration is:
    public class ConcurrentLinkedDeque<E>
        extends AbstractCollection<E>
        implements Deque<E>, Serializable
An unbounded concurrent deque based on linked nodes. Concurrent insertion, removal, and access operations execute safely across multiple threads. A ConcurrentLinkedDeque is an appropriate choice when many threads will share access to a common collection. Like most other concurrent collection implementations, this class does not permit the use of null elements.
Beware that, unlike in most collections, the size method is NOT a constant-time operation. Because of the asynchronous nature of these deques, determining the current number of elements requires a traversal of the elements, and so may report inaccurate results if this collection is modified during traversal.
Bulk operations that add, remove, or examine multiple elements, such as addAll(java.util.Collection<? extends E>), removeIf(java.util.function.Predicate<? super E>) or forEach(java.util.function.Consumer<? super E>), are not guaranteed to be performed atomically.
In other words, bulk operations that add, remove, or examine multiple elements, such as addAll(java.util.Collection<? extends E>), removeIf(java.util.function.Predicate<? super E>) or forEach(java.util.function.Consumer<? super E>), are not guaranteed to execute atomically. So if you need an atomic view of several elements at once, do not rely on the bulk methods to provide it.
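The two restrictions quoted above (no null elements, no atomic bulk operations) can be illustrated with a minimal sketch. The class name `BulkOpsSketch`, its lock, and its helper methods are assumptions made up for this illustration and are not part of the JDK. The lock only helps if every thread that touches the deque goes through it, which also gives up the non-blocking behaviour.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;

// Hypothetical wrapper: makes bulk operations atomic with respect to each other
// by routing them through one shared lock.
class BulkOpsSketch {
    private final ConcurrentLinkedDeque<String> deque = new ConcurrentLinkedDeque<>();
    private final Object lock = new Object();

    // addAll alone is not atomic; holding the lock keeps other callers of this
    // class from observing a half-added batch.
    void addBatchAtomically(List<String> batch) {
        synchronized (lock) {
            deque.addAll(batch);
        }
    }

    // Copies the current contents while holding the same lock.
    List<String> snapshot() {
        synchronized (lock) {
            return new ArrayList<>(deque);
        }
    }

    // Returns true if the deque refuses a null element, as documented.
    static boolean rejectsNull() {
        try {
            new ConcurrentLinkedDeque<String>().add(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        BulkOpsSketch sketch = new BulkOpsSketch();
        List<String> batch = new ArrayList<>();
        batch.add("a");
        batch.add("b");
        sketch.addBatchAtomically(batch);
        System.out.println(sketch.snapshot());
        System.out.println(rejectsNull());
    }
}
```

If single-element operations are all you need, the lock is unnecessary and the deque can be used directly.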
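0x02 ConcurrentLinkedDeque Source Code Analysis

In this section I walk through the implementation of this non-blocking, thread-safe linked list from the point of view of an ordinary linked list. A linked list is necessarily built from nodes, which are defined as follows: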
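    static final class Node<E> {
        // Predecessor of this node. Java has no pointers, but an object
        // reference plays the same role.
        volatile Node<E> prev;
        // The stored element of generic type E.
        volatile E item;
        // Successor of this node.
        volatile Node<E> next;
    }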
    /**
     * Returns {@code true} if this deque contains the specified element.
     * More formally, returns {@code true} if and only if this deque contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o element whose presence in this deque is to be tested
     * @return {@code true} if this deque contains the specified element
     */
    public boolean contains(Object o) {
        // A null argument falls through to "return false", because null
        // elements are never stored.
        if (o != null) {
            // A classic for loop: start at the first node and walk forward.
            for (Node<E> p = first(); p != null; p = succ(p)) {
                final E item;
                // If the node still holds an element and it equals o, return true.
                if ((item = p.item) != null && o.equals(item))
                    return true;
            }
        }
        return false;
    }
    /**
     * Returns {@code true} if this collection contains no elements.
     *
     * @return {@code true} if this collection contains no elements
     */
    public boolean isEmpty() {
        // Just check whether a first element can be obtained;
        // peekFirst() is explained further down.
        return peekFirst() == null;
    }
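A short usage sketch of the two methods above, under the assumption of a single thread so the results are deterministic. The class name `ContainsIsEmptySketch` and the method `demo` are made up for this illustration. It shows that `contains(null)` simply returns false rather than throwing, and that `isEmpty()` only needs to find one live element, which makes it a better emptiness check than `size() == 0`.

```java
import java.util.concurrent.ConcurrentLinkedDeque;

class ContainsIsEmptySketch {
    // Returns {emptyAtStart, containsA, containsNull, emptyAfterAdd}.
    static boolean[] demo() {
        ConcurrentLinkedDeque<String> deque = new ConcurrentLinkedDeque<>();
        boolean emptyAtStart = deque.isEmpty();      // no element yet
        deque.add("a");
        boolean containsA = deque.contains("a");     // found via equals()
        boolean containsNull = deque.contains(null); // null is never stored
        boolean emptyAfterAdd = deque.isEmpty();     // stops at the first element
        return new boolean[] {emptyAtStart, containsA, containsNull, emptyAfterAdd};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo()));
    }
}
```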
    /**
     * Returns the number of elements in this deque.  If this deque
     * contains more than {@code Integer.MAX_VALUE} elements, it
     * returns {@code Integer.MAX_VALUE}.
     *
     * <p>Beware that, unlike in most collections, this method is
     * <em>NOT</em> a constant-time operation. Because of the
     * asynchronous nature of these deques, determining the current
     * number of elements requires traversing them all to count them.
     * Additionally, it is possible for the size to change during
     * execution of this method, in which case the returned result
     * will be inaccurate. Thus, this method is typically not very
     * useful in concurrent applications.
     *
     * @return the number of elements in this deque
     */
    public int size() {
        // This is a Java labeled statement, which is rarely seen in everyday code.
        restartFromHead: for (;;) {
            // Element counter.
            int count = 0;
            // Start from the first node.
            for (Node<E> p = first(); p != null;) {
                // Only nodes that still hold an element are counted.
                if (p.item != null)
                    // Stop once the counter reaches Integer.MAX_VALUE.
                    if (++count == Integer.MAX_VALUE)
                        break;  // @see Collection.size()
                // If p's next reference points back at p, the node has been
                // unlinked, so restart the count from the head.
                if (p == (p = p.next))
                    continue restartFromHead;
            }
            return count;
        }
    }
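Since the labeled statement in size() is unfamiliar to many readers, here is a minimal standalone sketch of `continue label`. It is unrelated to the deque itself; the class and method names are made up for this illustration. As in size(), the `continue` inside the inner loop jumps straight to the next iteration of the labeled outer loop.

```java
class LabeledContinueSketch {
    // Counts the rows that contain no negative number.
    static int countNonNegativeRows(int[][] rows) {
        int count = 0;
        nextRow:
        for (int[] row : rows) {
            for (int value : row) {
                if (value < 0)
                    continue nextRow; // abandon this row and move on to the next one
            }
            count++; // reached only if the inner loop finished normally
        }
        return count;
    }

    public static void main(String[] args) {
        int[][] rows = {{1, 2}, {3, -1}, {}};
        System.out.println(countNonNegativeRows(rows));
    }
}
```

In size() the label sits on an infinite `for (;;)` loop, so `continue restartFromHead` resets `count` to 0 and starts the traversal again.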
    public E peekFirst() {
        // Walk forward from the first node and return the first non-null element.
        for (Node<E> p = first(); p != null; p = succ(p)) {
            final E item;
            if ((item = p.item) != null)
                return item;
        }
        return null;
    }
    public E peekLast() {
        // Walk backward from the last node and return the first non-null element.
        for (Node<E> p = last(); p != null; p = pred(p)) {
            final E item;
            if ((item = p.item) != null)
                return item;
        }
        return null;
    }
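From the caller's side, the behaviour of the peek methods can be sketched as follows, assuming single-threaded use so the result is deterministic. The class name `PeekSketch` and the method `run` are made up for this illustration. The sketch contrasts `peekFirst()`, which returns null on an empty deque, with `getFirst()`, which throws, and shows that peeking does not remove anything.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentLinkedDeque;

class PeekSketch {
    static String run() {
        ConcurrentLinkedDeque<String> deque = new ConcurrentLinkedDeque<>();
        StringBuilder out = new StringBuilder();

        // On an empty deque peekFirst() returns null instead of throwing.
        out.append(deque.peekFirst()).append(',');

        // getFirst() on an empty deque throws NoSuchElementException.
        try {
            deque.getFirst();
            out.append("no exception");
        } catch (NoSuchElementException e) {
            out.append("NoSuchElementException");
        }

        deque.addLast("a");
        deque.addLast("b");

        // Peeking at both ends leaves the elements in place, so size() stays 2.
        out.append(',').append(deque.peekFirst())
           .append(',').append(deque.peekLast())
           .append(',').append(deque.size());
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints null,NoSuchElementException,a,b,2
    }
}
```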
    /**
     * Links e as first element.
     */
    private void linkFirst(E e) {
        // Create a new node; the element must not be null.
        final Node<E> newNode = newNode(Objects.requireNonNull(e));

        restartFromHead:
        for (;;)
            // Start traversing from the head node.
            for (Node<E> h = head, p = h, q;;) {
                // If p is not the first node, move back towards the first node.
                if ((q = p.prev) != null &&
                    (q = (p = q).prev) != null)
                    // Check for head updates every other hop.
                    // If p == q, we are sure to follow head instead.
                    p = (h != (h = head)) ? h : q;
                else if (p.next == p) // PREV_TERMINATOR
                    continue restartFromHead;
                else {
                    // p is first node
                    // Point the new node's next reference at p.
                    NEXT.set(newNode, p); // CAS piggyback
                    // Atomically change p's prev reference from null to the new node.
                    if (PREV.compareAndSet(p, null, newNode)) {
                        // Successful CAS is the linearization point
                        // for e to become an element of this deque,
                        // and for newNode to become "live".
                        if (p != h) // hop two nodes at a time; failure is OK
                            // Move the head reference forward.
                            HEAD.weakCompareAndSet(this, h, newNode);
                        return;
                    }
                    // Lost CAS race to another thread; re-read prev
                }
            }
    }
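The `NEXT`, `PREV` and `HEAD` fields used in linkFirst are `java.lang.invoke.VarHandle` constants (Java 9 and later). The following is a minimal sketch of how such a handle performs a compare-and-set on a node field. The simplified `Node` class, the class name `VarHandleCasSketch` and the method `demo` are assumptions for illustration only and are not the JDK's code.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

class VarHandleCasSketch {
    // Simplified, hypothetical node: only the prev link is modelled.
    static final class Node {
        volatile Node prev;
        final String item;
        Node(String item) { this.item = item; }
    }

    // Handle giving atomic access to the Node.prev field.
    private static final VarHandle PREV;
    static {
        try {
            PREV = MethodHandles.lookup().findVarHandle(Node.class, "prev", Node.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    // Sets node.prev to update only if it currently equals expected.
    static boolean casPrev(Node node, Node expected, Node update) {
        return PREV.compareAndSet(node, expected, update);
    }

    // Returns {firstCasSucceeded, secondCasSucceeded, prevIsFirstWinner}.
    static boolean[] demo() {
        Node first = new Node("first");
        Node a = new Node("a");
        Node b = new Node("b");
        boolean won = casPrev(first, null, a);  // prev was null, so this succeeds
        boolean lost = casPrev(first, null, b); // prev is now a, so this fails
        return new boolean[] {won, lost, first.prev == a};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo()));
    }
}
```

The failing second call corresponds to the "Lost CAS race to another thread" branch: the loser does not block, it simply re-reads the link and retries.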
    /**
     * Links e as last element.
     */
    private void linkLast(E e) {
        // Create a new node; the element must not be null.
        final Node<E> newNode = newNode(Objects.requireNonNull(e));

        restartFromTail:
        for (;;)
            for (Node<E> t = tail, p = t, q;;) {
                // If p is not the last node, move forward towards the last node.
                if ((q = p.next) != null &&
                    (q = (p = q).next) != null)
                    // Check for tail updates every other hop.
                    // If p == q, we are sure to follow tail instead.
                    p = (t != (t = tail)) ? t : q;
                else if (p.prev == p) // NEXT_TERMINATOR
                    continue restartFromTail;
                else {
                    // p is last node
                    // Point the new node's prev reference at p.
                    PREV.set(newNode, p); // CAS piggyback
                    // Atomically change p's next reference from null to the new node.
                    if (NEXT.compareAndSet(p, null, newNode)) {
                        // Successful CAS is the linearization point
                        // for e to become an element of this deque,
                        // and for newNode to become "live".
                        if (p != t) // hop two nodes at a time; failure is OK
                            // Move the tail reference forward.
                            TAIL.weakCompareAndSet(this, t, newNode);
                        return;
                    }
                    // Lost CAS race to another thread; re-read next
                }
            }
    }
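linkFirst and linkLast are private; callers reach them through public methods such as `addFirst` and `addLast`. A minimal single-threaded sketch of the resulting element order follows. The class name `LinkOrderSketch` and the method `demo` are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;

class LinkOrderSketch {
    // Inserts at both ends and returns the contents from first to last.
    static List<String> demo() {
        ConcurrentLinkedDeque<String> deque = new ConcurrentLinkedDeque<>();
        deque.addLast("b");  // deque: b
        deque.addFirst("a"); // deque: a, b
        deque.addLast("c");  // deque: a, b, c
        return new ArrayList<>(deque);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a, b, c]
    }
}
```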
0x03 ConcurrentLinkedDeque in Practice

No further explanation is needed here, so let's go straight to an example.
    package cn.xiaolus.cld;

    import java.util.concurrent.ConcurrentLinkedDeque;

    public class CLDMain {
        private static ConcurrentLinkedDeque<String> cld = new ConcurrentLinkedDeque<>();

        public static void main(String[] args) {
            int numThread = Runtime.getRuntime().availableProcessors();
            Thread[] threads = new Thread[numThread];
            for (int i = 0; i < threads.length; i++) {
                (threads[i] = new Thread(addTask(), "Thread " + i)).start();
            }
        }

        public static Runnable addTask() {
            return new Runnable() {
                @Override
                public void run() {
                    int num = Runtime.getRuntime().availableProcessors();
                    for (int i = 0; i < num; i++) {
                        StringBuilder item = new StringBuilder("Item ").append(i);
                        cld.addFirst(item.toString());
                        callbackAdd(Thread.currentThread().getName(), item);
                    }
                    callbackFinish(Thread.currentThread().getName());
                }
            };
        }

        public static void callbackAdd(String threadName, StringBuilder item) {
            StringBuilder builder = new StringBuilder(threadName).append(" added :").append(item);
            System.out.println(builder);
        }

        public static void callbackFinish(String threadName) {
            StringBuilder builder = new StringBuilder(threadName).append(" has Finished");
            System.out.println(builder);
            System.out.println(new StringBuilder("CurrentSize ").append(cld.size()));
        }
    }
    Thread 0 added :Item 0
    Thread 0 added :Item 1
    Thread 0 added :Item 2
    Thread 0 added :Item 3
    Thread 0 has Finished
    CurrentSize 6
    Thread 1 added :Item 0
    Thread 2 added :Item 0
    Thread 2 added :Item 1
    Thread 2 added :Item 2
    Thread 2 added :Item 3
    Thread 1 added :Item 1
    Thread 1 added :Item 2
    Thread 2 has Finished
    Thread 1 added :Item 3
    Thread 1 has Finished
    CurrentSize 13
    CurrentSize 13
    Thread 3 added :Item 0
    Thread 3 added :Item 1
    Thread 3 added :Item 2
    Thread 3 added :Item 3
    Thread 3 has Finished
    CurrentSize 16
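In the output above, the intermediate CurrentSize values depend on thread scheduling, because size() is called while other threads are still adding elements. A minimal sketch of one way to get a stable count is to wait for all writer threads with `join()` before calling size(). The class name `JoinBeforeSizeSketch`, the method `fillAndCount` and its parameters are assumptions for this illustration and are not part of the original demo.

```java
import java.util.concurrent.ConcurrentLinkedDeque;

class JoinBeforeSizeSketch {
    // Starts numThreads writers, each adding itemsPerThread elements,
    // waits for all of them, then returns the final size.
    static int fillAndCount(int numThreads, int itemsPerThread) {
        ConcurrentLinkedDeque<String> deque = new ConcurrentLinkedDeque<>();
        Thread[] threads = new Thread[numThreads];
        for (int i = 0; i < numThreads; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < itemsPerThread; j++) {
                    deque.addFirst("Item " + j);
                }
            }, "Thread " + i);
            threads[i].start();
        }
        try {
            // After join() returns, no writer is running any more.
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for writers", e);
        }
        // No concurrent modification is possible now, so the count is exact.
        return deque.size();
    }

    public static void main(String[] args) {
        System.out.println("FinalSize " + fillAndCount(4, 4));
    }
}
```

Every addFirst succeeds independently of the others, so once all threads have finished the size equals numThreads * itemsPerThread.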
Source code: GitHub - xiaolulwr (路伟饶) / JUC-Demo.
