资讯专栏INFORMATION COLUMN

Java多线程框架源码阅读之---ReentrantLock非公平锁

zacklee / 2738人阅读

摘要:注意是一个假节点,阻塞的节点是作为后面的节点出现的。总之在非公平锁场景下尝试去获取锁,如果获取上了,则置一下状态,并设置自己为独占线程,并支持重入锁功能。方法用于创建一个节点值为当前线程并维护一个双向链表。阻塞了当前线程。

部分段落来自于http://javadoop.com/post/Abst...,他的文章相当不错。

ReentrantLock基于Sync内部类来完成锁。Sync继承于AbstractQueuedSynchronizer。Sync有两个不同的子类NonfairSync和FairSync。

ReentrantLock的大部分方法都是基于AbstractQueuedSynchronizer实现,大部分仅仅是对AbstractQueuedSynchronizer的转发。因此,了解AbstractQueuedSynchronizer就非常重要。

作为AbstractQueuedSynchronizer的实现者需要实现isHeldExclusively,tryAcquire,tryRelease,(可选tryAcquireShared,tryReleaseShared)

那么我们看看对于一个常用的套路,ReentrantLock是如何实现同步的

lock.lock();
try{
   i++;
}finally {
   lock.unlock();
}

lock.lock()内部实现为调用了sync.lock(),之后又会调用NonfairSync或FairSync的lock(),你看果然重度使用了AQS吧,这里我们先记住这个位置,一会我们还会回来分析。

public void lock() {
    sync.lock();
}

先介绍一下AQS里面的属性,不复杂就4个主要的属性:AQS里面阻塞的节点是作为队列出现的,维护了一个head节点和tail节点,同时维护了一个阻塞状态,如果state=0表示没有锁,如果state>0表示锁被重入了几次。
注意head是一个假节点,阻塞的节点是作为head后面的节点出现的。

// 头结点,你直接把它当做 当前持有锁的线程 可能是最好理解的
private transient volatile Node head;
// 阻塞的尾节点,每个新的节点进来,都插入到最后,也就形成了一个隐视的链表
private transient volatile Node tail;
// 这个是最重要的,不过也是最简单的,代表当前锁的状态,0代表没有被占用,大于0代表有线程持有当前锁
// 之所以说大于0,而不是等于1,是因为锁可以重入嘛,每次重入都加上1
private volatile int state;
// 代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入
// reentrantLock.lock()可以嵌套调用多次,所以每次用这个来判断当前线程是否已经拥有了锁
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; //继承自AbstractOwnableSynchronizer

接着看一下FairSync和NonfairSync的实现,FairSync和NonfairSync都继承了Sync,而且Sync又继承了AbstractQueuedSynchronizer。可以看到FairSync和NonfairSync直接或间接的实现了isHeldExclusively,tryAcquire,tryRelease这三个方法。

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    /**
     * Performs {@link Lock#lock}. The main reason for subclassing
     * is to allow fast path for nonfair version.
     */
    abstract void lock();

    /**
     * Performs non-fair tryLock.  tryAcquire is implemented in
     * subclasses, but both need nonfair try for trylock method.
     */
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
     //如果没有锁上,则设置为锁上并设置自己为独占线程
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
     //如果锁上了,而且独占线程是自己,那么重新设置state+1,并且返回true
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
     //否则返回false
        return false;
    }

    protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    protected final boolean isHeldExclusively() {
        // While we must in general read state before owner,
        // we don"t need to do so to check if current thread is owner
        return getExclusiveOwnerThread() == Thread.currentThread();
    }

    final ConditionObject newCondition() {
        return new ConditionObject();
    }

    // Methods relayed from outer class

    final Thread getOwner() {
        return getState() == 0 ? null : getExclusiveOwnerThread();
    }

    final int getHoldCount() {
        return isHeldExclusively() ? getState() : 0;
    }

    final boolean isLocked() {
        return getState() != 0;
    }

    /**
     * Reconstitutes the instance from a stream (that is, deserializes it).
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();
        setState(0); // reset to unlocked state
    }
}
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        //如果没有人锁上,那么就设置我自己为独占线程,否则再acquire一次
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            //调用到了AQS的acquire里面
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }

    /**
     * Fair version of tryAcquire.  Don"t grant access unless
     * recursive call or no waiters or is first.
     */
    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

之前我们说到回到ReentrantLock的lock()调用了sync.lock();现在我们回来看看非公平锁的逻辑是:如果抢到锁了,则设置自己的线程为占有锁的线程,否则调用acquire(1),这个是AQS的方法。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

acquire会调用tryAcquire,而这个是对于不同的实现是不一样的,非公平锁NonfairSync里面的tryAcquire,而tryAcquire又会调用到Sync的nonfairTryAcquire。总之tryAcquire在非公平锁场景下尝试去获取锁,如果获取上了,则置一下AQS状态state,并设置自己为独占线程,并支持重入锁功能。

addWaiter方法用于创建一个节点(值为当前线程)并维护一个双向链表。

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

现在说一下Node的结构,主要有用的field为waitStatus,prev,next,thread。waitStatus目前仅要了解1,0,-1就够了。 0是默认状态,1代表争取锁取消,-1表示它的后继节点对应的线程需要被唤醒。也就是说这个waitStatus其实代表的不是自己的状态,而是后继节点的状态。可以看见默认进队的节点的waitStatus都是0

static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    // 标识节点当前在共享模式下
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    // 标识节点当前在独占模式下
    static final Node EXCLUSIVE = null;

    // ======== 下面的几个int常量是给waitStatus用的 ===========
    /** waitStatus value to indicate thread has cancelled */
    // 代码此线程取消了争抢这个锁
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor"s thread needs unparking */
    // 官方的描述是,其表示当前node的后继节点对应的线程需要被唤醒
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    // 本文不分析condition,所以略过吧
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    // 同样的不分析,略过吧
    static final int PROPAGATE = -3;
    // =====================================================

    // 取值为上面的1、-1、-2、-3,或者0(以后会讲到)
    // 这么理解,暂时只需要知道如果这个值 大于0 代表此线程取消了等待,
    // 也许就是说半天抢不到锁,不抢了,ReentrantLock是可以指定timeouot的。。。
    volatile int waitStatus;
    // 前驱节点的引用
    volatile Node prev;
    // 后继节点的引用
    volatile Node next;
    // 这个就是线程本尊
    volatile Thread thread;
}

acquireQueued的作用是从等待队列中尝试去把入队的那个节点去做park。另外当节点unpark以后,也会在循环中将自己设置成头结点,然后自己拿到锁

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //对于队首节点,刚才也许没有抢到锁,现在也许能抢到了,再试一次
                if (p == head && tryAcquire(arg)) {
                    //如果抢到了锁,这个入队的节点根本不需要park,直接可以执行
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //如果不是队首节点,或者是队首但是没有抢过其他节点
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire。这个方法说的是:"当前线程没有抢到锁,是否需要挂起当前线程?第一个参数是前驱节点,第二个参数才是代表当前线程的节点。注意因为默认加入的节点的状态都是0,这个方法会进来两次,第一次进来走到else分支里面修改前置节点的waitStatus为-1.第二次进来直接返回true。对于刚加入队列的节点,修改head节点的waitStatus为-1,对于后来加入的节点,修改它前一个节点的waitStatus为-1。

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don"t park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

parkAndCheckInterrupt的代码很简单,这个this就是ReentrantLock类的实例。阻塞了当前线程。

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

再来看看怎么解锁。

public void unlock() {
    sync.release(1);
}

调用到AQS里面,如果锁被完全释放了,那么就unpark head的下一个

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease是由Sync覆盖的。重置AQS里面的state,返回锁是否被完全释放了的判断。

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        //下面的代码就是唤醒后继节点,但是有可能后继节点取消了等待(waitStatus==1)
        // 从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的 
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

等到unpark以后,parkAndCheckInterrupt的阻塞解除,将继续for无限循环,因为是队列里是一个一个阻塞的,此时阻塞节点的前置依次都是head,因此if (p == head && tryAcquire(arg)) 这句话如果它醒来抢锁成功了将执行成功,阻塞的线程获取锁并执行,将自己设置成head,同时也将自己从队列中清除出去。 注意这里是非公平锁,因此在tryAcquire有可能还没有抢过其他线程,那么抢到的那个将会直接执行,而没有抢到的,又在循环里锁住了。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68033.html

相关文章

  • Java线程进阶(三)—— J.U.Clocks框架ReentrantLock

    摘要:公平策略在多个线程争用锁的情况下,公平策略倾向于将访问权授予等待时间最长的线程。使用方式的典型调用方式如下二类原理的源码非常简单,它通过内部类实现了框架,接口的实现仅仅是对的的简单封装,参见原理多线程进阶七锁框架独占功能剖析 showImg(https://segmentfault.com/img/remote/1460000016012582); 本文首发于一世流云的专栏:https...

    jasperyang 评论0 收藏0
  • Java线程——重入ReentrantLock源码阅读

    摘要:所谓的重入,就是当本线程想再次获得锁,不需要重新申请,它本身就已经锁了,即重入该锁。如果不为,则表示有线程已经占有了。总结回顾下要点是一个可重入的锁被当前占用的线程重入。 上一章《AQS源码阅读》讲了AQS框架,这次讲讲它的应用类(注意不是子类实现,待会细讲)。ReentrantLock,顾名思义重入锁,但什么是重入,这个锁到底是怎样的,我们来看看类的注解说明showImg(http:...

    sushi 评论0 收藏0
  • Java 重入 ReentrantLock 原理分析

    摘要:的主要功能和关键字一致,均是用于多线程的同步。而仅支持通过查询当前线程是否持有锁。由于和使用的是同一把可重入锁,所以线程可以进入方法,并再次获得锁,而不会被阻塞住。公平与非公平公平与非公平指的是线程获取锁的方式。 1.简介 可重入锁ReentrantLock自 JDK 1.5 被引入,功能上与synchronized关键字类似。所谓的可重入是指,线程可对同一把锁进行重复加锁,而不会被阻...

    lx1036 评论0 收藏0
  • Java线程—ReentrantReadWriteLock源码阅读

    摘要:不同的是它还多了内部类和内部类,以及读写对应的成员变量和方法。另外是给和内部类使用的。内部类前面说到的操作是分配到里面执行的。他们都是接口的实现,所以其实最像应该是这个两个内部类。而且大体上也没什么差异,也是用的内部类。 之前讲了《AQS源码阅读》和《ReentrantLock源码阅读》,本次将延续阅读下ReentrantReadWriteLock,建议没看过之前两篇文章的,先大概了解...

    Ververica 评论0 收藏0
  • Java线程进阶(十)—— J.U.Clocks框架:基于AQS的读写(5)

    摘要:关于,最后有两点规律需要注意当的等待队列队首结点是共享结点,说明当前写锁被占用,当写锁释放时,会以传播的方式唤醒头结点之后紧邻的各个共享结点。当的等待队列队首结点是独占结点,说明当前读锁被使用,当读锁释放归零后,会唤醒队首的独占结点。 showImg(https://segmentfault.com/img/remote/1460000016012293); 本文首发于一世流云的专栏:...

    dunizb 评论0 收藏0

发表评论

0条评论

zacklee

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<