资讯专栏INFORMATION COLUMN

深入浅出AQS之条件队列

VEIGHTZ / 2023人阅读

摘要:从上面的代码可以看出,条件队列是建立在锁基础上的,而且必须是独占锁原因后面会通过源码分析。明天就是国庆长假了,我自己也计划出国玩一趟,散散心。提前祝广大朋友国庆快乐。

相比于独占锁跟共享锁,AbstractQueuedSynchronizer中的条件队列可能被关注的并不是很多,但它在阻塞队列的实现里起着至关重要的作用,同时如果想全面了解AQS,条件队列也是必须要学习的。

原文地址:http://www.jianshu.com/p/3f8b...

这篇文章会涉及到AQS中独占锁跟共享锁的一些知识,如果你已经对这两块内容很了解了,那就直接往下看。否则在读本文之前还是建议读者先去看看我之前写的两篇文章温习一下。
深入浅出AQS之独占锁模式
深入浅出AQS之共享锁模式

一、使用场景介绍

区别于前面两篇文章,可能之前很多人都没有太在意AQS中的这块内容,所以这篇文章我们先来看下条件队列的使用场景:

//首先创建一个可重入锁,它本质是独占锁
private final ReentrantLock takeLock = new ReentrantLock();
//创建该锁上的条件队列
private final Condition notEmpty = takeLock.newCondition();
//使用过程
public E take() throws InterruptedException {
        //首先进行加锁
        takeLock.lockInterruptibly();
        try {
            //如果队列是空的,则进行等待
            notEmpty.await();
            //取元素的操作...
            
            //如果有剩余,则唤醒等待元素的线程
            notEmpty.signal();
        } finally {
            //释放锁
            takeLock.unlock();
        }
        //取完元素以后唤醒等待放入元素的线程
    }

上面的代码片段截取自LinkedBlockingQueue,是Java常用的阻塞队列之一。
从上面的代码可以看出,条件队列是建立在锁基础上的,而且必须是独占锁(原因后面会通过源码分析)。

二、执行过程概述

等待条件的过程:

在操作条件队列之前首先需要成功获取独占锁,不然直接在获取独占锁的时候已经被挂起了。

成功获取独占锁以后,如果当前条件还不满足,则在当前锁的条件队列上挂起,与此同时释放掉当前获取的锁资源。这里可以考虑一下如果不释放锁资源会发生什么?

如果被唤醒,则检查是否可以获取独占锁,否则继续挂起。

条件满足后的唤醒过程(以唤醒一个节点为例,也可以唤醒多个):

把当前等待队列中的第一个有效节点(如果被取消就无效了)加入同步队列等待被前置节点唤醒,如果此时前置节点被取消,则直接唤醒该节点让它重新在同步队列里适当的尝试获取锁或者挂起。

注:说到这里必须要解释一个知识点,整个AQS分为两个队列,一个同步队列,一个条件队列。只有同步队列中的节点才能获取锁。前面两篇独占锁共享锁文章中提到的加入队列就是同步队列。条件队列中所谓的唤醒是把节点从条件队列移到同步队列,让节点有机会去获取锁。

二、源码深入分析

下面的代码稍微复杂一点,因为它考虑了中断的处理情况。我由于想跟文章开头的代码片段保持一致,所以选取了该方法进行说明。如果只想看核心逻辑的话,那推荐读者看看awaitUninterruptibly()方法的源码。

        //条件队列入口,参考上面的代码片段
        public final void await() throws InterruptedException {
            //如果当前线程被中断则直接抛出异常
            if (Thread.interrupted())
                throw new InterruptedException();
            //把当前节点加入条件队列
            Node node = addConditionWaiter();
            //释放掉已经获取的独占锁资源
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //如果不在同步队列中则不断挂起
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                //中断处理,另一种跳出循环的方式
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //走到这里说明节点已经条件满足被加入到了同步队列中或者中断了
            //这个方法很熟悉吧?就跟独占锁调用同样的获取锁方法,从这里可以看出条件队列只能用于独占锁
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            //走到这里说明已经成功获取到了独占锁,接下来就做些收尾工作
            //删除条件队列中被取消的节点
            if (node.nextWaiter != null) 
                unlinkCancelledWaiters();
            //根据不同模式处理中断
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

流程比较复杂,一步一步来分析,首先看下加入条件队列的代码:

        //注:1.与同步队列不同,条件队列头尾指针是firstWaiter跟lastWaiter
        //注:2.条件队列是在获取锁之后,也就是临界区进行操作,因此很多地方不用考虑并发
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            //如果最后一个节点被取消,则删除队列中被取消的节点
            //至于为啥是最后一个节点后面会分析
            if (t != null && t.waitStatus != Node.CONDITION) {
                //删除所有被取消的节点
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            //创建一个类型为CONDITION的节点并加入队列,由于在临界区,所以这里不用并发控制
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

        //删除取消节点的逻辑虽然长,但比较简单,就不多带带说了,就是链表删除
        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

把节点加入到条件队列中以后,接下来要做的就是释放锁资源:

    //入参就是新创建的节点,即当前节点
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            //这里这个取值要注意,获取当前的state并释放,这从另一个角度说明必须是独占锁
            //可以考虑下这个逻辑放在共享锁下面会发生什么?
            int savedState = getState();
            //跟独占锁释放锁资源一样,不赘述
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                //如果这里释放失败,则抛出异常
                throw new IllegalMonitorStateException();
            }
        } finally {
            //如果释放锁失败,则把节点取消,由这里就能看出来上面添加节点的逻辑中只需要判断最后一个节点是否被取消就可以了
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

走到这一步,节点也加入条件队列中了,锁资源也释放了,接下来就该挂起了(先忽略中断处理,单看挂起逻辑):

     //如果不在同步队列就继续挂起(signal操作会把节点加入同步队列)
     while (!isOnSyncQueue(node)) {
           LockSupport.park(this);
           //中断处理后面再分析
           if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
     }
    //判断节点是否在同步队列中
    final boolean isOnSyncQueue(Node node) {
        //快速判断1:节点状态或者节点没有前置节点
        //注:同步队列是有头节点的,而条件队列没有
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        //快速判断2:next字段只有同步队列才会使用,条件队列中使用的是nextWaiter字段
        if (node.next != null) 
            return true;
        //上面如果无法判断则进入复杂判断
        return findNodeFromTail(node);
    }

    //注意这里用的是tail,这是因为条件队列中的节点是被加入到同步队列尾部,这样查找更快
    //从同步队列尾节点开始向前查找当前节点,如果找到则说明在,否则不在
    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

如果被唤醒且已经被转移到了同步队列,则会执行与独占锁一样的方法acquireQueued()进行同步队列独占获取。
最后我们来梳理一下里面的中断逻辑以及收尾工作的代码:

     while (!isOnSyncQueue(node)) {
           LockSupport.park(this);
           //这里被唤醒可能是正常的signal操作也可能是中断
           if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
     }

     //这里的判断逻辑是:
     //1.如果现在不是中断的,即正常被signal唤醒则返回0
     //2.如果节点由中断加入同步队列则返回THROW_IE,由signal加入同步队列则返回REINTERRUPT
     private int checkInterruptWhileWaiting(Node node) {
           return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
     }

     //修改节点状态并加入同步队列
     //该方法返回true表示节点由中断加入同步队列,返回false表示由signal加入同步队列
     final boolean transferAfterCancelledWait(Node node) {
        //这里设置节点状态为0,如果成功则加入同步队列
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            //与独占锁同样的加入队列逻辑,不赘述
            enq(node);
            return true;
        }
        //如果上面设置失败,说明节点已经被signal唤醒,由于signal操作会将节点加入同步队列,我们只需自旋等待即可
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
     }

在把唤醒后的中断判断做好以后,看await()中最后一段逻辑:

//在处理中断之前首先要做的是从同步队列中成功获取锁资源
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
      interruptMode = REINTERRUPT;
//由于当前节点可能是由于中断修改了节点状态,所以如果有后继节点则执行删除已取消节点的操作
//如果没有后继节点,根据上面的分析在后继节点加入的时候会进行删除
if (node.nextWaiter != null) 
      unlinkCancelledWaiters();
if (interruptMode != 0)
      reportInterruptAfterWait(interruptMode);

//根据中断时机选择抛出异常或者设置线程中断状态
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
      if (interruptMode == THROW_IE)
           throw new InterruptedException();
      else if (interruptMode == REINTERRUPT)
           //实现代码为:Thread.currentThread().interrupt();
           selfInterrupt();
}

至此条件队列await操作全部分析完毕。signal()方法相对容易一些,一起看源码分析下:

   //条件队列唤醒入口
   public final void signal() {
       //如果不是独占锁则抛出异常,再次说明条件队列只适用于独占锁
       if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
       //如果条件队列不为空,则进行唤醒操作
       Node first = firstWaiter;
       if (first != null)
            doSignal(first);
   }

   //该方法就是把一个有效节点从条件队列中删除并加入同步队列
   //如果失败则会查找条件队列上等待的下一个节点直到队列为空
   private void doSignal(Node first) {
        do {
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&(first = firstWaiter) != null);
   }

    //将节点加入同步队列
    final boolean transferForSignal(Node node) {
        //修改节点状态,这里如果修改失败只有一种可能就是该节点被取消,具体看上面await过程分析
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        //该方法很熟悉了,跟独占锁入队方法一样,不赘述
        Node p = enq(node);
        //注:这里的p节点是当前节点的前置节点
        int ws = p.waitStatus;
        //如果前置节点被取消或者修改状态失败则直接唤醒当前节点
        //此时当前节点已经处于同步队列中,唤醒会进行锁获取或者正确的挂起操作
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
三、总结

相比于独占锁跟共享锁,条件队列可能是最不受关注的了,但由于它是阻塞队列实现的关键组件,还是有必要了解一下其中的原理。其实我认为关键点有两条,第一是条件队列是建立在某个具体的锁上面的,第二是条件队列跟同步队列是两个队列,前者依赖条件唤醒后者依赖锁释放唤醒,了解了这两点以后搞清楚条件队列就不是什么难事了。

至此,Java同步器AQS中三大锁模式就都分析完了。虽然已经尽力思考,尽量写的清楚,但鉴于水平有限,如果有纰漏的地方,欢迎广大读者指正。
明天就是国庆长假了,我自己也计划出国玩一趟,散散心。
提前祝广大朋友国庆快乐。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/67653.html

相关文章

  • 深入浅出AQS组件概览

    摘要:原文地址深入浅出之独占锁模式深入浅出之共享锁模式深入浅出之条件队列前面三篇文章如果之前没有基础的话看起来会比较吃力,这篇文章说明一下的基础知识,方便快速了解。当前节点由于超时或者中断被取消,节点进入这个状态以后将保持不变。 之前分析了AQS中的独占锁,共享锁,条件队列三大模块,现在从结构上来看看AQS各个组件的情况。 原文地址:http://www.jianshu.com/p/49b8...

    DDreach 评论0 收藏0
  • 深入浅出AQS独占锁模式

    摘要:获取锁的过程当线程调用申请获取锁资源,如果成功,则进入临界区。如果队列中有其他等待锁资源的线程需要唤醒,则唤醒队列中的第一个等待节点先入先出。释放锁时,如果队列中有等待的线程就进行唤醒。 每一个Java工程师应该都或多或少了解过AQS,我自己也是前前后后,反反复复研究了很久,看了忘,忘了再看,每次都有不一样的体会。这次趁着写博客,打算重新拿出来系统的研究下它的源码,总结成文章,便于以后...

    Corwien 评论0 收藏0
  • 源码分析JDK8AbstractQueuedSynchronizer

    摘要:与之相关的方法有三个原子性地修改都是类型,可见我们可以进行,来定义的获取与释放从而实现我们自定义的同步器。 前言 源码分析我认为主要有两个作用:满足好奇心,我想每一个有追求的人都不会满足于仅仅做一个API Caller实现功能就好,我们也想知道它到底是怎么实现的;借鉴与升华,当我们明白了一个类的设计原理,在一定的情境下我们可以借鉴其设计哲学,甚至针对我们自己特殊的业务场景对其进行改良与...

    魏宪会 评论0 收藏0
  • 源码分析JDK8AbstractQueuedSynchronizer

    摘要:与之相关的方法有三个原子性地修改都是类型,可见我们可以进行,来定义的获取与释放从而实现我们自定义的同步器。 前言 源码分析我认为主要有两个作用:满足好奇心,我想每一个有追求的人都不会满足于仅仅做一个API Caller实现功能就好,我们也想知道它到底是怎么实现的;借鉴与升华,当我们明白了一个类的设计原理,在一定的情境下我们可以借鉴其设计哲学,甚至针对我们自己特殊的业务场景对其进行改良与...

    sunny5541 评论0 收藏0
  • 源码分析JDK8AbstractQueuedSynchronizer

    摘要:与之相关的方法有三个原子性地修改都是类型,可见我们可以进行,来定义的获取与释放从而实现我们自定义的同步器。 前言 源码分析我认为主要有两个作用:满足好奇心,我想每一个有追求的人都不会满足于仅仅做一个API Caller实现功能就好,我们也想知道它到底是怎么实现的;借鉴与升华,当我们明白了一个类的设计原理,在一定的情境下我们可以借鉴其设计哲学,甚至针对我们自己特殊的业务场景对其进行改良与...

    Betta 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<