资讯专栏INFORMATION COLUMN

JDK源码那些事儿之并发ConcurrentHashMap下篇

Zack / 3006人阅读

摘要:上一篇文章已经就进行了部分说明,介绍了其中涉及的常量和变量的含义,有些部分需要结合方法源码来理解,今天这篇文章就继续讲解并发前言本文主要介绍中的一些重要方法,结合上篇文章中的讲解部分进行更进一步的介绍回顾下上篇文章,我们应该已经知道的整体结

上一篇文章已经就ConcurrentHashMap进行了部分说明,介绍了其中涉及的常量和变量的含义,有些部分需要结合方法源码来理解,今天这篇文章就继续讲解并发ConcurrentHashMap

前言

本文主要介绍ConcurrentHashMap中的一些重要方法,结合上篇文章中的讲解部分进行更进一步的介绍

回顾下上篇文章,我们应该已经知道ConcurrentHashMap的整体结构和HashMap基本一致,不同的是处理多线程并发下保证操作的正确性,ConcurrentHashMap通过CAS和synchronized进行并发控制,当然,这种情况下各种处理都会变的更为复杂,下面我们就通过方法来深入理解ConcurrentHashMap的操作

重要方法

在一些方法中展示了各个变量以及常量的使用,能让我们更好的理解其中的操作

tabAt/casTabAt/setTabAt

下列方法用于读写table数组,使用Unsafe提供的更新获取volatile变量,CAS更新数组元素等操作

    // 读取table[i]
    @SuppressWarnings("unchecked")
    static final  Node tabAt(Node[] tab, int i) {
        return (Node)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
    }
    // CAS更新table[i]
    static final  boolean casTabAt(Node[] tab, int i,
                                        Node c, Node v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }
    // 插入table[i]
    static final  void setTabAt(Node[] tab, int i, Node v) {
        U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
    }
size

size方法返回了一个不精确的值,在多线程环境下,返回一个不精确的值,通过sumCount迭代counterCells统计sum值。

    public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }
    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
    }

这里很多人可能会问,为什么需要叠加counterCells数组的值呢?

其实这和ConcurrentHashMap特点有关,多线程环境下,同时插入值,执行CAS操作,执行成功的更新了baseCount,而执行失败的则将值放入到了counterCells数组中,可以查阅CounterCell内部类源码,只有一个long类型变量,每次进行插入或者删除时调用addCount通过CAS操作更新baseCount,失败时执行fullAddCount方法,初始化counterCells数组,并将1(相当于插入或删除一个元素)插入到CounterCell类中,这样尽可能保证了Map长度的正确性,这里理解流程即可,不深入,addCount部分有具体操作可查看

    @sun.misc.Contended static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }
get

参考HashMap,类似操作流程,需要注意的也就是在eh < 0处,如果是特殊节点,比如TreeBin或者ForwardingNode节点,则调用其具体类实现的find方法完成遍历查询,内部类解释可以参考我的上一篇文章

计算hash值

判断table是否为空,不为空,找到对应hash桶根节点判断

非根节点继续遍历树或者链表,存在对应值则返回对应值,否则返回null

    public V get(Object key) {
        Node[] tab; Node e, p; int n, eh; K ek;
        // 计算key的hash值
        int h = spread(key.hashCode());
        // table非空并且对应的hash桶根节点不为空
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            // 查找节点为根节点
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            // 树节点或者扩容中(FN节点)
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            // 链表遍历查找
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
    // 参考HashMap的hash方法,不同之处在于和HASH_BITS进行了一次与操作,最高位变为了0,即为正数,因为前一篇文章也已经说过负数hash值有特殊意义
    static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }
containsValue

这里通过静态内部类实现Traverser来遍历数组,具体的内部实现查看上篇文章里中的内部类说明,advance相当于查找到下一个非空节点

    public boolean containsValue(Object value) {
        if (value == null)
            throw new NullPointerException();
        Node[] t;
        if ((t = table) != null) {
            Traverser it = new Traverser(t, t.length, 0, t.length);
            for (Node p; (p = it.advance()) != null; ) {
                V v;
                if ((v = p.val) == value || (v != null && value.equals(v)))
                    return true;
            }
        }
        return false;
    }

遍历时遇见特殊节点的处理上一篇文章中已经画图说明,如下:

putVal

putVal整体同HashMap的putVal操作,操作流程上基本类似,只是在多线程操作下需要正确的处理插入值操作,同时如果发现有线程在进行扩容操作时,需帮助扩容,然后再进行插入值的流程操作

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        // 空值判断
        if (key == null || value == null) throw new NullPointerException();
        // hash值计算,保证了hash值为正数
        int hash = spread(key.hashCode());
        // 当前bin中元素的个数,判断是否树化处理
        int binCount = 0;
        // 无限循环直到被正确处理
        for (Node[] tab = table;;) {
            Node f; int n, i, fh;
            // 空表进行初始化操作
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            // 计算出的hash桶位置链表头节点无值则通过CAS插入值
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            // 如果hash桶链表头节点为MOVED状态,即说明有线程在进行扩容操作,则通过helpTransfer帮助扩容操作
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                // hash桶链表头节点加锁,在多线程环境下其他线程不能同时操作当前相同的头节点代码块
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        // 正常链表插入操作
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node e = f;; ++binCount) {
                                K ek;
                                // key和hash值相同则进行替换
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node pred = e;
                                // 没匹配到则直接插入到链表尾部
                                if ((e = e.next) == null) {
                                    pred.next = new Node(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 红黑树插入操作
                        else if (f instanceof TreeBin) {
                            Node p;
                            binCount = 2;
                            if ((p = ((TreeBin)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    // 达到树化阈值,则可能进行树化操作
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // size+1操作
        addCount(1L, binCount);
        return null;
    }
resizeStamp

前一篇文章在对sizeCtl说明时在多个线程帮忙扩容时其值小于0时做过一些说明,在源码中涉及到了下面这个方法,先理解清楚这个方法比较重要

这里的参数每次传入的都是当前数组的长度,也就是说每次这里生成的数都与当时扩容时的数组长度有关,Integer.numberOfLeadingZeros(n),返回二进制表示,前面有多少个连续的0,RESIZE_STAMP_BITS固定为16,没发现有提供方法来修改这个变量,位或运算得到一个值,这个值表示了与扩容时数组的长度相关,这里需记得是左移了(RESIZE_STAMP_BITS - 1),因为后边代码中我们需要反向操作右移来重新获取

这里通过这个方法与数组长度关联,同时sizeCtl也会与之关联,同时也记录当前扩容中的线程数,故sizeCtl在扩容中同时兼顾了两种作用,一是判断是否是在同一个批次的扩容中(都是从16扩容到32),同时判断当前扩容中参与的线程数来确定是否结束和初始化操作

    /**
     * Returns the stamp bits for resizing a table of size n.
     * Must be negative when shifted left by RESIZE_STAMP_SHIFT.
     */
    static final int resizeStamp(int n) {
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }
addCount

整体来看主要进行了两部分内容,一是更新baseCount,二是检查是否进行扩容操作。其实这个方法里还是相当复杂的,涉及到了线程私有的伪随机数生成器ThreadLocalRandom,并发效率更高的LongAdder,不过初学者可以不用研究那么深入,这里不详细说明,大概了解就好

    private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        // 通过CAS更新baseCount
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            // 更新baseCount失败
            CounterCell a; long v; int m;
            boolean uncontended = true;
            // 相当于每个线程的probe就是它在CounterCell数组中的hash code,用来定位counterCells数组
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                // 更新cellvalue失败则执行fullAddCount,具体不看了,比较复杂,不停尝试更新计数
                // 源码注释上也写了类似LongAdder
                fullAddCount(x, uncontended);
                return;
            }
            // 执行到此说明更新计数器成功,判断是否退出,为什么是1其实还是有点困惑
            if (check <= 1)
                return;
            s = sumCount();
        }
        // check大于0代表着对应hash桶下的节点数,检查是否扩容
        // 满足条件帮助扩容,不满足退出
        if (check >= 0) {
            Node[] tab, nt; int n, sc;
            // 注意,这里条件中3个变量赋值同时while判断
            // sc = sizeCtl,tab = table,n = tab.length
            // 在并发操作中可能会出现变量错误的情况造成扩容处理出错,通过resizeStamp保证扩容时版本一致
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                   (n = tab.length) < MAXIMUM_CAPACITY) {
                // resizeStamp根据n返回一个扩容版本戳,保证唯一性,上边一个方法我已经说明了
                int rs = resizeStamp(n);
                // 说明有别的线程在扩容
                if (sc < 0) {
                    // 判断是否帮助扩容,满足条件,不帮助扩容,这里会分析下,看下面的分析部分
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    // 帮助扩容,线程数+1
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                // 无线程帮助扩容,当前线程尝试成为第一个扩容的线程
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
                s = sumCount();
            }
        }
    }

在上面这段代码中,不帮助扩容的条件中有些地方让人困惑

    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
        transferIndex <= 0)
        break;
(sc >>> RESIZE_STAMP_SHIFT) != rs

首先需要明白上边整个扩容中的第一个线程会通过U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)设置sizeCtl,之后扩容线程增加则通过U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)更新

sc右移RESIZE_STAMP_SHIFT(由于RESIZE_STAMP_BITS不提供修改方法,RESIZE_STAMP_SHIFT也只能取到16),第一个条件为什么是这个?需要结合扩容代码来看,首个线程抢到扩容任务时需先创建nextTable,设置transferIndex,在执行之前需要将sizeCtl更新,即U.compareAndSwapInt(this, SIZECTL, sc,(rs << RESIZE_STAMP_SHIFT) + 2),代码存在于addCount和tryPresize方法中,sizeCtl在每次扩容时会更新成(rs << RESIZE_STAMP_SHIFT) + 2。判断条件里sc无符号右移,如果是相同的一次扩容过程,则与rs相等是肯定的,rs是由resizeStamp根据长度n计算得来,其实最终这里比较的也就是table的长度,防止多次扩容下错误的帮助了扩容

另外在已经有线程扩容的情况下增加扩容线程会会更新sizeCtl,U.compareAndSwapInt(this, SIZECTL, sc, sc + 1) 看出在首次更新的基础上加1即可,扩容线程完成自己的任务同理减1,结合上边对resizeStamp的说明应该算很清楚了

以上部分也证实了上篇文章中sizeCtl注释是不正确的

sc == rs + 1 || sc == rs + MAX_RESIZERS

这个条件是有问题的,sc小于0,rs大于0,两个条件一直为false,没有true的可能,从这个条件上看,应该是判断扩容完毕和扩容线程数达到最大时不能帮助扩容。

我们想一下,第一次线程扩容时已经将sc更新成(rs << RESIZE_STAMP_SHIFT) + 2,这里判断的话需要改为sc == ( rs << RESIZE_STAMP_SHIFT ) +1 才对,不能将sc右移,右移将会导致低16位记录的线程数数据丢失,最大线程数判断同理,应改为sc == ( rs << RESIZE_STAMP_SHIFT ) + MAX_RESIZERS

我在Oracle官网bug库里看到已经提到了这个问题:https://bugs.java.com/bugdata...

(nt = nextTable) == null

此时状态可能表明扩容已经结束或者第一个线程在扩容中,不能帮助扩容

transferIndex <= 0

transfer任务已经被分配完毕,不能分配任务给当前线程,不能帮助扩容,帮助扩容部分下面会说到

helpTransfer

如果正在进行扩容操作,则帮助扩容

    /**
     * Helps transfer if a resize is in progress.
     */
    final Node[] helpTransfer(Node[] tab, Node f) {
        Node[] nextTab; int sc;
        // 判断是否为ForwardingNode并且nextTable是否已经被创建
        if (tab != null && (f instanceof ForwardingNode) &&
            (nextTab = ((ForwardingNode)f).nextTable) != null) {
            // 根据长度获取扩容戳
            int rs = resizeStamp(tab.length);
            // 再次验证是否正在扩容
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                // 几个条件上边已经解释过了,满足不帮助扩容
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                // sizeCtl加1,表示当前线程加入扩容,多了一个线程帮忙
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }
tryPresize

预先扩容,putAll和treeifyBin中使用到,不满足table容量时,进行一次扩容操作

    /**
     * Tries to presize table to accommodate the given number of elements.
     *
     * @param size number of elements (doesn"t need to be perfectly accurate)
     */
    private final void tryPresize(int size) {
        // 判断长度是否超过最大值,超过则赋值为最大值,正常则通过tableSizeFor计算扩容后的长度
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1);
        int sc;
        // 未初始化或扩容完成才能执行本次扩容操作
        while ((sc = sizeCtl) >= 0) {
            Node[] tab = table; int n;
            // table未初始化
            if (tab == null || (n = tab.length) == 0) {
                n = (sc > c) ? sc : c;
                // 置为-1表示数组初始化,前一篇文章已经说明
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node[] nt = (Node[])new Node[n];
                            table = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        // 此时相当于阈值
                        sizeCtl = sc;
                    }
                }
            }
            // 已经初始化,扩容长度小于阈值或者大于最大值,不进行扩容操作
            else if (c <= sc || n >= MAXIMUM_CAPACITY)
                break;
            // 再次验证table未改变
            else if (tab == table) {
                int rs = resizeStamp(n);
                // 同上边代码部分,判断是否帮助扩容
                if (sc < 0) {
                    Node[] nt;
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
            }
        }
    }
transfer

table迁移操作,通过transferIndex来完成任务的分配,之前文章变量中也提及了MIN_TRANSFER_STRIDE(最小步长),对每个扩容线程申请迁移的hash桶数量做了限制,每次需要扩容线程执行完毕已经领取完的hash桶迁移任务才可以继续领取任务帮助迁移,最后一个迁移线程在迁移完毕后会进行检查

    private final void transfer(Node[] tab, Node[] nextTab) {
        int n = tab.length, stride;
        // 设置步长,即每个迁移任务迁移多少个hash桶,默认最小迁移步长16
        // 即每个扩容线程最小迁移16个hash桶
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE; // subdivide range
        // nextTab未初始化,则进行初始化操作,这里不需要CAS,调用的地方已经做了控制,保证只有一个线程能执行
        if (nextTab == null) {            // initiating
            try {
                @SuppressWarnings("unchecked")
                // 新数组长度扩容为原有数组的2倍
                Node[] nt = (Node[])new Node[n << 1];
                nextTab = nt;
            } catch (Throwable ex) {      // try to cope with OOME
                // 内存溢出时不能继续扩容
                sizeCtl = Integer.MAX_VALUE;
                return;
            }
            nextTable = nextTab;
            transferIndex = n;
        }
        int nextn = nextTab.length;
        // 设置ForwardingNode节点
        ForwardingNode fwd = new ForwardingNode(nextTab);
        // 线程可以继续分配迁移任务的标识
        boolean advance = true;
        // 设置结束标识
        boolean finishing = false; // to ensure sweep before committing nextTab
        // i表示数组下标,bound表示迁移任务的最小下标
        for (int i = 0, bound = 0;;) {
            Node f; int fh;
            // advance为false则表明当前线程分配的迁移任务未完成或已经扩容完毕
            while (advance) {
                int nextIndex, nextBound;
                // --i 大于等于 bound 则表明本次分配的迁移任务还未完成,将advance置为false
                // 表明不能继续分配迁移任务
                if (--i >= bound || finishing)
                    advance = false;
                // 设置nextIndex
                // 如果小于等于0则表示迁移hash桶已被分配完毕,不用继续,将advance置为false
                else if ((nextIndex = transferIndex) <= 0) {
                    i = -1;
                    advance = false;
                }
                // 设置迁移任务区间bound到i
                else if (U.compareAndSwapInt
                         (this, TRANSFERINDEX, nextIndex,
                          nextBound = (nextIndex > stride ?
                                       nextIndex - stride : 0))) {
                    bound = nextBound;
                    i = nextIndex - 1;
                    advance = false;
                }
            }
            // 上边计算了区间和任务状态
            // i < 0 上边代码已经说明,transfer任务已经执行完毕,退出
            // i >= n 这里n表示的是传入的tab数组长度,而i有可能因为transferIndex改变而改变
            // 比如连续扩容从16扩容到32,然后又从32扩容到64,此时这个条件是可能成立的,这里的i有可能在32到64之间,大于n的32
            // 不在一个扩容维度内,需退出。最后一个条件没看明白是什么情况出现这种状态
            if (i < 0 || i >= n || i + n >= nextn) {
                int sc;
                if (finishing) {
                    // 扩容迁移完毕设置table和sizeCtl
                    nextTable = null;
                    table = nextTab;
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }
                // 线程数减1,表明当前线程退出
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    // 判断当前线程是否为最后一个扩容线程,不是,则退出,条件可以看上边的说明,已经讲解过
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    // 确定当前线程为最后一个扩容线程,则需要进行检查工作
                    // 检查所有的旧数组hash桶是否被正确的迁移
                    finishing = advance = true;
                    i = n; // recheck before commit
                }
            }
            // i处的hash桶为null则直接放置ForwardingNode节点
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            // i处的hash桶为ForwardingNode节点
            else if ((fh = f.hash) == MOVED)
                // 最后的线程执行检查
                advance = true; // already processed
            else {
                synchronized (f) {
                    // 再次验证hash桶头节点为f
                    if (tabAt(tab, i) == f) {
                        // 进行迁移任务,类似HashMap,分高位和低位,不明白的可以看我HashMap的文章
                        Node ln, hn;
                        if (fh >= 0) {
                            // 正常链表操作
                            // runBit表明首节点的位置,0则表示在低位,非0表示在高位
                            int runBit = fh & n;
                            Node lastRun = f;
                            // 找到尾部最后一个高低位不同的节点,之后的节点不需要进行操作,直接进行复用
                            for (Node p = f.next; p != null; p = p.next) {
                                int b = p.hash & n;
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            // 低位lastRun在下面循环时使用
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            // 高位lastRun在下面循环时使用
                            else {
                                hn = lastRun;
                                ln = null;
                            }
                            // 确定lastRun为了提高效率,复用原有链表
                            for (Node p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;
                                // 低位链表
                                if ((ph & n) == 0)
                                    ln = new Node(ph, pk, pv, ln);
                                // 高位链表
                                else
                                    hn = new Node(ph, pk, pv, hn);
                            }
                            // 新数组上设置低位链表
                            setTabAt(nextTab, i, ln);
                            // 新数组上设置高位链表
                            setTabAt(nextTab, i + n, hn);
                            // 旧数组i处设置为ForwardingNode节点
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                        // 红黑树通过TreeBin操作
                        else if (f instanceof TreeBin) {
                            TreeBin t = (TreeBin)f;
                            // 同样划分为高低位进行处理,通过链表来操作
                            TreeNode lo = null, loTail = null;
                            TreeNode hi = null, hiTail = null;
                            int lc = 0, hc = 0;
                            for (Node e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode p = new TreeNode
                                    (h, e.key, e.val, null, null);
                                // 判断是低位还是高位然后修改链表关系
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                }
                                else {
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }
                            // 同链表类似,判断下是否需转成链表,通过TreeBin将高低位链表构建成红黑树
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin(hi) : t;
                            setTabAt(nextTab, i, ln);
                            setTabAt(nextTab, i + n, hn);
                            setTabAt(tab, i, fwd);
                            advance = true;
                        }
                    }
                }
            }
        }
    }

迁移任务是从数组尾部向头部进行,这样做的目的应该是与迭代正向操作相反来减少冲突,当迭代时是已经迁移好的hash桶,迁移时是已经迭代完毕的hash桶

clear

清空操作,比较简单

    public void clear() {
        // 删除节点数记录最后需要更新
        long delta = 0L; // negative number of deletions
        // 数组下标
        int i = 0;
        Node[] tab = table;
        while (tab != null && i < tab.length) {
            int fh;
            Node f = tabAt(tab, i);
            // hash桶首节点为null表明不需要执行
            if (f == null)
                ++i;
            // 扩容中帮助扩容然后重新开始循环清空操作
            else if ((fh = f.hash) == MOVED) {
                tab = helpTransfer(tab, f);
                i = 0; // restart
            }
            // 正常链表或TreeBin节点
            else {
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        Node p = (fh >= 0 ? f :
                                       (f instanceof TreeBin) ?
                                       ((TreeBin)f).first : null);
                        // 获取hash桶的节点数
                        while (p != null) {
                            --delta;
                            p = p.next;
                        }
                        // 将hash桶置null
                        setTabAt(tab, i++, null);
                    }
                }
            }
        }
        // 更新数组长度
        if (delta != 0L)
            addCount(delta, -1);
    }
总结

本文紧接上一篇文章讲解了ConcurrentHashMap的重要的方法,对于一些变量和常量结合方法进行了更多的解释说明,本身而言还是比较复杂,其中部分笔者也不能完全理解,不过整体的流程有了一个更清晰的认知,重点需要理解的在下面几点:

涉及到Map长度的计算:通过counterCells完成以及通过addCount进行长度的更新

扩容操作:sizeCtl的设置以及更新和各种情况下对应的含义

迁移操作:迁移步长,线程检查

节点类型:几种节点类型的不同处理方式

当然,有些条件可能比较复杂,难以理解,只能尽力多看多想,希望对各位有所帮助

以上内容如有问题欢迎指出,笔者验证后将及时修正,谢谢

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/75191.html

相关文章

  • JDK源码那些事儿并发ConcurrentHashMap上篇

    前面已经说明了HashMap以及红黑树的一些基本知识,对JDK8的HashMap也有了一定的了解,本篇就开始看看并发包下的ConcurrentHashMap,说实话,还是比较复杂的,笔者在这里也不会过多深入,源码层次上了解一些主要流程即可,清楚多线程环境下整个Map的运作过程就算是很大进步了,更细的底层部分需要时间和精力来研究,暂不深入 前言 jdk版本:1.8 JDK7中,ConcurrentH...

    Leck1e 评论0 收藏0
  • JDK源码那些事儿红黑树基础下篇

    摘要:强调一下,红黑树中的叶子节点指的都是节点。故删除之后红黑树平衡不用调整。将达到红黑树平衡。到此关于红黑树的基础已经介绍完毕,下一章我将就源码中的进行讲解说明,看一看红黑树是如何在源码中实现的。 说到HashMap,就一定要说到红黑树,红黑树作为一种自平衡二叉查找树,是一种用途较广的数据结构,在jdk1.8中使用红黑树提升HashMap的性能,今天就来说一说红黑树,上一讲已经给出插入平衡...

    罗志环 评论0 收藏0
  • JDK源码那些事儿常用的ArrayList

    摘要:前面已经讲解集合中的并且也对其中使用的红黑树结构做了对应的说明,这次就来看下简单一些的另一个集合类,也是日常经常使用到的,整体来说,算是比较好理解的集合了,一起来看下前言版本类定义继承了,实现了,提供对数组队列的增删改查操作实现接口,提供随 前面已经讲解集合中的HashMap并且也对其中使用的红黑树结构做了对应的说明,这次就来看下简单一些的另一个集合类,也是日常经常使用到的ArrayL...

    hizengzeng 评论0 收藏0
  • ConcurrentHashMap 源码阅读小结

    摘要:如果冲突了,同步头节点,进行链表操作,如果链表长度达到,分成红黑树。每次加入一个线程都会将的低位加一。扩容最大的帮助线程是,这是低位的最大值限制的。线程处理完之后,如果没有可选区间,且任务没有完成,就会将整个表检查一遍,防止遗漏。 前言 每一次总结都意味着重新开始,同时也是为了更好的开始。ConcurrentHashMap 一直是我心中的痛。虽然不敢说完全读懂了,但也看了几个重要的方法...

    The question 评论0 收藏0

发表评论

0条评论

Zack

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<