摘要:注意是一个假节点,阻塞的节点是作为后面的节点出现的。总之在非公平锁场景下尝试去获取锁,如果获取上了,则置一下状态,并设置自己为独占线程,并支持重入锁功能。方法用于创建一个节点值为当前线程并维护一个双向链表。阻塞了当前线程。
部分段落来自于http://javadoop.com/post/Abst...,他的文章相当不错。
ReentrantLock基于Sync内部类来完成锁。Sync继承于AbstractQueuedSynchronizer。Sync有两个不同的子类NonfairSync和FairSync。
ReentrantLock的大部分方法都是基于AbstractQueuedSynchronizer实现,大部分仅仅是对AbstractQueuedSynchronizer的转发。因此,了解AbstractQueuedSynchronizer就非常重要。
作为AbstractQueuedSynchronizer的实现者需要实现isHeldExclusively,tryAcquire,tryRelease,(可选tryAcquireShared,tryReleaseShared)
那么我们看看对于一个常用的套路,ReentrantLock是如何实现同步的
lock.lock(); try{ i++; }finally { lock.unlock(); }
lock.lock()内部实现为调用了sync.lock(),之后又会调用NonfairSync或FairSync的lock(),你看果然重度使用了AQS吧,这里我们先记住这个位置,一会我们还会回来分析。
public void lock() { sync.lock(); }
先介绍一下AQS里面的属性,不复杂就4个主要的属性:AQS里面阻塞的节点是作为队列出现的,维护了一个head节点和tail节点,同时维护了一个阻塞状态,如果state=0表示没有锁,如果state>0表示锁被重入了几次。
注意head是一个假节点,阻塞的节点是作为head后面的节点出现的。
// 头结点,你直接把它当做 当前持有锁的线程 可能是最好理解的 private transient volatile Node head; // 阻塞的尾节点,每个新的节点进来,都插入到最后,也就形成了一个隐视的链表 private transient volatile Node tail; // 这个是最重要的,不过也是最简单的,代表当前锁的状态,0代表没有被占用,大于0代表有线程持有当前锁 // 之所以说大于0,而不是等于1,是因为锁可以重入嘛,每次重入都加上1 private volatile int state; // 代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入 // reentrantLock.lock()可以嵌套调用多次,所以每次用这个来判断当前线程是否已经拥有了锁 // if (currentThread == getExclusiveOwnerThread()) {state++} private transient Thread exclusiveOwnerThread; //继承自AbstractOwnableSynchronizer
接着看一下FairSync和NonfairSync的实现,FairSync和NonfairSync都继承了Sync,而且Sync又继承了AbstractQueuedSynchronizer。可以看到FairSync和NonfairSync直接或间接的实现了isHeldExclusively,tryAcquire,tryRelease这三个方法。
abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */ abstract void lock(); /** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //如果没有锁上,则设置为锁上并设置自己为独占线程 if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //如果锁上了,而且独占线程是自己,那么重新设置state+1,并且返回true else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //否则返回false return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } protected final boolean isHeldExclusively() { // While we must in general read state before owner, // we don"t need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread.currentThread(); } final ConditionObject newCondition() { return new ConditionObject(); } // Methods relayed from outer class final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } final boolean isLocked() { return getState() != 0; } /** * Reconstitutes the instance from a stream (that is, deserializes it). */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } }
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { //如果没有人锁上,那么就设置我自己为独占线程,否则再acquire一次 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else //调用到了AQS的acquire里面 acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } /** * Fair version of tryAcquire. Don"t grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
之前我们说到回到ReentrantLock的lock()调用了sync.lock();现在我们回来看看非公平锁的逻辑是:如果抢到锁了,则设置自己的线程为占有锁的线程,否则调用acquire(1),这个是AQS的方法。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
acquire会调用tryAcquire,而这个是对于不同的实现是不一样的,非公平锁NonfairSync里面的tryAcquire,而tryAcquire又会调用到Sync的nonfairTryAcquire。总之tryAcquire在非公平锁场景下尝试去获取锁,如果获取上了,则置一下AQS状态state,并设置自己为独占线程,并支持重入锁功能。
addWaiter方法用于创建一个节点(值为当前线程)并维护一个双向链表。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
现在说一下Node的结构,主要有用的field为waitStatus,prev,next,thread。waitStatus目前仅要了解1,0,-1就够了。 0是默认状态,1代表争取锁取消,-1表示它的后继节点对应的线程需要被唤醒。也就是说这个waitStatus其实代表的不是自己的状态,而是后继节点的状态。可以看见默认进队的节点的waitStatus都是0
static final class Node { /** Marker to indicate a node is waiting in shared mode */ // 标识节点当前在共享模式下 static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ // 标识节点当前在独占模式下 static final Node EXCLUSIVE = null; // ======== 下面的几个int常量是给waitStatus用的 =========== /** waitStatus value to indicate thread has cancelled */ // 代码此线程取消了争抢这个锁 static final int CANCELLED = 1; /** waitStatus value to indicate successor"s thread needs unparking */ // 官方的描述是,其表示当前node的后继节点对应的线程需要被唤醒 static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ // 本文不分析condition,所以略过吧 static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ // 同样的不分析,略过吧 static final int PROPAGATE = -3; // ===================================================== // 取值为上面的1、-1、-2、-3,或者0(以后会讲到) // 这么理解,暂时只需要知道如果这个值 大于0 代表此线程取消了等待, // 也许就是说半天抢不到锁,不抢了,ReentrantLock是可以指定timeouot的。。。 volatile int waitStatus; // 前驱节点的引用 volatile Node prev; // 后继节点的引用 volatile Node next; // 这个就是线程本尊 volatile Thread thread; }
acquireQueued的作用是从等待队列中尝试去把入队的那个节点去做park。另外当节点unpark以后,也会在循环中将自己设置成头结点,然后自己拿到锁
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //对于队首节点,刚才也许没有抢到锁,现在也许能抢到了,再试一次 if (p == head && tryAcquire(arg)) { //如果抢到了锁,这个入队的节点根本不需要park,直接可以执行 setHead(node); p.next = null; // help GC failed = false; return interrupted; } //如果不是队首节点,或者是队首但是没有抢过其他节点 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
shouldParkAfterFailedAcquire。这个方法说的是:"当前线程没有抢到锁,是否需要挂起当前线程?第一个参数是前驱节点,第二个参数才是代表当前线程的节点。注意因为默认加入的节点的状态都是0,这个方法会进来两次,第一次进来走到else分支里面修改前置节点的waitStatus为-1.第二次进来直接返回true。对于刚加入队列的节点,修改head节点的waitStatus为-1,对于后来加入的节点,修改它前一个节点的waitStatus为-1。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don"t park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
parkAndCheckInterrupt的代码很简单,这个this就是ReentrantLock类的实例。阻塞了当前线程。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
再来看看怎么解锁。
public void unlock() { sync.release(1); }
调用到AQS里面,如果锁被完全释放了,那么就unpark head的下一个
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
tryRelease是由Sync覆盖的。重置AQS里面的state,返回锁是否被完全释放了的判断。
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ //下面的代码就是唤醒后继节点,但是有可能后继节点取消了等待(waitStatus==1) // 从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
等到unpark以后,parkAndCheckInterrupt的阻塞解除,将继续for无限循环,因为是队列里是一个一个阻塞的,此时阻塞节点的前置依次都是head,因此if (p == head && tryAcquire(arg)) 这句话如果它醒来抢锁成功了将执行成功,阻塞的线程获取锁并执行,将自己设置成head,同时也将自己从队列中清除出去。 注意这里是非公平锁,因此在tryAcquire有可能还没有抢过其他线程,那么抢到的那个将会直接执行,而没有抢到的,又在循环里锁住了。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/68033.html
摘要:公平策略在多个线程争用锁的情况下,公平策略倾向于将访问权授予等待时间最长的线程。使用方式的典型调用方式如下二类原理的源码非常简单,它通过内部类实现了框架,接口的实现仅仅是对的的简单封装,参见原理多线程进阶七锁框架独占功能剖析 showImg(https://segmentfault.com/img/remote/1460000016012582); 本文首发于一世流云的专栏:https...
摘要:所谓的重入,就是当本线程想再次获得锁,不需要重新申请,它本身就已经锁了,即重入该锁。如果不为,则表示有线程已经占有了。总结回顾下要点是一个可重入的锁被当前占用的线程重入。 上一章《AQS源码阅读》讲了AQS框架,这次讲讲它的应用类(注意不是子类实现,待会细讲)。ReentrantLock,顾名思义重入锁,但什么是重入,这个锁到底是怎样的,我们来看看类的注解说明showImg(http:...
摘要:的主要功能和关键字一致,均是用于多线程的同步。而仅支持通过查询当前线程是否持有锁。由于和使用的是同一把可重入锁,所以线程可以进入方法,并再次获得锁,而不会被阻塞住。公平与非公平公平与非公平指的是线程获取锁的方式。 1.简介 可重入锁ReentrantLock自 JDK 1.5 被引入,功能上与synchronized关键字类似。所谓的可重入是指,线程可对同一把锁进行重复加锁,而不会被阻...
摘要:不同的是它还多了内部类和内部类,以及读写对应的成员变量和方法。另外是给和内部类使用的。内部类前面说到的操作是分配到里面执行的。他们都是接口的实现,所以其实最像应该是这个两个内部类。而且大体上也没什么差异,也是用的内部类。 之前讲了《AQS源码阅读》和《ReentrantLock源码阅读》,本次将延续阅读下ReentrantReadWriteLock,建议没看过之前两篇文章的,先大概了解...
摘要:关于,最后有两点规律需要注意当的等待队列队首结点是共享结点,说明当前写锁被占用,当写锁释放时,会以传播的方式唤醒头结点之后紧邻的各个共享结点。当的等待队列队首结点是独占结点,说明当前读锁被使用,当读锁释放归零后,会唤醒队首的独占结点。 showImg(https://segmentfault.com/img/remote/1460000016012293); 本文首发于一世流云的专栏:...
阅读 2686·2021-10-22 09:55
阅读 2013·2021-09-27 13:35
阅读 1269·2021-08-24 10:02
阅读 1482·2019-08-30 15:55
阅读 1200·2019-08-30 14:13
阅读 3473·2019-08-30 13:57
阅读 1977·2019-08-30 11:07
阅读 2450·2019-08-29 17:12