摘要:数据结构重要成员变量代表整个哈希表。科普,解决多线程并行情况下使用锁造成性能损耗的一种机制,操作包含三个操作数内存位置预期原值和新值。
ConcurrenHashMap 。下面分享一下我对ConcurrentHashMap 的理解,主要用于个人备忘。如果有不对,请批评。
HashMap“严重”的勾起了我对HashMap家族的好奇心,下面分享一下我对ConcurrentHashMap 的理解,主要用于个人备忘。如果有不对,请批评。
想要解锁更多新姿势?请访问https://blog.tengshe789.tech/
总起HashMap是我们平时开发过程中用的比较多的集合,但它是非线程安全的,在涉及到多线程并发的情况,进行get操作有可能会引起死循环,导致CPU利用率接近100%。
因此需要支持线程安全的并发容器 ConcurrentHashMap 。
数据结构 重要成员变量/** * The array of bins. Lazily initialized upon first insertion. * Size is always a power of two. Accessed directly by iterators. */ transient volatile Node[] table;
table代表整个哈希表。 默认为null,初始化发生在第一次插入操作,默认大小为16的数组,用来存储Node节点数据,扩容时大小总是2的幂次方。
/** * The next table to use; non-null only while resizing. */ private transient volatile Node[] nextTable;
nextTable是一个连接表,用于哈希表扩容,默认为null,扩容时新生成的数组,其大小为原数组的两倍。
/** * Base counter value, used mainly when there is no contention, * but also as a fallback during table initialization * races. Updated via CAS. */ private transient volatile long baseCount;
baseCount保存着整个哈希表中存储的所有的结点的个数总和,有点类似于 HashMap 的 size 属性。 这个数通过CAS算法更新
/** * Table initialization and resizing control. When negative, the * table is being initialized or resized: -1 for initialization, * else -(1 + the number of active resizing threads). Otherwise, * when table is null, holds the initial table size to use upon * creation, or 0 for default. After initialization, holds the * next element count value upon which to resize the table. */ private transient volatile int sizeCtl;
初始化哈希表和扩容 rehash 的过程,都需要依赖sizeCtl。该属性有以下几种取值:
0:默认值
-1:代表哈希表正在进行初始化
大于0:相当于 HashMap 中的 threshold,表示阈值
小于-1:代表有多个线程正在进行扩容。(譬如:-N 表示有N-1个线程正在进行扩容操作 )
构造方法public ConcurrentHashMap() { } public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));//MAXIMUM_CAPACITY = 1 << 30 this.sizeCtl = cap;//ConcurrentHashMap在构造函数中只会初始化sizeCtl值,并不会直接初始化table,而是延缓到第一次put操作。 } public ConcurrentHashMap(Map extends K, ? extends V> m) { this.sizeCtl = DEFAULT_CAPACITY;//DEFAULT_CAPACITY = 16 putAll(m); }
构造方法是三个。重点是第二个,带参的构造方法。这个带参的构造方法会调用tableSizeFor()方法,确保table的大小总是2的幂次方(假设参数为100,最终会调整成256)。算法如下:
/** * Returns a power of two table size for the given desired capacity. * See Hackers Delight, sec 3.2 */ private static final int tableSizeFor(int c) { int n = c - 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; }PUT()方法
put()调用putVal()方法,让我们看看:
final V putVal(K key, V value, boolean onlyIfAbsent) { //对传入的参数进行合法性判断 if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode());//计算键所对应的 hash 值 int binCount = 0; for (Node[] tab = table;;) { Node f; int n, i, fh; //如果哈希表还未初始化,那么初始化它 if (tab == null || (n = tab.length) == 0) tab = initTable(); //根据hash值计算出在table里面的位置 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //如果这个位置没有值 ,那么以CAS无锁式向该位置添加一个节点 if (casTabAt(tab, i, null, new Node (hash, key, value, null))) break; // no lock when adding to empty bin } //检测到桶结点是 ForwardingNode 类型,协助扩容(MOVED = -1; // hash for forwarding nodes) else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); //桶结点是普通的结点,锁住该桶头结点并试图在该链表的尾部添加一个节点 else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { //向普通的链表中添加元素 if (fh >= 0) { binCount = 1; //遍历链表所有的结点 for (Node e = f;; ++binCount) { K ek; //如果hash值和key值相同,则修改对应结点的value值 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node pred = e; //如果遍历到了最后一个结点,那么就证明新的节点需要插入链表尾部 if ((e = e.next) == null) { pred.next = new Node (hash, key, value, null); break; } } } //如果这个节点是树节点,就按照树的方式插入值 else if (f instanceof TreeBin) { Node p; binCount = 2; if ((p = ((TreeBin )f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { //如果链表长度已经达到临界值8,就需要把链表转换为树结构(TREEIFY_THRESHOLD = 8) if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //CAS 式更新baseCount,并判断是否需要扩容 addCount(1L, binCount); return null; }
其实putVal()也多多少少掉用了其他方法,让我们继续探究一下。
CAS(compare and swap)科普compare and swap,解决多线程并行情况下使用锁造成性能损耗的一种机制,CAS操作包含三个操作数——内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。无论哪种情况,它都会在CAS指令之前返回该位置的值。CAS有效地说明了“我认为位置V应该包含值A;如果包含该值,则将B放到这个位置;否则,不要更改该位置,只告诉我这个位置现在的值即可。
spread首先,第四行出现的int hash = spread(key.hashCode());这是传统的计算hash的方法。key的hash值高16位不变,低16位与高16位异或作为key的最终hash值。(h >>> 16,表示无符号右移16位,高位补0,任何数跟0异或都是其本身,因此key的hash值高16位不变。)
static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }initTable
第十行, tab = initTable();这个方法的亮点是,可以让put并发执行,实现table只初始化一次 。
initTable()核心思想就是,只允许一个线程对表进行初始化,如果有其他线程进来了,那么会让其他线程交出 CPU 等待下次系统调度。这样,保证了表同时只会被一个线程初始化。
private final Node[] initTable() { Node [] tab; int sc; //如果表为空才进行初始化操作 while ((tab = table) == null || tab.length == 0) { //如果一个线程发现sizeCtl<0,意味着另外的线程执行CAS操作成功,当前线程只需要让出cpu时间片(放弃 CPU 的使用) if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin //否则说明还未有线程对表进行初始化,那么本线程就来做这个工作 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { //sc 大于零说明容量已经初始化了,否则使用默认容量 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node [] nt = (Node [])new Node,?>[n]; table = tab = nt; //计算阈值,等效于 n*0.75 sc = n - (n >>> 2); } } finally { //设置阈值 sizeCtl = sc; } break; } } return tab; }
接下来,第19行。 tab = helpTransfer(tab, f);这句话。要了解这个,首先需要知道ForwardingNode 这个节点类型。它一个用于连接两个table的节点类。它包含一个nextTable指针,用于指向下一张hash表。而且这个节点的key、value、next指针全部为null,它的hash值为MOVED(static final int MOVED = -1)。
static final class ForwardingNodehelpTransferextends Node { final Node [] nextTable; ForwardingNode(Node [] tab) { super(MOVED, null, null, null); this.nextTable = tab; } //find的方法是从nextTable里进行查询节点,而不是以自身为头节点进行查找 Node find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes outer: for (Node [] tab = nextTable;;) { Node e; int n; if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null; for (;;) { int eh; K ek; if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; if (eh < 0) { if (e instanceof ForwardingNode) { tab = ((ForwardingNode )e).nextTable; continue outer; } else return e.find(h, k); } if ((e = e.next) == null) return null; } } } }
在扩容操作中,我们需要对每个桶中的结点进行分离和转移。如果某个桶结点中所有节点都已经迁移完成了(已经被转移到新表 nextTable 中了),那么会在原 table 表的该位置挂上一个 ForwardingNode 结点,说明此桶已经完成迁移。
helpTransfer什么作用呢?是检测到当前哈希表正在扩容,然后让当前线程去协助扩容 !
final Node[] helpTransfer(Node [] tab, Node f) { Node [] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode )f).nextTable) != null) {//新的table,nextTab已经存在前提下才能帮助扩容 int rs = resizeStamp(tab.length);//返回一个 16 位长度的扩容校验标识 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {//sizeCtl 如果处于扩容状态的话 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) //前 16 位是数据校验标识,后 16 位是当前正在扩容的线程总数 //这里判断校验标识是否相等,如果校验符不等或者扩容操作已经完成了,直接退出循环,不用协助它们扩容了 break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {//sc + 1 标识增加了一个线程进行扩容 transfer(tab, nextTab);//调用扩容方法 break; } } return nextTab; } return table; }
helpTransfer精髓的是可以调用多个工作线程一起帮助进行扩容,这样的效率就会更高,而不是只有检查到要扩容的那个线程进行扩容操作,其他线程就要等待扩容操作完成才能工作。
transfer既然这里涉及到扩容的操作,我们也一起来看看扩容方法transfer() :
private final void transfer(Node[] tab, Node [] nextTab) { int n = tab.length, stride; //计算单个线程允许处理的最少table桶首节点个数,不能小于 16 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range //刚开始扩容,初始化 nextTab if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node [] nt = (Node [])new Node,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; //transferIndex 指向最后一个桶,方便从后向前遍历 transferIndex = n; } int nextn = nextTab.length; //定义 ForwardingNode 用于标记迁移完成的桶 ForwardingNode fwd = new ForwardingNode (nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab //i 指向当前桶,bound 指向当前线程需要处理的桶结点的区间下限 for (int i = 0, bound = 0;;) { Node f; int fh; //遍历当前线程所分配到的桶结点 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; //transferIndex <= 0 说明已经没有需要迁移的桶了 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } //更新 transferIndex //为当前线程分配任务,处理的桶结点区间为(nextBound,nextIndex) else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } //当前线程所有任务完成 if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } //待迁移桶为空,那么在此位置 CAS 添加 ForwardingNode 结点标识该桶已经被处理过了 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); //如果扫描到 ForwardingNode,说明此桶已经被处理过了,跳过即可 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { synchronized (f) { if (tabAt(tab, i) == f) { Node ln, hn; //链表的迁移操作 if (fh >= 0) { int runBit = fh & n; Node lastRun = f; //整个 for 循环为了找到整个桶中最后连续的 fh & n 不变的结点 for (Node p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node (ph, pk, pv, ln); else hn = new Node (ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } //红黑树的复制算法, else if (f instanceof TreeBin) { TreeBin t = (TreeBin )f; TreeNode lo = null, loTail = null; TreeNode hi = null, hiTail = null; int lc = 0, hc = 0; for (Node e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode p = new TreeNode (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin (lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin (hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
至此,put方法讲完了
参考资料~参考资料
感谢
结束
此片完了~
想要了解更多精彩新姿势?请访问我的个人博客
本篇为原创内容,已经于07-06在个人博客率先发表,随后CSDN,segmentfault,juejin同步发出。如有雷同,
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/76697.html
摘要:变量提升变量的声明写在可以在使用变量之后函数提升函数可以先调用,后声明上面先解释了下我理解的这两个概念的定义。参考前端基础进阶三变量对象详解关于变量提升的理解 变量提升:变量的声明写在可以在使用变量之后;函数提升:函数可以先调用,后声明; 上面先解释了下我理解的这两个概念的定义。要真正理解它们,最好从变量对象的角度出发。引出变量对象的概念,要先理解执行上下文,也就是当控制器执行到可执行...
摘要:当容量超过容量负载因子时,进行扩容操作确定何时将冲突的链表转换成红黑树用来确何时将红黑树转换成链表当链表转换成红黑树时,需要判断数组容量。桶排序核心思想是根据数据规模划分,个相同大小的区间每个区间为一个桶,桶可理解为容器。 刚刚看到QQ群有人吹Hashmap,一想我啥都不懂,就赶快补了一波。下面分享一下我对Hashmap的理解,主要用于个人备忘。如果有不对,请批评。想要解锁更多新姿势?...
摘要:对于一棵有效的红黑树二叉树而言我们必须增加如下规则每个节点都只能是红色或者黑色根节点是黑色每个叶节点节点,空节点是黑色的。这些约束强制了红黑树的关键性质从根到叶子的最长的可能路径不多于最短的可能路径的两倍长。 群里的大哥说了,要想懂红黑树的应用,先要看TreeMap。 想要解锁更多新姿势?请访问http://blog.tengshe789.tech/ OK,现在开始: 红黑树简介 红黑...
摘要:关于变量的值的类型的总结。所以此时指向新的对象还是指向被添加了属性的老对象, 关于变量的值的类型的总结。 //1.当多个变量的值是非引用类型var a=1;var b=a; //系统复制了a的值并赋值给ba++; //a自身的值被改变,而b的值不受影响 a b的值虽相等但互不影响console.log(a)//2console.log(b)//1 ...
阅读 3404·2021-11-19 09:40
阅读 1297·2021-10-11 11:07
阅读 4806·2021-09-22 15:07
阅读 2871·2021-09-02 15:15
阅读 1947·2019-08-30 15:55
阅读 518·2019-08-30 15:43
阅读 863·2019-08-30 11:13
阅读 1430·2019-08-29 15:36