摘要:本文分析一下是如何运用的是什么顾名思义它是一个门闩,它是用一个计数器实现的,初始状态计数器的数值等于线程数,每当有线程完成任务后,计数器就会减一。当为时,锁就会被释放,凡是之前因抢占锁而等待的线程这时候就会被唤醒继续抢占锁。
本文分析一下CountDownLatch是如何运用AQS的
CountDownLatch是什么CountDownLatch顾名思义它是一个Latch(门闩),它是用一个计数器实现的,初始状态计数器的数值等于线程数,每当有线程完成任务后,计数器就会减一。当state为0时,锁就会被释放,凡是之前因抢占锁而等待的线程这时候就会被唤醒继续抢占锁。
CountDownLatch小栗子public static void main(String[] args) throws InterruptedException{ int threadSize = 3; CountDownLatch doneSignal = new CountDownLatch(threadSize); for (int i = 1; i <= threadSize; i++) { final int threadNum = i; new Thread(() -> { System.out.println("thread" + threadNum + ":start"); try { Thread.sleep(1000 * threadNum); } catch (InterruptedException e) { System.out.println("thread" + threadNum + ":exception"); } doneSignal.countDown(); System.out.println("thread" + threadNum + ":complete"); }).start(); } System.out.println("main thread:await"); doneSignal.await(); System.out.println("main thread:go on"); }
例子中主线程启动了三条子线程,睡眠一段时间,此时主线程在等待所有子线程结束后才会继续执行下去;
看一下输出结果:
main thread:await thread1:start thread2:start thread3:start thread1:complete thread2:complete thread3:complete main thread:go on Process finished with exit code 0CountDownLatch原理分析
既然CountDownLatch也是AQS的一种使用方式,我们看一下它的内部类Syc是怎么实现AQS的:
private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; //构造函数,初始化同步状态state的值,即线程个数 Sync(int count) { setState(count); } int getCount() { return getState(); } //这里重写了方法,在共享模式下,告诉调用者是否可以抢占state锁了,正数代表可以,负数代表否定;当state为0时返回正数 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } //共享模式下释放锁 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); //state为0时说明没有什么可释放 if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) //CAS对state操作成功后返回state值是否为0,为0则释放成功 return nextc == 0; } } }
看完了重写的AQS同步器后,我们了解了CountDownLatch对state锁的描述。接下来先看主线程调用的await方法,在await方法里调用了AQS的acquireSharedInterruptibly:
//在共享模式下尝试抢占锁 public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //线程中断抛出异常 if (Thread.interrupted()) throw new InterruptedException(); //尝试抢占前先查询一下是否可以抢占,如果返回值大于0程序往下执行,小于0则等待 if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //在Reentrant解析中我们看过,往队列中新增node(共享模式) final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { //如果当前node的前继时head,马上尝试抢占锁 int r = tryAcquireShared(arg); if (r >= 0) { //如果state==0即允许往下执行,重新设置head并往下传播信号 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; //得到往下执行的允许 return; } } //以下都跟Reentrant一样 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below //将当前node设置为head,清空node的thread、prev setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus either before * or after setHead) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don"t know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ //如果propagate大于0,或者原来head的等待状态小于0或者现在head的等待状态小于0 if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; //准备唤醒下一个节点 if (s == null || s.isShared()) doReleaseShared(); } } private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { //如果head的状态为SIGNAL,更改状态为0 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases //唤醒后继节点 unparkSuccessor(h); } //如果head状态为0,更改状态为PROPAGATE else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } //如果head没有改变,结束当前loop,如果遇到head被别的线程改变,继续loop if (h == head) // loop if head changed break; } }
释放锁的信号一直向后传播,直到所有node被唤醒并继续执行,那第一个信号时何时发起的呢?我们来看一下CountDownLatch的countDown方法,该方法调用了sync的releaseShared方法:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { //如果同步状态state为0时,调用doReleaseShared,在这里就发出了第一个唤醒所有等待node的信号,然后信号自动往后传播 doReleaseShared(); return true; } return false; }总结
CountDownLatch在调用await的时候判断state释放为0,如果大于0则阻塞当前线程,将当前线程的node添加到队列中等待;在调用countDown时当遇到state减到0时,发出释放共享锁的信号,从头节点的后记节点开始往后传递信号,将队列等待的线程逐个唤醒并继续往下执行;
在这里state跟Reentrant的state独占锁含义不同,state的含义是由AQS的子类去描述的。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/76307.html
摘要:本文分析一下是如何运用的是什么顾名思义它是一个门闩,它是用一个计数器实现的,初始状态计数器的数值等于线程数,每当有线程完成任务后,计数器就会减一。当为时,锁就会被释放,凡是之前因抢占锁而等待的线程这时候就会被唤醒继续抢占锁。 本文分析一下CountDownLatch是如何运用AQS的 CountDownLatch是什么 CountDownLatch顾名思义它是一个Latch(门闩),它...
摘要:本文分析一下是如何运用的是什么顾名思义它是一个门闩,它是用一个计数器实现的,初始状态计数器的数值等于线程数,每当有线程完成任务后,计数器就会减一。当为时,锁就会被释放,凡是之前因抢占锁而等待的线程这时候就会被唤醒继续抢占锁。 本文分析一下CountDownLatch是如何运用AQS的 CountDownLatch是什么 CountDownLatch顾名思义它是一个Latch(门闩),它...
摘要:信号可以理解为一种许可,拿到许可的线程才可以继续执行。的计数器其实记录的就是许可的数量,当许可数量为时,方法就会阻塞。 本文接着分析Semaphore的实现原理 Semaphore是什么 Semaphore是一个计数信号量。Semaphore(信号)可以理解为一种许可,拿到许可的线程才可以继续执行。Semaphore的计数器其实记录的就是许可的数量,当许可数量为0时,acquire方法...
摘要:信号可以理解为一种许可,拿到许可的线程才可以继续执行。的计数器其实记录的就是许可的数量,当许可数量为时,方法就会阻塞。 本文接着分析Semaphore的实现原理 Semaphore是什么 Semaphore是一个计数信号量。Semaphore(信号)可以理解为一种许可,拿到许可的线程才可以继续执行。Semaphore的计数器其实记录的就是许可的数量,当许可数量为0时,acquire方法...
摘要:信号可以理解为一种许可,拿到许可的线程才可以继续执行。的计数器其实记录的就是许可的数量,当许可数量为时,方法就会阻塞。 本文接着分析Semaphore的实现原理 Semaphore是什么 Semaphore是一个计数信号量。Semaphore(信号)可以理解为一种许可,拿到许可的线程才可以继续执行。Semaphore的计数器其实记录的就是许可的数量,当许可数量为0时,acquire方法...
阅读 2641·2023-04-25 18:10
阅读 1552·2019-08-30 15:53
阅读 2713·2019-08-30 13:10
阅读 3178·2019-08-29 18:40
阅读 1093·2019-08-23 18:31
阅读 1175·2019-08-23 16:49
阅读 3373·2019-08-23 16:07
阅读 853·2019-08-23 15:27