摘要:我们可以将的作用理解为在多线程的环境下保证线程等待获取锁添加进入队列以及线程获取锁,并队列中出去都是线程安全的。是如何做到线程安全的主要是通过死循环以及状态值,来做到线程安全。
1、什么是aqs
aqs是一个FIFO的双向链表队列。aqs将等待获取锁的线程封装成结点,放在队列中。
我们可以将aqs的作用理解为在多线程的环境下保证线程等待获取锁(添加进入队列)以及线程获取锁,并队列中出去都是线程安全的。
更简单的可以理解为aqs为了保证在多线程的环境下入队列和出队列的线程安全性提供了一个基本功能框架。
2、aqs是如何做到线程安全的
aqs主要是通过cas + 死循环以及state状态值,来做到线程安全。
3、aqs为什么会被设计为FIFO双向链表队列(以下是个人理解)
①aqs的锁实现,包含公平锁和非公平锁。为了实现公平锁,必须使用队列来保证获取锁的顺序(入队列的顺序)
②用链表的方式,主要是因为,操作更多是删除与增加。链表时间复杂度O(1)的效率会比数组O(n)的低。
③用双向队列的原因是,aqs的设计思想,或则说为了解决羊群效应(为了争夺锁,大量线程同时被唤醒)。每个结点(线程)只需要关心自己的前一个结点的状态(后续会说),线程唤醒也只唤醒队头等待线程。
请参考 http://www.importnew.com/2400...
4、aqs是如何提供一个基础框架的
aqs 通过模板设计进行提供的,实现类只需实现特定的方法即可。
以下是aqs的模板方法
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } 。。。 其他的省略了 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
tryAcquire(int arg),tryRelease(int arg) 是我们要实现的模板方法,当然还有分享锁的,这里只介绍了独占锁的。
5、从源码角度剖析aqs。aqs是如何通过双向链表队列,cas,state状态值,以及结点状态来保证入队列出队列的线程安全的!
注:以下只介绍独占式的不公平锁
①aqs 如何获取锁?
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire(arg) 内部调用了nonfairTryAcquire(int acquires)
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 锁未被获取 // cas(自旋) 获取锁,并修改state 状态值 if (compareAndSetState(0, acquires)) { // 设置当前占有的线程 setExclusiveOwnerThread(current); return true; } } // 重入 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
解释:利用cas自旋式的获取锁。
②aqs 获取锁失败,如何处理?
在看代码前,先解释一下:将当前线程包装成Node结点,并插入同步队列中,并用CAS形式尝试获取锁,获取失败,则挂起当前线程(以上只是说了大概)
先看第1个方法(将当前线程包装成Node结点,并插入同步队列)
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { // 尾节点不为空 node.prev = pred; // 用 CAS 将当前线程插入队尾 if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 尾节点为空,说明当前队列还是空的,需要初始化 enq(node); return node; } private Node enq(final Node node) { // 死循环 for (;;) { // 初始化 Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { // 这里主要是担心有多个线程同时进到enq(final Node node) 方法 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
解释:队列若为空,先初始化,不为空,用 CAS 将当前结点插入到队尾
再看第二个方法final boolean acquireQueued(final Node node, int arg);
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取前置结点 final Node p = node.predecessor(); // 前置结点是头结点,则尝试获取锁 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 前置结点不是头结点 或者 前置结点是头结点但是尝试获取锁失败 // 则,应当将当前线程挂起(毕竟不能一直死循环获取吧~) if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { // 当前线程的前置节点的状态!!! // 第waitStatus 初始化值为0, // 也因此当第1次进到这个方法时,会将前置结点的状态置为 Node.SIGNAL。 // 第 2次进来的时候,前置节点的waitStatus的状态就为 Node.SIGNAL)。 // 也就是说。aqs 只会让你尝试2次,都失败后,就会被挂起 int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don"t park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } // 线程被挂起调用该方法!! private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
让我们总结一下,以及再回顾一下,为什么aqs会被设计为双向链表队列。
aqs为了保证结点(即线程)的入队列的安全。采用了CAS 以及死循环的方式(从代码中可看到,处处使用CAS)。
上面有说到,一个线程是否该被唤醒或者其他操作,只需要看前置结点的状态即可。从shouldParkAfterFailedAcquire() 方法就可以看出这个设计。当前线程该做什么操作,是看前置结点的状态的。
③aqs如何释放锁
看代码前,先解释一下,aqs是如何做的。aqs的做法就是,释放当前锁,然后唤醒头结点的后继结点,如果后继结点为空,或者是被取消的,则从尾节点向前寻找一个未被取消的结点
public final boolean release(int arg) { // 尝试释放锁 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 唤醒后继结点 unparkSuccessor(h); return true; } return false; }
①ReentratLock 是如何实现锁的释放的
注:这里看的是ReentrantLock的实现
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
解释:设置 state 的状态,如果 state == 0, 那么说明锁被释放了。否则锁还未被释放(锁重入!)
②aqs 如何唤醒其他结点
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ // 清除状态,还记得等待的线程会把前置节点的状态置为 Node.SIGNAL(-1)吗 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 正常情况下,下一个结点就是被唤醒的节点。 // 但是如果下一个结点为null, 或者是被取消的 // 那么从尾节点向前查找一个未被取消的节点唤醒。 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 唤醒 LockSupport.unpark(s.thread); }
release的释放比较简单。还是可以看到,aqs被设计成双向链表队列的好处!!!
看源代码,不能一下子就扎进去看,要先明白个大概,为什么看源代码?还不是为了学习作者是如何设计的。细节无论谁都记不清,最主要的是知道一个整体的流程,关键的代码!毕竟优秀的开源项目这么多,难道每行代码都看??
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/76678.html
摘要:关于,最后有两点规律需要注意当的等待队列队首结点是共享结点,说明当前写锁被占用,当写锁释放时,会以传播的方式唤醒头结点之后紧邻的各个共享结点。当的等待队列队首结点是独占结点,说明当前读锁被使用,当读锁释放归零后,会唤醒队首的独占结点。 showImg(https://segmentfault.com/img/remote/1460000016012293); 本文首发于一世流云的专栏:...
摘要:为了避免一篇文章的篇幅过长,于是一些比较大的主题就都分成几篇来讲了,这篇文章是笔者所有文章的目录,将会持续更新,以给大家一个查看系列文章的入口。 前言 大家好,笔者是今年才开始写博客的,写作的初衷主要是想记录和分享自己的学习经历。因为写作的时候发现,为了弄懂一个知识,不得不先去了解另外一些知识,这样以来,为了说明一个问题,就要把一系列知识都了解一遍,写出来的文章就特别长。 为了避免一篇...
摘要:在时,引入了包,该包中的大多数同步器都是基于来构建的。框架提供了一套通用的机制来管理同步状态阻塞唤醒线程管理等待队列。指针用于在结点线程被取消时,让当前结点的前驱直接指向当前结点的后驱完成出队动作。 showImg(https://segmentfault.com/img/remote/1460000016012438); 本文首发于一世流云的专栏:https://segmentfau...
摘要:当线程使用完共享资源后,可以归还许可,以供其它需要的线程使用。所以,并不会阻塞调用线程。立即减少指定数目的可用许可数。方法用于将可用许可数清零,并返回清零前的许可数六的类接口声明类声明构造器接口声明 showImg(https://segmentfault.com/img/bVbfdnC?w=1920&h=1200); 本文首发于一世流云的专栏:https://segmentfault...
阅读 895·2022-06-21 15:13
阅读 1801·2021-10-20 13:48
阅读 953·2021-09-22 15:47
阅读 1340·2019-08-30 15:55
阅读 3095·2019-08-30 15:53
阅读 498·2019-08-29 12:33
阅读 693·2019-08-28 18:15
阅读 3441·2019-08-26 13:58