资讯专栏INFORMATION COLUMN

AbstractQueuedSynchronizer理解之二(CountDownLatch)

张宪坤 / 2510人阅读

摘要:本文分析一下是如何运用的是什么顾名思义它是一个门闩,它是用一个计数器实现的,初始状态计数器的数值等于线程数,每当有线程完成任务后,计数器就会减一。当为时,锁就会被释放,凡是之前因抢占锁而等待的线程这时候就会被唤醒继续抢占锁。

本文分析一下CountDownLatch是如何运用AQS的

CountDownLatch是什么

CountDownLatch顾名思义它是一个Latch(门闩),它是用一个计数器实现的,初始状态计数器的数值等于线程数,每当有线程完成任务后,计数器就会减一。当state为0时,锁就会被释放,凡是之前因抢占锁而等待的线程这时候就会被唤醒继续抢占锁。

CountDownLatch小栗子

</>复制代码

  1. public static void main(String[] args) throws InterruptedException{
  2. int threadSize = 3;
  3. CountDownLatch doneSignal = new CountDownLatch(threadSize);
  4. for (int i = 1; i <= threadSize; i++) {
  5. final int threadNum = i;
  6. new Thread(() -> {
  7. System.out.println("thread" + threadNum + ":start");
  8. try {
  9. Thread.sleep(1000 * threadNum);
  10. } catch (InterruptedException e) {
  11. System.out.println("thread" + threadNum + ":exception");
  12. }
  13. doneSignal.countDown();
  14. System.out.println("thread" + threadNum + ":complete");
  15. }).start();
  16. }
  17. System.out.println("main thread:await");
  18. doneSignal.await();
  19. System.out.println("main thread:go on");
  20. }

例子中主线程启动了三条子线程,睡眠一段时间,此时主线程在等待所有子线程结束后才会继续执行下去;
看一下输出结果:

</>复制代码

  1. main thread:await
  2. thread1:start
  3. thread2:start
  4. thread3:start
  5. thread1:complete
  6. thread2:complete
  7. thread3:complete
  8. main thread:go on
  9. Process finished with exit code 0
CountDownLatch原理分析

既然CountDownLatch也是AQS的一种使用方式,我们看一下它的内部类Syc是怎么实现AQS的:

</>复制代码

  1. private static final class Sync extends AbstractQueuedSynchronizer {
  2. private static final long serialVersionUID = 4982264981922014374L;
  3. //构造函数,初始化同步状态state的值,即线程个数
  4. Sync(int count) {
  5. setState(count);
  6. }
  7. int getCount() {
  8. return getState();
  9. }
  10. //这里重写了方法,在共享模式下,告诉调用者是否可以抢占state锁了,正数代表可以,负数代表否定;当state0时返回正数
  11. protected int tryAcquireShared(int acquires) {
  12. return (getState() == 0) ? 1 : -1;
  13. }
  14. //共享模式下释放锁
  15. protected boolean tryReleaseShared(int releases) {
  16. // Decrement count; signal when transition to zero
  17. for (;;) {
  18. int c = getState();
  19. //state为0时说明没有什么可释放
  20. if (c == 0)
  21. return false;
  22. int nextc = c-1;
  23. if (compareAndSetState(c, nextc))
  24. //CAS对state操作成功后返回state值是否为0,为0则释放成功
  25. return nextc == 0;
  26. }
  27. }
  28. }

看完了重写的AQS同步器后,我们了解了CountDownLatch对state锁的描述。接下来先看主线程调用的await方法,在await方法里调用了AQS的acquireSharedInterruptibly:

</>复制代码

  1. //在共享模式下尝试抢占锁
  2. public final void acquireSharedInterruptibly(int arg)
  3. throws InterruptedException {
  4. //线程中断抛出异常
  5. if (Thread.interrupted())
  6. throw new InterruptedException();
  7. //尝试抢占前先查询一下是否可以抢占,如果返回值大于0程序往下执行,小于0则等待
  8. if (tryAcquireShared(arg) < 0)
  9. doAcquireSharedInterruptibly(arg);
  10. }
  11. private void doAcquireSharedInterruptibly(int arg)
  12. throws InterruptedException {
  13. //在Reentrant解析中我们看过,往队列中新增node(共享模式)
  14. final Node node = addWaiter(Node.SHARED);
  15. boolean failed = true;
  16. try {
  17. for (;;) {
  18. final Node p = node.predecessor();
  19. if (p == head) {
  20. //如果当前node的前继时head,马上尝试抢占锁
  21. int r = tryAcquireShared(arg);
  22. if (r >= 0) {
  23. //如果state==0即允许往下执行,重新设置head并往下传播信号
  24. setHeadAndPropagate(node, r);
  25. p.next = null; // help GC
  26. failed = false;
  27. //得到往下执行的允许
  28. return;
  29. }
  30. }
  31. //以下都跟Reentrant一样
  32. if (shouldParkAfterFailedAcquire(p, node) &&
  33. parkAndCheckInterrupt())
  34. throw new InterruptedException();
  35. }
  36. } finally {
  37. if (failed)
  38. cancelAcquire(node);
  39. }
  40. }
  41. private void setHeadAndPropagate(Node node, int propagate) {
  42. Node h = head; // Record old head for check below
  43. //将当前node设置为head,清空node的thread、prev
  44. setHead(node);
  45. /*
  46. * Try to signal next queued node if:
  47. * Propagation was indicated by caller,
  48. * or was recorded (as h.waitStatus either before
  49. * or after setHead) by a previous operation
  50. * (note: this uses sign-check of waitStatus because
  51. * PROPAGATE status may transition to SIGNAL.)
  52. * and
  53. * The next node is waiting in shared mode,
  54. * or we don"t know, because it appears null
  55. *
  56. * The conservatism in both of these checks may cause
  57. * unnecessary wake-ups, but only when there are multiple
  58. * racing acquires/releases, so most need signals now or soon
  59. * anyway.
  60. */
  61. //如果propagate大于0,或者原来head的等待状态小于0或者现在head的等待状态小于0
  62. if (propagate > 0 || h == null || h.waitStatus < 0 ||
  63. (h = head) == null || h.waitStatus < 0) {
  64. Node s = node.next;
  65. //准备唤醒下一个节点
  66. if (s == null || s.isShared())
  67. doReleaseShared();
  68. }
  69. }
  70. private void doReleaseShared() {
  71. /*
  72. * Ensure that a release propagates, even if there are other
  73. * in-progress acquires/releases. This proceeds in the usual
  74. * way of trying to unparkSuccessor of head if it needs
  75. * signal. But if it does not, status is set to PROPAGATE to
  76. * ensure that upon release, propagation continues.
  77. * Additionally, we must loop in case a new node is added
  78. * while we are doing this. Also, unlike other uses of
  79. * unparkSuccessor, we need to know if CAS to reset status
  80. * fails, if so rechecking.
  81. */
  82. for (;;) {
  83. Node h = head;
  84. if (h != null && h != tail) {
  85. int ws = h.waitStatus;
  86. if (ws == Node.SIGNAL) {
  87. //如果head的状态为SIGNAL,更改状态为0
  88. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  89. continue; // loop to recheck cases
  90. //唤醒后继节点
  91. unparkSuccessor(h);
  92. }
  93. //如果head状态为0,更改状态为PROPAGATE
  94. else if (ws == 0 &&
  95. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  96. continue; // loop on failed CAS
  97. }
  98. //如果head没有改变,结束当前loop,如果遇到head被别的线程改变,继续loop
  99. if (h == head) // loop if head changed
  100. break;
  101. }
  102. }

释放锁的信号一直向后传播,直到所有node被唤醒并继续执行,那第一个信号时何时发起的呢?我们来看一下CountDownLatch的countDown方法,该方法调用了sync的releaseShared方法:

</>复制代码

  1. public final boolean releaseShared(int arg) {
  2. if (tryReleaseShared(arg)) {
  3. //如果同步状态state为0时,调用doReleaseShared,在这里就发出了第一个唤醒所有等待node的信号,然后信号自动往后传播
  4. doReleaseShared();
  5. return true;
  6. }
  7. return false;
  8. }
总结

CountDownLatch在调用await的时候判断state释放为0,如果大于0则阻塞当前线程,将当前线程的node添加到队列中等待;在调用countDown时当遇到state减到0时,发出释放共享锁的信号,从头节点的后记节点开始往后传递信号,将队列等待的线程逐个唤醒并继续往下执行;
在这里state跟Reentrant的state独占锁含义不同,state的含义是由AQS的子类去描述的。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/70916.html

相关文章

  • AbstractQueuedSynchronizer理解之二CountDownLatch

    摘要:本文分析一下是如何运用的是什么顾名思义它是一个门闩,它是用一个计数器实现的,初始状态计数器的数值等于线程数,每当有线程完成任务后,计数器就会减一。当为时,锁就会被释放,凡是之前因抢占锁而等待的线程这时候就会被唤醒继续抢占锁。 本文分析一下CountDownLatch是如何运用AQS的 CountDownLatch是什么 CountDownLatch顾名思义它是一个Latch(门闩),它...

    shery 评论0 收藏0
  • AbstractQueuedSynchronizer理解之二CountDownLatch

    摘要:本文分析一下是如何运用的是什么顾名思义它是一个门闩,它是用一个计数器实现的,初始状态计数器的数值等于线程数,每当有线程完成任务后,计数器就会减一。当为时,锁就会被释放,凡是之前因抢占锁而等待的线程这时候就会被唤醒继续抢占锁。 本文分析一下CountDownLatch是如何运用AQS的 CountDownLatch是什么 CountDownLatch顾名思义它是一个Latch(门闩),它...

    greatwhole 评论0 收藏0
  • AbstractQueuedSynchronizer理解之三(Semaphore)

    摘要:信号可以理解为一种许可,拿到许可的线程才可以继续执行。的计数器其实记录的就是许可的数量,当许可数量为时,方法就会阻塞。 本文接着分析Semaphore的实现原理 Semaphore是什么 Semaphore是一个计数信号量。Semaphore(信号)可以理解为一种许可,拿到许可的线程才可以继续执行。Semaphore的计数器其实记录的就是许可的数量,当许可数量为0时,acquire方法...

    MingjunYang 评论0 收藏0
  • AbstractQueuedSynchronizer理解之三(Semaphore)

    摘要:信号可以理解为一种许可,拿到许可的线程才可以继续执行。的计数器其实记录的就是许可的数量,当许可数量为时,方法就会阻塞。 本文接着分析Semaphore的实现原理 Semaphore是什么 Semaphore是一个计数信号量。Semaphore(信号)可以理解为一种许可,拿到许可的线程才可以继续执行。Semaphore的计数器其实记录的就是许可的数量,当许可数量为0时,acquire方法...

    马忠志 评论0 收藏0
  • AbstractQueuedSynchronizer理解之三(Semaphore)

    摘要:信号可以理解为一种许可,拿到许可的线程才可以继续执行。的计数器其实记录的就是许可的数量,当许可数量为时,方法就会阻塞。 本文接着分析Semaphore的实现原理 Semaphore是什么 Semaphore是一个计数信号量。Semaphore(信号)可以理解为一种许可,拿到许可的线程才可以继续执行。Semaphore的计数器其实记录的就是许可的数量,当许可数量为0时,acquire方法...

    zollero 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<