资讯专栏INFORMATION COLUMN

【java并发编程实战6】AQS之独占锁ReentrantLock实现

sixleaves / 2556人阅读

摘要:锁与很好的隔离使用者与实现者所需要关注的领域。那么这个就是包装线程并且放入到队列的过程实现的方法。也证实了就是获取锁的线程的节点。如果发生异常取消请求,也就是将当前节点重队列中移除。

前言

自从JDK1.5后,jdk新增一个并发工具包java.util.concurrent,提供了一系列的并发工具类。而今天我们需要学习的是java.util.concurrent.lock也就是它下面的lock包,其中有一个最为常见类ReentrantLock

我们知道ReentrantLock的功能是实现代码段的并发访问控制,也就是通常意义上所说的锁。之前我们也学习过一种锁的实现,也就是synchronized关键词,synchronized是在字节码层面,通过对象的监视器锁实现的。那么ReentrantLock又是怎么实现的呢?

如果不看源码,可能会以为它的实现是通过类似于synchronized,通过对象的监视器锁实现的。但事实上它仅仅是一个工具类!没有使用更“高级”的机器指令,不是关键字,也不依靠JDK编译时的特殊处理,仅仅作为一个普普通通的类就完成了代码块的并发访问控制,这就更让人疑问它怎么实现的代码块的并发访问控制的了。

我们查看源码发现,它是通过继承抽象类实现的AbstractQueuedSynchronizer,为了方便描述,接下来我将用AQS代替AbstractQueuedSynchronizer

关于AQS
AQS,它是用来构建锁或者其他同步组建的基础框架,我们见过许多同步工具类都是基于它构建的。包括ReentrantLock、CountDownLatch等。在深入了解AQS了解之前,我们需要知道锁跟AQS的区别。锁,它是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现的细节;而AQS面像的是锁的实现者,它简化了锁的实现。锁与AQS很好的隔离使用者与实现者所需要关注的领域。那么我们今天就作为一个锁的实现者,一步一步分析锁的实现。

AQS又称同步器,它的内部有一个int成员变量state表示同步状态,还有一个内置的FIFO队列来实现资源获取线程的排队工作。通过它们我们就能实现锁。

在实现锁之前,我们需要考虑做为锁的使用者,锁会有哪几种?

通常来说,锁分为两种,一种是独占锁(排它锁,互斥锁),另一种就是共享锁了。根据这两类,其实AQS也给我们提供了两套API。而我们作为锁的实现者,通常都是要么全部实现它的独占api,要么实现它的共享api,而不会出现一起实现的。即使juc内置的ReentrantReadWriteLock也是通过两个子类分别来实现的。

锁的实现 独占锁

独占锁又名互斥锁,同一时间,只有一个线程能获取到锁,其余的线程都会被阻塞等待。其中我们常用的ReentrantLock就是一种独占锁,我们一起来ReentrantLock 分析ReentrantLock的同时看一看AQS的实现,再推理出AQS独特的设计思路和实现方式。最后,再看其共享控制功能的实现。

首先我们来看看获取锁的过程

加锁

我们查看ReentrantLock的源码。来分析它的lock方法

  public void lock() {
        sync.lock();
   }

与我们之前分析的一样,锁的具体实现由内部的代理类完成,lock只是暴露给锁的使用者的一套api。使用过ReentrantLock的同学应该知道,ReentrantLock又分为公平锁和非公平锁,所以,ReentrantLock内部只有两个sync的实现。

    /**
     * Sync object for non-fair locks
     */
    static final class NonfairSync extends Sync{..}
     /**
     * Sync object for fair locks
     */
    static final class FairSync extends Sync{..}

公平锁 :每个线程获取锁的顺序是按照调用lock方法的先后顺序来的。

非公平锁:每个线程获取锁的顺序是不会按照调用lock方法的先后顺序来的。完全看运气。

所以我们完全可以猜测到,这个公平与不公平的区别就体现在锁的获取过程。我们以公平锁为例,来分析获取锁过程,最后对比非公平锁的过程,寻找差异。

lock

查看FairSync的lock方法

    final void lock() {
            acquire(1);
        }

这里它调用到了父类AQS的acquire方法,所以我们继续查看acquire方法的代码

acquire
/**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
            selfInterrupt();
    }

查看方法方法的注释我们可以知道这个方法的作用,这里我简单的翻译一下.

Acquires方法是一个独占锁模式的方法,它是不会响应中断的。它至少执行一次tryAcquire去获取锁,如果返回true,则代表获取锁成功,否则它将会被加入等待队列阻塞,直到重新尝试获取锁成功。所以我们需要看看尝试获取锁的方法tryAcquire的实现

tryAcruire
   protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

抛出一个异常,没有实现。所以我们需要查看它的子类,在我们这里就是FairSync的实现。

这里也会大家会有疑惑,没有实现为什么不写成抽象方法呢,前面我们提到过,我们不会同时在一个类中实现独占锁跟共享锁的api,那么tryAcruire是属于独占锁,那么如果我想一个共享锁也要重新独占锁的方法吗?所以大师的设计是绝对没有问题的。
protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();//获取当前线程
            int c = getState();  //获取父类AQS中的标志位
            if (c == 0) {
                if (!hasQueuedPredecessors() && 
                    //如果队列中没有其他线程  说明没有线程正在占有锁!
                    compareAndSetState(0, acquires)) { 
                    //修改一下状态位,注意:这里的acquires是在lock的时候传递来的,从上面的图中可以知道,这个值是写死的1
                    setExclusiveOwnerThread(current);
                    //如果通过CAS操作将状态为更新成功则代表当前线程获取锁,因此,将当前线程设置到AQS的一个变量中,说明这个线程拿走了锁。
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
             //如果不为0 意味着,锁已经被拿走了,但是,因为ReentrantLock是重入锁,
             //是可以重复lock,unlock的,只要成对出现行。一次。这里还要再判断一次 获取锁的线程是不是当前请求锁的线程。
                int nextc = c + acquires;//如果是的,累加在state字段上就可以了。
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

目前为止,如果获取锁成功,则返回true,获取锁的过程结束,如果获取失败,则返回false

按照之前的逻辑,如果线程获取锁失败,则会被放入到队列中,但是在放入之前,需要给线程包装一下。

那么这个addWaiter就是包装线程并且放入到队列的过程实现的方法。

addWaiter
   /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

注释: 把当前线程作为一个节点添加到队列中,并且为这个节点设置模式

模式: 也就是独占模式/共享模式,在这里模式是形参,所以我们看看起调方

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) Node.EXCLUSIVE 就代表这是独占锁模式。

创建好节点后,将节点加入到队列尾部,此处,在队列不为空的时候,先尝试通过cas方式修改尾节点为最新的节点,如果修改失败,意味着有并发,这个时候才会进入enq中死循环,“自旋”方式修改。

将线程的节点接入到队里中后,当然还需要做一件事:将当前线程挂起!这个事,由acquireQueued来做。

在解释acquireQueued之前,我们需要先看下AQS中队列的内存结构,我们知道,队列由Node类型的节点组成,其中至少有两个变量,一个封装线程,一个封装节点类型。

而实际上,它的内存结构是这样的(第一次节点插入时,第一个节点是一个空节点,代表有一个线程已经获取锁,事实上,队列的第一个节点就是代表持有锁的节点):

黄色节点为队列默认的头节点,每次有线程竞争失败,进入队列后其实都是插入到队列的尾节点(tail后面)后面。这个从enq方法可以看出来,上文中有提到enq方法为将节点插入队列的方法:

enq
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                // 一个空的节点,通常代表获取锁的线程
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
acquireQueued

接着我们来看看当节点被放入到队列中,如何将线程挂起,也就是看看acquireQueued方法的实现。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 获取当前节点前驱结点
                final Node p = node.predecessor();
                // 如果前驱节点是head,那么它就是等待队列中的第一个线程
                // 因为我们知道head就是获取线程的节点,那么它就有机会再次获取锁
                if (p == head && tryAcquire(arg)) {
                    //成功后,将上图中的黄色节点移除,Node1变成头节点。 也证实了head就是获取锁的线程的节点。
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 1、检查前一个节点的状态,判断是否要挂起
                // 2、如果需要挂起,则通过JUC下的LockSopport类的静态方法park挂起当前线程,直到被唤醒。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 如果发生异常
            if (failed)
                // 取消请求,也就是将当前节点重队列中移除。
                cancelAcquire(node);
        }
    }

这里我还需要解释的是:

1、Node节点除了存储当前线程之外,节点类型,前驱后驱指针之后,还存储一个叫waitStatus的变量,该变量用于描述节点的状态。共有四种状态。

       /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor"s thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;

分别表示:

1 = 取消状态,该节点将会被队列移除。

-1 = 等待状态,后驱节点处于等待状态。

-2 = 等待被通知,该节点将会阻塞至被该锁的condition的await方法唤醒。

-3 = 共享传播状态,代表该节点的状态会向后传播。

到此为止,一个线程对于锁的一次竞争才告于段落,结果有两种,要么成功获取到锁(不用进入到AQS队列中),要么,获取失败,被挂起,等待下次唤醒后继续循环尝试获取锁,值得注意的是,AQS的队列为FIFO队列,所以,每次被CPU假唤醒,且当前线程不是出在头节点的位置,也是会被挂起的。AQS通过这样的方式,实现了竞争的排队策略。

释放锁

看完了加锁,再看释放锁。我们先不看代码也可以猜测到释放锁需要的步骤。

队列的头节点是当前获取锁的线程,所以我们需要移除头节点

释放锁,唤醒头节点后驱节点来竞争锁

接下来我们查看源码来验证我们的猜想是否在正确。

unlock
public void unlock() {
    sync.release(1);
}

unlock方法调用AQS的release方法,因为我们的acquire的时候传入的是1,也就是同步状态量+1,那么对应的解锁就要-1。

release
  public final boolean release(int arg) {
        // 尝试释放锁
        if (tryRelease(arg)) {
            // 释放锁成功,获取当前队列的头节点
            Node h = head;
            if (h != null && h.waitStatus != 0)
                // 唤醒当前节点的下一个节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
tryRelease

同样的它是交给子类实现的

    protected final boolean tryRelease(int releases) {
        
            int c = getState() - releases;
            // 当前线程不是获取锁的线程 抛出异常
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            // 因为是重入的关系,不是每次释放锁c都等于0,直到最后一次释放锁时,才通知AQS不需要再记录哪个线程正在获取锁。
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
    }
unparkSuccessor

释放锁成功之后,就唤醒头节点后驱节点来竞争锁

 private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

值得注意的是,寻找的顺序是从队列尾部开始往前去找的最前面的一个waitStatus小于0的节点。因为大于0 就是1状态的节点是取消状态。

公平锁与非公平锁

到此我们锁获取跟锁的释放已经分析的差不多。那么公平锁跟非公平锁的区别在于加锁的过程。对比代码

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }
}
static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
}

从代码中也可以看出来,非公平在公平锁的加锁的逻辑之前先直接cas修改一次state变量(尝试获取锁),成功就返回,不成功再排队,从而达到不排队直接抢占的目的。

最后欢迎大家关注一下我的个人公众号。一起交流一起学习,有问必答。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/77100.html

相关文章

  • BATJ都爱问的多线程面试题

    摘要:今天给大家总结一下,面试中出镜率很高的几个多线程面试题,希望对大家学习和面试都能有所帮助。指令重排在单线程环境下不会出先问题,但是在多线程环境下会导致一个线程获得还没有初始化的实例。使用可以禁止的指令重排,保证在多线程环境下也能正常运行。 下面最近发的一些并发编程的文章汇总,通过阅读这些文章大家再看大厂面试中的并发编程问题就没有那么头疼了。今天给大家总结一下,面试中出镜率很高的几个多线...

    高胜山 评论0 收藏0
  • Java 重入 ReentrantLock 原理分析

    摘要:的主要功能和关键字一致,均是用于多线程的同步。而仅支持通过查询当前线程是否持有锁。由于和使用的是同一把可重入锁,所以线程可以进入方法,并再次获得锁,而不会被阻塞住。公平与非公平公平与非公平指的是线程获取锁的方式。 1.简介 可重入锁ReentrantLock自 JDK 1.5 被引入,功能上与synchronized关键字类似。所谓的可重入是指,线程可对同一把锁进行重复加锁,而不会被阻...

    lx1036 评论0 收藏0
  • Java多线程进阶(七)—— J.U.Clocks框架:AQS独占功能剖析(2)

    摘要:开始获取锁终于轮到出场了,的调用过程和完全一样,同样拿不到锁,然后加入到等待队列队尾然后,在阻塞前需要把前驱结点的状态置为,以确保将来可以被唤醒至此,的执行也暂告一段落了安心得在等待队列中睡觉。 showImg(https://segmentfault.com/img/remote/1460000016012467); 本文首发于一世流云的专栏:https://segmentfault...

    JayChen 评论0 收藏0
  • Java,真的有这么复杂吗?

    摘要:撤销锁偏向锁使用了一种等到竞争出现才释放锁的机制,所以当其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁。轻量级锁线程在执行同步代码块之前,会先在当前线程的栈桢中创建用于存储锁记录的空间,并将对象头中的复制到锁记录中,官方称为。 前言 作者前面也写了几篇关于Java并发编程,以及线程和volatil的基础知识,有兴趣可以阅读作者的原文博客,今天关于Java中的两种锁进行详解,希望对...

    Darkgel 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<