资讯专栏INFORMATION COLUMN

Java JUC学习 - ConcurrentLinkedDeque 详解

Drummor / 3303人阅读

摘要:概述是从开始提供的一种非阻塞式线程安全链表,隶属于包。当许多线程同时访问一个公共集合时,是一个合适的选择。程序的一次输出为该程序实现了多线程并发添加大量元素到一个公共的链表,刚好是的典型使用场景。

Java JUC学习 - ConcurrentLinkedDeque 详解 0x00 前言

如何实现并发程序,对于Java以及其他高级语言来说都是一件并不容易的事情。在大一上学期的时候,我们学习了链表。可以说链表是一种很常用的数据结构了,不管是实现链式的队列和栈还是实现树都需要这种数据结构。但是在最初的时候我们并没有考虑过链表的并发问题,比如下面是一个经典的链表插入过程:

//C Source Code
//查找函数
LinkNode *SearchNode(LinkList *list, int i);
//插入函数
void LinkListInsert(LinkList *list, LinkNode *node, int i) {
//  首先找到插入点之前的结点
    LinkNode *previous = SearchNode(list, i-1);
//    插入操作
    node->next = previous->next;
    previous->next = node;
}

如果只有一个线程操作链表的话那一定是没有问题的,但是因为链表的插入操作实际上是非原子(nonatomic)访问,所以多线程操作同一个链表的话可能会出现问题。解决这种问题的一种方法是使用线程同步,在访问的时候加锁。不过这样一来编程就会变的很繁琐,也会出现很多不可预料的问题。在Java中为我们提供了JUC类库,里面有很多可以用于多线程访问的类型。今天介绍的就是其中的ConcurrentLinkedDeque类型。

0x01 ConcurrentLinkedDeque概述

ConcurrentLinkedDeque是从Java SE 7/JDK 7开始提供的一种Non-blocking Thread-safe List(非阻塞式线程安全链表),隶属于java.util.concurrent包。其类原型为:

public class ConcurrentLinkedDeque
extends AbstractCollection
implements Deque, Serializable

在API简介中,对ConcurrentLinkedDeque的表述是:

An unbounded concurrent deque based on linked nodes. Concurrent insertion, removal, and access operations execute safely across multiple threads. A ConcurrentLinkedDeque is an appropriate choice when many threads will share access to a common collection. Like most other concurrent collection implementations, this class does not permit the use of null elements.

大概意思是,ConcurrentLinkedDeque是一种基于链接节点的无限并发链表。可以安全地并发执行插入、删除和访问操作。当许多线程同时访问一个公共集合时,ConcurrentLinkedDeque是一个合适的选择。和大多数其他并发的集合类型一样,这个类不允许使用空元素。

听起来很美好,但是在使用过程中还要注意一个元素数量的问题,在API文档中这样表述:

Beware that, unlike in most collections, the size method is NOT a constant-time operation. Because of the asynchronous nature of these deques, determining the current number of elements requires a traversal of the elements, and so may report inaccurate results if this collection is modified during traversal.

大概意思是:与大多数集合类型不同,其size方法不是一个常量操作。因为链表的异步性质,确定当前元素的数量需要遍历所有的元素,所以如果在遍历期间有其他线程修改了这个集合,size方法就可能会报告不准确的结果。

同时,对链表的批量操作也不是一个原子操作,在使用的时候要注意,在API文档中这样表述:

Bulk operations that add, remove, or examine multiple elements, such as addAll(java.util.Collection), removeIf(java.util.function.Predicate) or forEach(java.util.function.Consumer), are not guaranteed to be performed atomically.

大概意思是,批量的操作:包括添加、删除或检查多个元素,比如addAll(java.util.Collection)removeIf(java.util.function.Predicate)或者removeIf(java.util.function.Predicate)forEach(java.util.function.Consumer)方法,这个类型并不保证以原子方式执行。由此可见如果想保证原子访问,不得使用批量操作的方法。

浏览API的介绍,我们对这个类型有了大体上的了解。实际上ConcurrentLinkedDeque本质上也是一个链表,对这个类型的基本操作也和基本的链表一样,基本上就是“创建、插入、删除、遍历、清空”等,下面直接借助具体的例子来表述。

0x02 ConcurrentLinkedDeque源代码解析

在本节我将从普通链表的实现的角度解析这个非阻塞的线程安全链表的实现。首先对于链表而言一定是由结点构成的,定义如下:

static final class Node {
//        结点的前继结点,Java没有指针,但是普通对象就相当于指针了。
        volatile Node prev;
//        存储的泛型元素。
        volatile E item;
//        结点的后继结点
        volatile Node next;
    }

其次是该类的一些公有方法的实现,这些方法大多是实现的java.util.Deque中的接口方法。

检查链表中是否包含给定的元素

/**
     * Returns {@code true} if this deque contains the specified element.
     * More formally, returns {@code true} if and only if this deque contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o element whose presence in this deque is to be tested
     * @return {@code true} if this deque contains the specified element
     */
    public boolean contains(Object o) {
    //如果是空对象直接返回,因为不允许存储空对象
        if (o != null) {
        //比较经典的for循环结构了,从首个结点开始依次寻找。
            for (Node p = first(); p != null; p = succ(p)) {
                final E item;
        //如果结点元素不为空并且还是我们要的元素,就返回true
                if ((item = p.item) != null && o.equals(item))
                    return true;
            }
        }
        return false;
    }

检查链表是否为空

/**
     * Returns {@code true} if this collection contains no elements.
     *
     * @return {@code true} if this collection contains no elements
     */
    public boolean isEmpty() {
    //直接检查能否取到首元素即可,下面会讲解peekFirst()方法的实现
        return peekFirst() == null;
    }

返回链表中包含的元素个数

/**
     * Returns the number of elements in this deque.  If this deque
     * contains more than {@code Integer.MAX_VALUE} elements, it
     * returns {@code Integer.MAX_VALUE}.
     *
     * 

Beware that, unlike in most collections, this method is * NOT a constant-time operation. Because of the * asynchronous nature of these deques, determining the current * number of elements requires traversing them all to count them. * Additionally, it is possible for the size to change during * execution of this method, in which case the returned result * will be inaccurate. Thus, this method is typically not very * useful in concurrent applications. * * @return the number of elements in this deque */ public int size() { //这里应该是Java中的标签(label)语句的用法,我们平时编程比较少用到 restartFromHead: for (;;) { //count计数器 int count = 0; //从首结点 for (Node p = first(); p != null;) { //如果不是空结点 if (p.item != null) //如果count加1之后达到了最大整数值就退出 if (++count == Integer.MAX_VALUE) break; // @see Collection.size() //如果p的下一个结点仍然为p,则从头开始 if (p == (p = p.next)) continue restartFromHead; } return count; } }

获得链表首结点

public E peekFirst() {
    //从第一个结点开始循环,找到第一个非空的结点
        for (Node p = first(); p != null; p = succ(p)) {
            final E item;
            if ((item = p.item) != null)
                return item;
        }
        return null;
    }

获得链表尾结点

public E peekLast() {
    //从最后一个结点开始,找到第一个非空的结点
        for (Node p = last(); p != null; p = pred(p)) {
            final E item;
            if ((item = p.item) != null)
                return item;
        }
        return null;
    }

下面介绍一些protected方法和private方法。这里仅讲解插入到首结点和插入到尾结点的方法,其他方法思路大致相同,这里就不再介绍了。

    /**
     * Links e as first element.
     */
    private void linkFirst(E e) {
    // 新建一个结点,并且要求结点不为空。
        final Node newNode = newNode(Objects.requireNonNull(e));

        restartFromHead:
        for (;;)
        //从头结点开始遍历链表
            for (Node h = head, p = h, q;;) {
            //如果p不是第一个结点则应该退回第一个结点
                if ((q = p.prev) != null &&
                    (q = (p = q).prev) != null)
                    // Check for head updates every other hop.
                    // If p == q, we are sure to follow head instead.
                    p = (h != (h = head)) ? h : q;
                else if (p.next == p) // PREV_TERMINATOR
                    continue restartFromHead;
                else {
                    // p is first node
                    //将新结点的下个结点设为p
                    NEXT.set(newNode, p); // CAS piggyback
                    //设置新结点的上个结点
                    if (PREV.compareAndSet(p, null, newNode)) {
                        // Successful CAS is the linearization point
                        // for e to become an element of this deque,
                        // and for newNode to become "live".
                        if (p != h) // hop two nodes at a time; failure is OK
                        //调整头结点
                            HEAD.weakCompareAndSet(this, h, newNode);
                        return;
                    }
                    // Lost CAS race to another thread; re-read prev
                }
            }
    }

    /**
     * Links e as last element.
     */
    private void linkLast(E e) {
    // 新建一个结点,并且要求结点不为空。
        final Node newNode = newNode(Objects.requireNonNull(e));

        restartFromTail:
        for (;;)
            for (Node t = tail, p = t, q;;) {
            //如果p不是最后一个结点则应该退回最后一个结点
                if ((q = p.next) != null &&
                    (q = (p = q).next) != null)
                    // Check for tail updates every other hop.
                    // If p == q, we are sure to follow tail instead.
                    p = (t != (t = tail)) ? t : q;
                else if (p.prev == p) // NEXT_TERMINATOR
                    continue restartFromTail;
                else {
                    // p is last node
                    //将新结点的上个结点设为p
                    PREV.set(newNode, p); // CAS piggyback
                    if (NEXT.compareAndSet(p, null, newNode)) {
                        // Successful CAS is the linearization point
                        // for e to become an element of this deque,
                        // and for newNode to become "live".
                        if (p != t) // hop two nodes at a time; failure is OK
                        //调整尾结点
                            TAIL.weakCompareAndSet(this, t, newNode);
                        return;
                    }
                    // Lost CAS race to another thread; re-read next
                }
            }
    }

果然Java库中的代码的实现还是比较复杂的,这次只是粗略的了解。有些地方还是不太理解,所以还需要进一步的学习。

0x03 ConcurrentLinkedDeque在实际中的应用

不多做解释了,直接上应用。

package cn.xiaolus.cld;

import java.util.concurrent.ConcurrentLinkedDeque;

public class CLDMain {
    private static ConcurrentLinkedDeque cld = new ConcurrentLinkedDeque<>();
    
    public static void main(String[] args) {
        int numThread = Runtime.getRuntime().availableProcessors();
        Thread[] threads = new Thread[numThread];
        for (int i = 0; i < threads.length; i++) {
            (threads[i] = new Thread(addTask(), "Thread "+i)).start();
        }
    }
    
    public static Runnable addTask() {
        return new Runnable() {
            
            @Override
            public void run() {
                int num = Runtime.getRuntime().availableProcessors();
                for (int i = 0; i < num; i++) {
                    StringBuilder item = new StringBuilder("Item ").append(i);
                    cld.addFirst(item.toString());
                    callbackAdd(Thread.currentThread().getName(), item);
                }
                callbackFinish(Thread.currentThread().getName());
            }
        };
    }
    
    public static void callbackAdd(String threadName, StringBuilder item) {
        StringBuilder builder = new StringBuilder(threadName).append(" added :").append(item);
        System.out.println(builder);
    }
    
    public static void callbackFinish(String threadName) {
        StringBuilder builder = new StringBuilder(threadName).append(" has Finished");
        System.out.println(builder);
        System.out.println(new StringBuilder("CurrentSize ").append(cld.size()));
    }
}

程序的一次输出为:

Thread 0 added :Item 0
Thread 0 added :Item 1
Thread 0 added :Item 2
Thread 0 added :Item 3
Thread 0 has Finished
CurrentSize 6
Thread 1 added :Item 0
Thread 2 added :Item 0
Thread 2 added :Item 1
Thread 2 added :Item 2
Thread 2 added :Item 3
Thread 1 added :Item 1
Thread 1 added :Item 2
Thread 2 has Finished
Thread 1 added :Item 3
Thread 1 has Finished
CurrentSize 13
CurrentSize 13
Thread 3 added :Item 0
Thread 3 added :Item 1
Thread 3 added :Item 2
Thread 3 added :Item 3
Thread 3 has Finished
CurrentSize 16

该程序实现了多线程并发添加大量元素到一个公共的链表,刚好是ConcurrentLinkedDeque的典型使用场景。同时也验证了上面的说法,即size()方法需要遍历链表,可能返回错误的结果。

源代码链接:Github - xiaolulwr (路伟饶) / JUC-Demo。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68454.html

相关文章

  • Java多线程进阶(一)—— J.U.C并发包概述

    摘要:整个包,按照功能可以大致划分如下锁框架原子类框架同步器框架集合框架执行器框架本系列将按上述顺序分析,分析所基于的源码为。后,根据一系列常见的多线程设计模式,设计了并发包,其中包下提供了一系列基础的锁工具,用以对等进行补充增强。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首发于一世流云专栏:https...

    anonymoussf 评论0 收藏0
  • Java多线程进阶(三一)—— J.U.C之collections框架:BlockingQueue接

    摘要:和方法会一直阻塞调用线程,直到线程被中断或队列状态可用和方法会限时阻塞调用线程,直到超时或线程被中断或队列状态可用。 showImg(https://segmentfault.com/img/bVbgyPy?w=1191&h=670); 本文首发于一世流云专栏:https://segmentfault.com/blog... 一、引言 从本节开始,我们将介绍juc-collectio...

    entner 评论0 收藏0
  • java高并发系列 - 第19天:JUC中的Executor框架详解1,全面掌握java并发相关技术

    摘要:有三种状态运行关闭终止。类类,提供了一系列工厂方法用于创建线程池,返回的线程池都实现了接口。线程池的大小一旦达到最大值就会保持不变,在提交新任务,任务将会进入等待队列中等待。此线程池支持定时以及周期性执行任务的需求。 这是java高并发系列第19篇文章。 本文主要内容 介绍Executor框架相关内容 介绍Executor 介绍ExecutorService 介绍线程池ThreadP...

    icattlecoder 评论0 收藏0
  • Java多线程进阶(三十)—— J.U.C之collections框架:ConcurrentLink

    摘要:在之前,除了类外,并没有其它适合并发环境的栈数据结构。作为双端队列,可以当作栈来使用,并且高效地支持并发环境。 showImg(https://segmentfault.com/img/bVbguF7?w=1280&h=853); 本文首发于一世流云专栏:https://segmentfault.com/blog... 一、引言 在开始讲ConcurrentLinkedDeque之前...

    CompileYouth 评论0 收藏0
  • java高并发系列 - 第21天:java中的CAS操作,java并发的基石

    摘要:方法由两个参数,表示期望的值,表示要给设置的新值。操作包含三个操作数内存位置预期原值和新值。如果处的值尚未同时更改,则操作成功。中就使用了这样的操作。上面操作还有一点是将事务范围缩小了,也提升了系统并发处理的性能。 这是java高并发系列第21篇文章。 本文主要内容 从网站计数器实现中一步步引出CAS操作 介绍java中的CAS及CAS可能存在的问题 悲观锁和乐观锁的一些介绍及数据库...

    zorro 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<