资讯专栏INFORMATION COLUMN

J.U.C|AQS共享式源码分析

ghnor / 862人阅读

摘要:主要讲解方法共享式获取同步状态,返回值表示获取成功,反之则失败。源码分析同步器的和方法请求共享锁的入口当并且时才去才获取资源获取锁以共享不可中断模式获取锁将当前线程一共享方式构建成节点并将其加入到同步队列的尾部。

一、写在前面

上篇给大家聊了独占式的源码,具体参见《J.U.C|AQS独占式源码分析》

这一章我们继续在AQS的源码世界中遨游,解读共享式同步状态的获取和释放。

二、什么是共享式

共享式与独占式唯一的区别是在于同一时刻可以有多个线程获取到同步状态。

我们以读写锁为例来看两者,一个线程在对一个资源文件进行读操作时,那么这一时刻对于文件的写操作均被阻塞,而其它线程的读操作可以同时进行。
当写操作要求对资源独占操作,而读操作可以是共享的,两种不同的操作对同一资源进行操作会是什么样的?看下图


共享式访问资源,其他共享时均被允许,而独占式被阻塞。


独占式访问资源时,其它访问均被阻塞。

通过读写锁给大家一起温故下独占式和共享式概念,上一节我们已经聊过独占式,本章我们主要聊共享式。

主要讲解方法

protected int tryAcquireShared(int arg);共享式获取同步状态,返回值 >= 0 表示获取成功,反之则失败。

protected boolean tryReleaseShared(int arg): 共享式释放同步状态。

三、核心方法分析 3.1 同步状态的获取

public final void acquireShared(int arg)

共享式获取同步状态的顶级入口,如果当前线程未获取到同步状态,将会加入到同步队列中等待,与独占式唯一的区别是在于同一时刻可以有多个线程获取到同步状态。

方法源码

public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

方法函数解析

tryAcquireShared(arg):获取同步状态,返回值大于等于0表示获取成功,否则失败。

doAcquireShared(arg):共享式获取共享状态,包含构建节点,加入队列等待,唤醒节点等操作。

源码分析

同步器的 acquireShared 和 doAcquireShared 方法

//请求共享锁的入口
public final void acquireShared(int arg) {
        // 当state != 0 并且tryAcquireShared(arg) < 0 时才去才获取资源
        if (tryAcquireShared(arg) < 0)
            // 获取锁
            doAcquireShared(arg);
    }
// 以共享不可中断模式获取锁
private void doAcquireShared(int arg) {
        // 将当前线程一共享方式构建成 node 节点并将其加入到同步队列的尾部。这里addWaiter(Node.SHARED)操作和独占式基本一样,
        final Node node = addWaiter(Node.SHARED);
        // 是否成功标记
        boolean failed = true;
        try {
            // 等待过程是否被中断标记
            boolean interrupted = false;
            自旋
            for (;;) {
                // 获取当前节点的前驱节点
                final Node p = node.predecessor();
                // 判断前驱节点是否是head节点,也就是看自己是不是老二节点
                if (p == head) {
                    // 如果自己是老二节点,尝试获取资源锁,返回三种状态
                    // state < 0 : 表示获取资源失败
                    // state = 0: 表示当前正好线程获取到资源, 此时不需要进行向后继节点传播。
                    // state > 0: 表示当前线程获取资源锁后,还有多余的资源,需要向后继节点继续传播,获取资源。 
                    int r = tryAcquireShared(arg);
                    // 获取资源成功
                    if (r >= 0) {
                        // 当前节点线程获取资源成功后,对后继节点进行逻辑操作
                        setHeadAndPropagate(node, r);
                        // setHeadAndPropagate(node, r) 已经对node.prev = null,在这有对p.next = null; 等待GC进行垃圾收集。
                        p.next = null; // help GC
                        // 如果等待过程被中断了, 将中断给补上。
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                // 判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

在acquireShared(int arg)方法中,同步器调用tryAcquireShared(arg)方法获取同步状态,返回同步状态有两种。

当同步状态大于等于0时: 表示可以获取到同步状态,退出自旋,在doAcquireShared(int arg)方法中可以看到节点获取资源退出自旋的条件就是大于等于0

小于0会加入同步队列中等待被唤醒。

addWaiter和enq方法

// 创建节点,并将节点加入到同步队列尾部中。
 private Node addWaiter(Node mode) {
        // 以共享方式为线程构建Node节点
        Node node = new Node(Thread.currentThread(), mode);
        // 尝试快速加入到队列尾部
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // CAS保证原子操作,将node节点加入到队列尾部
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 快速加入失败,走 enq(node)方法
        enq(node);
        return node;
}
//以自旋的方式,将node节点加入到队列的尾部
private Node enq(final Node node) {
        // 自旋
        for (;;) {
            // 获取尾部节点
            Node t = tail;
            // 如果tail节点为空, 说明同步队列还没初始化,必须先进行初始化
            if (t == null) { // Must initialize
                // CAS保证原子操作, 新建一个空 node 节点并将其设置为head节点
                if (compareAndSetHead(new Node()))
                    // 设置成功并将tail也指向该节点
                    tail = head;
            } else {
                // 将node节点加入到队列尾部
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

这两个方法和独占式的基本相同,注释中都标明了,在这就不多做解释了。

获取资源成功后对后继节点的操作setHeadAndPropagate方法

private void setHeadAndPropagate(Node node, int propagate) {
        // 记录老的head节点,以便核对
        Node h = head; // Record old head for check below
        // 将node 设置成head节点
        setHead(node);
        // 这里表示: 如果资源足够(propagate > 0)或者旧头节点为空(h == null)或者旧节点的waitStatus为 SIGNAL(-1) 或者 PROPAGATE(-3)(h.waitStatus < 0)
        // 或者当前head节点不为空或者waitStatus为SIGNAL(-1) 或者 PROPAGATE(-3),此时需要继续唤醒后继节点来尝试获取资源。
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            // 当前node节点的后继节点
            Node s = node.next;
            //如果后节点为空或者属于共享节点
            if (s == null || s.isShared())
                // 继续尝试获取资源
                doReleaseShared();
        }
    }

首先将当前节点设置为head节点 setHead(node), 其次根据条件看是否对后继节点继续唤醒。

获取资源失败进行阻塞等待unpark

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 获取前驱节点的等待状态
        int ws = pred.waitStatus;
        // 如果等待状态已经为SIGNAL(表示当前当前节点的后继节点处于等待状态,如果当前节点释放了同步状态或者被中断, 则会唤醒后继节点)
        if (ws == Node.SIGNAL)
            // 直接返回,表示可以安心的去休息了
            return true;
        // 如果前驱的节点的状态 ws > 0(表示该节点已经被取消或者中断,也就是成无效节点,需要从同步队列中取消的)
        if (ws > 0) {
            // 循环往前需寻找,知道寻找到一个有效的安全点(一个等待状态<= 0 的节点,排在它后面)
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            // 注意这一波操作后,获奖取消的节点全部变成GC可回收的废弃链。
            pred.next = node;
        } else {
            //如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它获取资源后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

 private final boolean parkAndCheckInterrupt() {
        // 调用park方法使当前节点的线程进入waiting
        LockSupport.park(this);    
        //返回线程中断状态
        return Thread.interrupted();
    }

这两个方法和独占式基本相同。

接着看doReleaseShared 这个比较复杂

private void doReleaseShared() {
        //注意,这里的头结点已经是上面新设定的头结点了,从这里可以看出,如果propagate=0,
        //不会进入doReleaseShared方法里面,那就有共享式变成了独占式
        for (;;) { // 死循环以防在执行此操作时添加新节点:退出条件 h == head
            Node h = head;
            // 前提条件,当前的头节点不为空,并且不是尾节点
            if (h != null && h != tail) {
                // 当前头节点的等待状态
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    // 如果当前节点的状态为SIGNAL,则利用CAS将其状态设置为0(也就是初始状态)
                    //这里不直接设为Node.PROPAGATE,是因为unparkSuccessor(h)中,如果ws < 0会设置为0,所以ws先设置为0,再设置为PROPAGATE
                    //这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases 设置失败,重新循环
                    // 唤醒后继节点
                    unparkSuccessor(h);
                }
                // 如果等待状态不为0 则利用CAS将其状态设置为PROPAGATE ,以确保在释放资源时能够继续通知后继节点。
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed 如果head 期间发生了改变,则需要从新循坏
                break;
        }
    }
private void unparkSuccessor(Node node) {
        
        int ws = node.waitStatus;
        // 在此再次判断当前头节点的的状态,如果小于0 将设置为0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        //获取后继节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            //如果后继节点为空或者等待状态大于0 直接放弃。
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                // 循环从尾部往前寻找下一个等待状态不大于0的节点
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 唤醒该节点的线程
        if (s != null)
            LockSupport.unpark(s.thread);
    }

3.2 共享状态释放
最后一步释放资源就比较简单了。

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
四、总结

在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程会加入到队列中并进行自旋,出列的(或者停止自旋)的条件时前驱节点为头节点并且成功获取了同步状态。在释放同步状态时,调用Release方法释放同步状态,然后唤醒头节点的后继节点。

共享式方式在唤醒后继节点获得资源后会判断当前资源是否还有多余的,如果有会继续唤醒下一个节点。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/74283.html

相关文章

  • J.U.C|AQS独占源码分析

    摘要:本章我们主要聊独占式即同一时刻只能有一个线程获取同步状态,其它获取同步状态失败的线程则会加入到同步队列中进行等待。到这独占式获取同步和释放同步状态的源码已经分析完了。 一、写在前面 上篇文章通过ReentrantLock 的加锁和释放锁过程给大家聊了聊AQS架构以及实现原理,具体参见《J.U.C|AQS的原理》。 理解了原理,我们在来看看再来一步一步的聊聊其源码是如何实现的。 本章给...

    why_rookie 评论0 收藏0
  • J.U.C|可重入锁ReentrantLock

    摘要:二什么是重入锁可重入锁,顾名思义,支持重新进入的锁,其表示该锁能支持一个线程对资源的重复加锁。将由最近成功获得锁,并且还没有释放该锁的线程所拥有。可以使用和方法来检查此情况是否发生。 一、写在前面 前几篇我们具体的聊了AQS原理以及底层源码的实现,具体参见 《J.U.C|一文搞懂AQS》《J.U.C|同步队列(CLH)》《J.U.C|AQS独占式源码分析》《J.U.C|AQS共享式源...

    wangdai 评论0 收藏0
  • J.U.C|同步队列(CLH)

    摘要:二什么是同步队列同步队列一个双向队列,队列中每个节点等待前驱节点释放共享状态锁被唤醒就可以了。三入列操作如上图了解了同步队列的结构,我们在分析其入列操作在简单不过。 一、写在前面 在上篇我们聊到AQS的原理,具体参见《J.U.C|AQS原理》。 这篇我们来给大家聊聊AQS中核心同步队列(CLH)。 二、什么是同步队列(CLH) 同步队列 一个FIFO双向队列,队列中每个节点等待前驱...

    Nosee 评论0 收藏0
  • J.U.C|condition分析

    摘要:造成当前线程在接到信号被中断或到达指定最后期限之前一直处于等待状态。该线程从等待方法返回前必须获得与相关的锁。如果线程已经获取了锁,则将唤醒条件队列的首节点。 一、写在前面 在前几篇我们聊了 AQS、CLH、ReentrantLock、ReentrantReadWriteLock等的原理以及其源码解读,具体参见专栏 《非学无以广才》 这章我们一起聊聊显示的Condition 对象。 ...

    Sourcelink 评论0 收藏0
  • J.U.C|读-写锁ReentrantReadWriteLock

    摘要:所以就有了读写锁。只要没有,读取锁可以由多个线程同时保持。其读写锁为两个内部类都实现了接口。读写锁同样依赖自定义同步器来实现同步状态的,而读写状态就是其自定义同步器的状态。判断申请写锁数量是否超标超标则直接异常,反之则设置共享状态。 一、写在前面 在上篇我们聊到了可重入锁(排它锁)ReentrantLcok ,具体参见《J.U.C|可重入锁ReentrantLock》 Reentra...

    Tonny 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<