资讯专栏INFORMATION COLUMN

AbstractQueuedSynchronizer原理剖析

vslam / 3431人阅读

摘要:无论是公平锁还是非公平锁,它们的实现都依赖于,它提供了一个基于先进先出等待队列实现和的框架。特性如下仅通过一个类型来代表状态。等唤醒的时候,重新获取锁,并清掉中的线程。

无论是公平锁还是非公平锁,它们的实现都依赖于AbstractQueuedSynchronizer,它提供了一个基于先进先出等待队列 实现block locks和synchronizers的框架。特性如下

仅通过一个 int 类型来代表状态。对于ReentrantLock而言,他就是线程持有锁的次数,当次数为0时,代表锁没有被持有,正数代表被持有的次数,负数则是超出了锁的持有范围,有可能存在死循环

支持独占模式(默认的)和共享模式。在独占模式中,去获取一个已经被其它线程拥有的锁只会失败,共享模式中,则多个线程是可以成功的。

lock()原理

当ReentrantLock获取锁失败时,会执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

private Node addWaiter(Node mode) { 
    Node node = new Node(Thread.currentThread(), mode); //创建一个节点,存储当前的线程,以及锁持有的模式,对于 ReentrantLock来说就是 独占 型
    // Try the fast path of enq; backup to full enq on failure    
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {//CAS操作,如果当前的尾部节点没有被其它线程更改,那么把新的节点设置成队列的尾部
            pred.next = node;
            return node;
        }
    }
    enq(node);//首次入队
    return node;
}

获取失败进行入队操作,首先就是往队列中添加一个正在等待的节点Node


从Node本身的结构可以看到,AQS(AbstractQueuedSynchronizer)本身就维护了一个双向链表,用来存放等待中的线程。链表的每个节点,代表那个线程,是独占还是共享锁。
创建好节点之后,便执行入队操作,对于首次创建队列

private Node enq(final Node node) {
    for (;;) {
       //借助CAS机制实现无锁操作,所以需要一直执行直到CAS成功
        Node t = tail;
        if (t == null) { // 初始化发生在第一次创建队列,这样的好处是,当竞争不激烈的时候,实际上也就不会发生这些操作,性能也会好些
             if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

可以看到,入队也就是从队尾插入新的等待线程,入队完毕,也就开始去进行不断的尝试,直到获取锁成功,可以看到,对于lock来说,其实已经是阻塞了

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor(); //优先执行当前节点的前一个节点
            if (p == head && tryAcquire(arg)) { //仅当当前节点的前一个节点是head,才去获取线程,这里可以看出其实先等待的线程是会优先处理,也就是FIFO原则
                setHead(node); 
                p.next = null; // help GC    ,释放掉当前线程在队列中的引用,也可以看做’出队"了            
                failed = false;
                //执行到这里说明获取锁成功
                return interrupted;
            }
            //执行到这里说明存在竞争,有多个线程都在等待一个锁
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt()) //里面会对当前线程执行中断,当被唤醒时,继续循环
                    //如果线程被中断,设置中断标记,区别于 doAcquireInterruptibly,doAcquireInterruptibly是直接抛出异常,这也就是 lockInterruptibly能够抛出中断的原因
                    interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
从这里可以看到,无论锁是公平锁还是非公平锁,只要被放入了等待队列,此时的执行依然是谁先等待就先执行谁  ,非公平锁体现在新来的线程会无视已经等了的线程,可以优先去抢锁,所以公平体现在第一次参与抢锁的线程会去等待已经在等待队列中的线程,非公平并不是说从已经在等待的线程队列里面随便选一个

shouldParkAfterFailedAcquire的源码如下

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; //查看前一个节点的等待状态
    if (ws == Node.SIGNAL)
            //已经尝试过获取锁,可以执行park了
            return true;
    if (ws > 0) {   
        do {
            //去掉队列中所有已经取消的线程
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {             
        //此时当前线程的前一个节点的等待状态必定是0或者PROGATE,这表明当前线程在park之前可以再尝试一次去获取锁,也就是说前一个节点可能刚获取到SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

waitStatus:等待的状态,共有5种

SIGNAL:,表明它的前一个节点需要执行 unparking;

CANCELLED:当前节点保存的线程由于超时或者中断被取消了;

CONDITION:接档正处于条件队列中,执行了await;

PROPAGATE:一个共享的锁需要传递释放信号到其它节点

0:非上述4中状态,有可能是刚获取signal,此时它的值是0,也有可能是新建的head节点

parkAndCheckInterrupt主要是park当前线程

private final boolean parkAndCheckInterrupt() {
    //当获取不到许可时,阻塞线程,解除阻塞状态的情况如下:
    //1 某个线程对这个线程调用了unpark方法
    //2 某个线程中断了这个线程
    //3 这个方法毫无理由的返回了 [park比较奇特的地方],基于这样,调用的时候必须去判断park的条件,以及当它返回的时候,去设置中断的状态
    LockSupport.park(this); 
    //返回线程的中断状态
    return Thread.interrupted();
}

至此lock()执行结束

unlock()原理

当执行unlock时,ReentrentLock执行对应的Release

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        //执行这里表示所已经被释放,可以让它的下一个节点来抢锁
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h); //h.waitStatus == 0 表示还没有执行park,自然不需要unpark
        return true;
    }
    return false;
}

如果release成功,即当前线程持有的所有锁都已经释放,那么就可以执行 unparkSuccessor,从源码可以看到,unpark是从头部开始进行的,结合lock的原理,可知AQS本身就是一个先进先出的队列
unparkSuccessor源码如下

private void unparkSuccessor(Node node) {
     int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

     Node s = node.next; //有可能线程被取消了,所以从尾部往前遍历,到最近一个没有被取消的线程
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t; //确保线程没有取消
    }
    if (s != null)
        LockSupport.unpark(s.thread); //恢复线程
}

至此unlock()完毕

await的原理
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();//当前线程已经中断了,抛出中断异常
    //添加一个新的waiter到condition queue中,这个新的Node的waitStatus会被标记为CONDITION
    Node node = addConditionWaiter(); 
    //释放当前线程拥有的锁,即从sync queue中去掉当前线程
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
    //如果当前线程不在持有锁的队列里头,对他进行休眠,当其它线程执行 unlock的时候,释放锁,就会执行unpark操作,此时它会被唤醒,唤醒后,如果它在syn队列里头,开始继续往下执行。(这个插入操作则是由signal完成)
        LockSupport.park(this); 
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;//等待的过程中线程中断了,退出
    }
//重新竞争锁,相当于执行了lock操作
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    //再次去获取锁,如果当前的线程在park的时候是被中断了,并且ConditionObject并不是由于中断返回,这里再次标记为中断
        interruptMode = REINTERRUPT;  
    if (node.nextWaiter != null) 
    //清除非Condition模式的线程,而在signal中有先关操作将conditon的线程设置成非condition
         unlinkCancelledWaiters();
    if (interruptMode != 0)
    //上报等待的过程中发生了中断,如果是要抛出中断,就抛出,否则再次执行中断
        reportInterruptAfterWait(interruptMode); 
}

isOnSyncQueue源码如下

final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)
    //node本身是调用了 await 方法,或者没有在获取锁的队列里头,[如果在里头必定有一个前置的节点]
        return false; 
    if (node.next != null) 
    //当前节点存在下一个节点,那么它肯定是执行过 enq ,即获取过锁
        return true; 
    // CAS失败的时候,有可能 node.rev是没有的,因此需要从头到尾遍历一次
   return findNodeFromTail(node); 
}

checkInterruptWhileWaiting源码如下

private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}
final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
    //线程中断重新获取锁,并且设置waitStatus为0,以便后续线程从condition queue清除
        enq(node); 
        return true;
    }
    while (!isOnSyncQueue(node))
    //如果CAS失败,只要当前节点没有在Sync queue中,那么一直自旋,每次都会交出执行权限
        Thread.yield(); 
    return false;
}

可以看到,await其实就是释放线程原有的锁,并把它放入conditon队列中,然后执行阻塞。等唤醒的时候,重新获取锁,并清掉condition queue中的线程。 至此await执行结束

singnal的原理
public final void signal() {
    if (!isHeldExclusively()) //只有当前线程持有了锁,才能释放
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);//优先释放队列头的,也就是等待时间最长的condition node
}
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
//将节点从condition queue转移到sync queue
final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false; //设置为非等待失败,则不继续转移
//CAS设置等待状态为0成功
Node p = enq(node); //新节点放入sync queue,并返回原来的尾部节点,也就是新节点的前一个节点
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) //参考shouldParkAfterFailedAcquire
        LockSupport.unpark(node.thread);//如果当前节点的前一个节点线程已经取消,或者将当前节点的前一个节点线程的waitStatus设置成SIGNAL失败,则直接唤醒当前线程
    return true;
}

可以看到signal最关键的信息就是去掉等待队列中的CONDITION状态,并将线程加入sync队列,至此signal结束

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/72416.html

相关文章

  • 高并发

    摘要:表示的是两个,当其中任意一个计算完并发编程之是线程安全并且高效的,在并发编程中经常可见它的使用,在开始分析它的高并发实现机制前,先讲讲废话,看看它是如何被引入的。电商秒杀和抢购,是两个比较典型的互联网高并发场景。 干货:深度剖析分布式搜索引擎设计 分布式,高可用,和机器学习一样,最近几年被提及得最多的名词,听名字多牛逼,来,我们一步一步来击破前两个名词,今天我们首先来说说分布式。 探究...

    supernavy 评论0 收藏0
  • 高并发

    摘要:表示的是两个,当其中任意一个计算完并发编程之是线程安全并且高效的,在并发编程中经常可见它的使用,在开始分析它的高并发实现机制前,先讲讲废话,看看它是如何被引入的。电商秒杀和抢购,是两个比较典型的互联网高并发场景。 干货:深度剖析分布式搜索引擎设计 分布式,高可用,和机器学习一样,最近几年被提及得最多的名词,听名字多牛逼,来,我们一步一步来击破前两个名词,今天我们首先来说说分布式。 探究...

    ddongjian0000 评论0 收藏0
  • 高并发

    摘要:表示的是两个,当其中任意一个计算完并发编程之是线程安全并且高效的,在并发编程中经常可见它的使用,在开始分析它的高并发实现机制前,先讲讲废话,看看它是如何被引入的。电商秒杀和抢购,是两个比较典型的互联网高并发场景。 干货:深度剖析分布式搜索引擎设计 分布式,高可用,和机器学习一样,最近几年被提及得最多的名词,听名字多牛逼,来,我们一步一步来击破前两个名词,今天我们首先来说说分布式。 探究...

    wangdai 评论0 收藏0
  • Java多线程进阶(七)—— J.U.C之locks框架:AQS独占功能剖析(2)

    摘要:开始获取锁终于轮到出场了,的调用过程和完全一样,同样拿不到锁,然后加入到等待队列队尾然后,在阻塞前需要把前驱结点的状态置为,以确保将来可以被唤醒至此,的执行也暂告一段落了安心得在等待队列中睡觉。 showImg(https://segmentfault.com/img/remote/1460000016012467); 本文首发于一世流云的专栏:https://segmentfault...

    JayChen 评论0 收藏0
  • JAVA并发编程之-ReentrantLock锁原理解读

    摘要:作者毕来生微信锁状态转换分类以后帮助我们提供了线程同步机制,通过显示定义同步锁来实现对象之间的同步。等待重新尝试因为在中是用关键字声明的,故可以在线程间可见再次判断一下能否持有锁可能线程同步代码执行得比较快,已经释放了锁,不可以就返回。 作者 : 毕来生微信: 878799579 锁状态转换 showImg(https://segmentfault.com/img/remote/...

    荆兆峰 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<