资讯专栏INFORMATION COLUMN

ReentrantLock源码分析(待补充)

Jinkey / 1086人阅读

摘要:先分析下同步器,这个是用于锁实现的类,就用到了它的方法就是调用去做事情的。方法简单来说是构造节点,然后用的方式把这个节点加入同步器中节点链表的尾巴。通过调用同步器的方法,等待队列中的头节点线程安全地移动到同步队列。

AbstractQueuedSynchronizer

先分析下同步器AbstractQueuedSynchronizer,这个是用于锁实现的类,ReentrantLock就用到了它

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}

ReentrantLocklock()方法就是调用acquire(int arg)去做事情的。
其中protected boolean tryAcquire(int arg) 是留给子类去实现的,所以这里是采用了模板设计模式。这个方法简单来说就是去获取锁。

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

addWaiter方法简单来说是构造节点,然后用CAS的方式把这个节点加入同步器中节点链表的尾巴。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //如果前驱结点是头节点的话,那么尝试获取锁(也就是同步状态)
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //如果失败的话
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}

acquireQueued()方法中,当前线程在“死循环”中尝试获取同步状态,并且只有前驱节点是头节点才能够尝试获取同步状态。

/*
* 查询是否有线程比当前线程更早地请求获取锁
*/
public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        //如果h==t,返回false,表示没有。h==t的时候要么没有等待者,要么只有一个。当只有一个的时候,
        //那么这一个节点肯定是首节点,而首节点中的线程必定是获取了锁的。
        // h!=t&&((s = h.next) == null || s.thread != Thread.currentThread())
        //如果h!=t,则进入后面的判断
        //当头节点的后继节点为null,但是这个时候tail为null(head和tail节点都是懒初始
        //化),或者头节点的后继节点不为null,但是头节点的后继节点中的线程不是当前线程,则返回true
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
}

如果等待队列为空,或者只有首节点,或者首节点的后继节点中的线程是当前线程,那么当前线程就可以去获取同步状态

 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            /*
             *如果这个节点的waitStatus已经被标记为SIGNAL(-1)了的话,那么
             *这个节点就需要park,所以方法返回true
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
             /*
             * 该节点的前驱节点如果被取消了(waitStatus为1时表示取消状态,目前只有这个状态的值大于
             * 0),那么跳过前驱节点(通过死循环的方式把前驱结点的前驱结点设置为自己的前驱结点)。然
             * 后退出if条件语句,调到末尾,返回false,表示自己还可以抢救一下,可以进行获取的锁的尝
             * 试,而不是park。
             * 
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don"t park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
             /*
             * 如果以上情况都不成立的话,那么就把自己的waitStatus标记为SIGNAL,返回true,表示自己要
             * park,战略性投降。
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
ReentrantLock
public class ReentrantLock implements Lock, java.io.Serializable {
    .....
    abstract static class Sync extends AbstractQueuedSynchronizer
    /**
    public ReentrantLock() {
        sync = new NonfairSync();
    }
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
    .....
}
ConditionObject await()
    /**
     * Implements interruptible condition wait.
     * 
    *
  1. If current thread is interrupted, throw InterruptedException. *
  2. Save lock state returned by {@link #getState}. *
  3. Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. *
  4. Block until signalled or interrupted. *
  5. Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. *
  6. If interrupted while blocked in step 4, throw InterruptedException. *
*/ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); long savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { //线程将会在调用park方法后阻塞,直到被重新唤醒,从condition队列加入同步队列,从await()方 //法中的while循环中退出(isOnSyncQueue(Node node)方法返回true,节点已经在同步队列中), //进而调用同步器的acquireQueued()方法加入到获取同步状态的竞争中。 LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); //成功获取同步状态(或者说锁)之后,被唤醒的线程将从先前调用的await()方法返回,此时该线程已 //经成功地获取了锁。 } /** * Invokes release with current state value; returns saved state. * Cancels node and throws exception on failure. * @param node the condition node for this wait * @return previous sync state */ final long fullyRelease(Node node) { boolean failed = true; try { long savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { //如果释放同步状态失败,就throws exception,在finally那里还会Cancels node throw new IllegalMonitorStateException(); } } finally { //如果释放同步状态失败,就Cancels node if (failed) node.waitStatus = Node.CANCELLED; } }
signal()
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

调用Conditionsignal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。

调用该方法的前置条件是当前线程必须获取了锁,可以看到signal()方法进行了isHeldExclusively()检查,也就是当前线程必须是获取了锁的线程。接着获取等待队列的首节点,将其移动到同步队列并使用LockSupport唤醒节点中的线程。

    /**
     * Removes and transfers nodes until hit non-cancelled one or
     * null. Split out from signal in part to encourage compilers
     * to inline the case of no waiters.
     * @param first (non-null) the first node on condition queue
     */
    private void doSignal(Node first) {
        do {
            //当没有waiters时进行的一些优化,以便编译器进行方法内联,transferForSignal方法才是重点
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
    /**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal)
       把节点从condition队列“传输”到同步队列去。”传输“成功或者在唤醒前节点已经取消,就返回true。
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         * 通过调用同步器的enq(Node node)方法,等待队列中的头节点线程安全地移动到同步队列。
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            //当节点移动到同步队列后,当前线程再使用LockSupport唤醒该节点的线程。
            LockSupport.unpark(node.thread);
        return true;
    }

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68800.html

相关文章

  • Java 重入锁 ReentrantLock 原理分析

    摘要:的主要功能和关键字一致,均是用于多线程的同步。而仅支持通过查询当前线程是否持有锁。由于和使用的是同一把可重入锁,所以线程可以进入方法,并再次获得锁,而不会被阻塞住。公平与非公平公平与非公平指的是线程获取锁的方式。 1.简介 可重入锁ReentrantLock自 JDK 1.5 被引入,功能上与synchronized关键字类似。所谓的可重入是指,线程可对同一把锁进行重复加锁,而不会被阻...

    lx1036 评论0 收藏0
  • 深入分析AQS实现原理

    摘要:更新成功返回,否则返回这个操作是原子的,不会出现线程安全问题,这里面涉及到这个类的操作,一级涉及到这个属性的意义。 简单解释一下J.U.C,是JDK中提供的并发工具包,java.util.concurrent。里面提供了很多并发编程中很常用的实用工具类,比如atomic原子操作、比如lock同步锁、fork/join等。 从Lock作为切入点 我想以lock作为切入点来讲解AQS,毕竟...

    sewerganger 评论0 收藏0
  • 图解AQS原理之ReentrantLock详解-公平锁

    摘要:概述前面已经讲解了关于的非公平锁模式,关于非公平锁,内部其实告诉我们谁先争抢到锁谁就先获得资源,下面就来分析一下公平锁内部是如何实现公平的如果没有看过非公平锁的先去了解下非公平锁,因为这篇文章前面不会讲太多内部结构,直接会对源码进行分析前文 概述 前面已经讲解了关于AQS的非公平锁模式,关于NonfairSync非公平锁,内部其实告诉我们谁先争抢到锁谁就先获得资源,下面就来分析一下公平...

    Taonce 评论0 收藏0
  • J.U.C|可重入锁ReentrantLock

    摘要:二什么是重入锁可重入锁,顾名思义,支持重新进入的锁,其表示该锁能支持一个线程对资源的重复加锁。将由最近成功获得锁,并且还没有释放该锁的线程所拥有。可以使用和方法来检查此情况是否发生。 一、写在前面 前几篇我们具体的聊了AQS原理以及底层源码的实现,具体参见 《J.U.C|一文搞懂AQS》《J.U.C|同步队列(CLH)》《J.U.C|AQS独占式源码分析》《J.U.C|AQS共享式源...

    wangdai 评论0 收藏0
  • 线程间的同步与通信(5)——ReentrantLock源码分析

    摘要:之所以使用这种方式是因为在恢复一个被挂起的线程与该线程真正运行之间存在着严重的延迟。这样我们就可以把线程恢复运行的这段时间给利用起来了,结果就是线程更早的获取了锁,线程获取锁的时刻也没有推迟。 前言 系列文章目录 上一篇 我们学习了lock接口,本篇我们就以ReentrantLock为例,学习一下Lock锁的基本的实现。我们先来看看Lock接口中的方法与ReentrantLock对其实...

    susheng 评论0 收藏0

发表评论

0条评论

Jinkey

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<