资讯专栏INFORMATION COLUMN

AbstractQueuedSynchronizer理解之四(Condition)

RiverLi / 2784人阅读

摘要:总结在一开是的例子中,假设有两个线程,分别代表生产者和消费者线程,生产消费元素的队列容量为。

什么是Condition

Condition必须要和独占锁一起使用,独占锁代替了原来的synchronized,Condition代替了原来的Object中的监视器方法(wait, notify and notifyAll);一个Lock可以对应多个Condition,这样线程之间可以按照条件唤醒指定的线程,而不是简单的notifyAll多有的线程,使得我们多线程编程的时候可以灵活的控制线程。

独占锁和Condition最经典的配合使用就是ArrayBlockingQueue.java,典型的生产者消费者问题:

</>复制代码

  1. /*
  2. * Concurrency control uses the classic two-condition algorithm
  3. * found in any textbook.
  4. */
  5. /** Main lock guarding all access */
  6. final ReentrantLock lock;
  7. /** Condition for waiting takes */
  8. private final Condition notEmpty;
  9. /** Condition for waiting puts */
  10. private final Condition notFull;

这是在许多教科书中能找到的经典的双Condition算法的并发控制,需要有一个独占锁ReentrantLock,然后再定义两个Condition,notEmpty(队列不是空的)表示可以从队列中消费元素的信号条件,notFull(队列不是满的)表示可以向队列生产元素的信号条件。这两个Condition都是调用了lock.newCondition()方法实例化的。

当消费者线程调用消费方法take时:

</>复制代码

  1. public E take() throws InterruptedException {
  2. final ReentrantLock lock = this.lock;
  3. lock.lockInterruptibly();
  4. try {
  5. //当队列的元素数量为0时,调用notEmpty.await,阻塞当前的消费线程
  6. while (count == 0)
  7. notEmpty.await();
  8. //dequeue中调用了notFull.signal(),通知生产者队列还没满,可以生产
  9. return dequeue();
  10. } finally {
  11. lock.unlock();
  12. }
  13. }

当生产者线程调用生产方法put时:

</>复制代码

  1. public void put(E e) throws InterruptedException {
  2. checkNotNull(e);
  3. final ReentrantLock lock = this.lock;
  4. lock.lockInterruptibly();
  5. try {
  6. //当队列满时,调用notFull.await(),阻塞当前生产线程,停止生产
  7. while (count == items.length)
  8. notFull.await();
  9. //enqueue中调用了notEmpty.signal(),通知消费者队列里有元素,可以消费
  10. enqueue(e);
  11. } finally {
  12. lock.unlock();
  13. }
  14. }
Condition的await

在AQS中有一个ConditionObject内部类实现了Condition接口,其中有两个成员变量:

</>复制代码

  1. /** First node of condition queue. */
  2. private transient Node firstWaiter;
  3. /** Last node of condition queue. */
  4. private transient Node lastWaiter;

Condition也有一个node队列,firstWaiter、lastWaiter分别表示第一个和最后一个node。

先看await方法:

</>复制代码

  1. public final void await() throws InterruptedException {
  2. //如果线程设置中断标志,抛出中断异常
  3. if (Thread.interrupted())
  4. throw new InterruptedException();
  5. //往队列添加node
  6. Node node = addConditionWaiter();
  7. //完全释放锁,head的后继节点将被唤醒,然后被移出sync队列
  8. int savedState = fullyRelease(node);
  9. int interruptMode = 0;
  10. //判断当前节点是否在sync队列中(当condition调用signal是会将该节点放入Sync队列),如果不在就park当前线程,线程在这里开始等待被signal
  11. while (!isOnSyncQueue(node)) {
  12. LockSupport.park(this);
  13. //发送中断时(唤醒了线程)break;checkInterruptWhileWaiting中调用了transferAfterCancelledWait(贴在下面),这个方法时检测中断是发生在signal之前还是之后
  14. if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
  15. break;
  16. }
  17. //当前线程被signal后,调用acquireQueued抢占锁,如果interruptMode不为抛出异常,设置为REINTERRUPT
  18. if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
  19. interruptMode = REINTERRUPT;
  20. if (node.nextWaiter != null) // clean up if cancelled
  21. //从头到尾移除取消的节点
  22. unlinkCancelledWaiters();
  23. if (interruptMode != 0)
  24. //继续中断还是抛出异常
  25. reportInterruptAfterWait(interruptMode);
  26. }
  27. final boolean transferAfterCancelledWait(Node node) {
  28. //首先CAS设置node状态为0,如果成功说明中断发生在signal之前(因为signal会将node状态设置为0)
  29. if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
  30. //将node入sync队列
  31. enq(node);
  32. return true;
  33. }
  34. /*
  35. * If we lost out to a signal(), then we can"t proceed
  36. * until it finishes its enq(). Cancelling during an
  37. * incomplete transfer is both rare and transient, so just
  38. * spin.
  39. */
  40. //如果node不在sync队列中,yield,让出cpu
  41. while (!isOnSyncQueue(node))
  42. Thread.yield();
  43. //中断发生在signal后
  44. return false;
  45. }

分析一下addConditionWaiter:

</>复制代码

  1. private Node addConditionWaiter() {
  2. Node t = lastWaiter;
  3. // If lastWaiter is cancelled, clean out.
  4. //如果最后一个node被取消,清除node
  5. if (t != null && t.waitStatus != Node.CONDITION) {
  6. unlinkCancelledWaiters();
  7. t = lastWaiter;
  8. }
  9. //新建一个node,持有当前线程,状态为CONDITION
  10. Node node = new Node(Thread.currentThread(), Node.CONDITION);
  11. if (t == null)
  12. //如果尾节点为null,说明condition队列还是空的,将新建的node作为头节点
  13. firstWaiter = node;
  14. else
  15. //如果condition队列已经存在,将新建的node作为尾节点的next
  16. t.nextWaiter = node;
  17. //将新建node设置为尾节点
  18. lastWaiter = node;
  19. //返回新建的node
  20. return node;
  21. }

在这里我们可以看到Condition的队列是一个单链表。
看一下unlinkCancelledWaiters,Condition所有操作都是在获取锁之后执行的,所以不用考虑线程安全问题:

</>复制代码

  1. private void unlinkCancelledWaiters() {
  2. Node t = firstWaiter;
  3. Node trail = null;
  4. while (t != null) {
  5. Node next = t.nextWaiter;
  6. if (t.waitStatus != Node.CONDITION) {
  7. t.nextWaiter = null;
  8. if (trail == null)
  9. firstWaiter = next;
  10. else
  11. trail.nextWaiter = next;
  12. if (next == null)
  13. lastWaiter = trail;
  14. }
  15. else
  16. trail = t;
  17. t = next;
  18. }
  19. }

该方法从队列头开始往后遍历所有node,移除已经取消的node;

在新建了node后,调用了fullyRelease:

</>复制代码

  1. final int fullyRelease(Node node) {
  2. boolean failed = true;
  3. try {
  4. //保存当前的state
  5. int savedState = getState();
  6. //release(savedState)尝试释放锁,这也是为什么叫fullyRelease
  7. if (release(savedState)) {
  8. failed = false;
  9. //返回之前保存的state
  10. return savedState;
  11. } else {
  12. throw new IllegalMonitorStateException();
  13. }
  14. } finally {
  15. if (failed)
  16. //如果失败,将当前node设置为取消状态
  17. node.waitStatus = Node.CANCELLED;
  18. }
  19. }

看一下release:

</>复制代码

  1. public final boolean release(int arg) {
  2. //尝试释放锁,这里调用的是ReentrantLock实现的tryRelease,传入的arg是当前的state,所以会释放成功,即state0
  3. if (tryRelease(arg)) {
  4. Node h = head;
  5. if (h != null && h.waitStatus != 0)
  6. //唤醒后继节点
  7. unparkSuccessor(h);
  8. return true;
  9. }
  10. return false;
  11. }

下面的方法是判断当前节点是否在Sync队列中

</>复制代码

  1. final boolean isOnSyncQueue(Node node) {
  2. //如果当前节点状态为CONDITION或者节点前驱为null,说明该节点已经在CONDITION队列中,不在Syc队列里
  3. if (node.waitStatus == Node.CONDITION || node.prev == null)
  4. return false;
  5. //如果节点后继不是null,那该节点一定在Syc队列中
  6. if (node.next != null) // If has successor, it must be on queue
  7. return true;
  8. /*
  9. * node.prev can be non-null, but not yet on queue because
  10. * the CAS to place it on queue can fail. So we have to
  11. * traverse from tail to make sure it actually made it. It
  12. * will always be near the tail in calls to this method, and
  13. * unless the CAS failed (which is unlikely), it will be
  14. * there, so we hardly ever traverse much.
  15. */
  16. //此时节点入列的CAS动作可能失败,所以要从尾部往前查找该节点再次确认
  17. return findNodeFromTail(node);
  18. }
Condition的signal

</>复制代码

  1. public final void signal() {
  2. //如果当前线程不是当前的独占线程,抛出异常
  3. if (!isHeldExclusively())
  4. throw new IllegalMonitorStateException();
  5. Node first = firstWaiter;
  6. if (first != null)
  7. //signal Condition队列的第一个节点
  8. doSignal(first);
  9. }
  10. private void doSignal(Node first) {
  11. //如果transferForSignal失败(即当前节点取消)且下一个节点存在,while继续loop
  12. do {
  13. //设置第一个节点的next为firstWaiter,此时如果firstWaiter为null,说明队列空了,将lastWaiter也设置为null
  14. if ( (firstWaiter = first.nextWaiter) == null)
  15. lastWaiter = null;
  16. //设置第一个节点next为null,help GC
  17. first.nextWaiter = null;
  18. } while (!transferForSignal(first) &&
  19. (first = firstWaiter) != null);
  20. }
  21. final boolean transferForSignal(Node node) {
  22. /*
  23. * If cannot change waitStatus, the node has been cancelled.
  24. */
  25. //如果为node设置状态失败,说明node被取消,返回false
  26. if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
  27. return false;
  28. /*
  29. * Splice onto queue and try to set waitStatus of predecessor to
  30. * indicate that thread is (probably) waiting. If cancelled or
  31. * attempt to set waitStatus fails, wake up to resync (in which
  32. * case the waitStatus can be transiently and harmlessly wrong).
  33. */
  34. //将当前node入列sync队列,返回node的前继
  35. Node p = enq(node);
  36. int ws = p.waitStatus;
  37. //如果前继的状态为取消或者设置前继状态为SIGNAL失败,当前node线程unpark
  38. if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
  39. LockSupport.unpark(node.thread);
  40. return true;
  41. }

signal后,Condition第一个节点将入列sync的队列,等待抢占到锁继续执行。

总结

在一开是的例子中,假设有两个线程P,C分别代表生产者和消费者线程,生产消费元素E的队列Q容量为1。

C无限loop调用take,当C抢占到独占锁,发现Q时空的,调用notEmpty.await(),线程C释放锁并且入列notEmpty队列park,等待别的线程调用notEmpty.signal();

P无限loop调用put,当P抢占到独占锁生产了一个E,调用notEmpty.signal()通知C,然后释放了锁;

C收到signal信号,入列SYC队列,并且unpark,尝试抢占独占锁,成功获得独占锁后,消费了一个E,然后调用notFull.signal();

P生产E时发现Q已满(C还没来得及消费),调用notFull.await()线程P释放锁并且入列notFull队列park,等待notFull.signal()通知自己unpark并入列AQS队列去抢占独占锁进行生产;

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/70922.html

相关文章

  • AbstractQueuedSynchronizer理解之四Condition

    摘要:总结在一开是的例子中,假设有两个线程,分别代表生产者和消费者线程,生产消费元素的队列容量为。 什么是Condition Condition必须要和独占锁一起使用,独占锁代替了原来的synchronized,Condition代替了原来的Object中的监视器方法(wait, notify and notifyAll);一个Lock可以对应多个Condition,这样线程之间可以按照条件...

    leiyi 评论0 收藏0
  • AbstractQueuedSynchronizer 原理分析 - Condition 实现原理

    摘要:实现原理是通过基于单链表的条件队列来管理等待线程的。中断在转移到同步队列期间或之后发生,此时表明有线程正在调用转移节点。在该种中断模式下,再次设置线程的中断状态。 1. 简介 Condition是一个接口,AbstractQueuedSynchronizer 中的ConditionObject内部类实现了这个接口。Condition声明了一组等待/通知的方法,这些方法的功能与Objec...

    leone 评论0 收藏0
  • AbstractQueuedSynchronizer 原理分析 - Condition 实现原理

    摘要:实现原理是通过基于单链表的条件队列来管理等待线程的。中断在转移到同步队列期间或之后发生,此时表明有线程正在调用转移节点。在该种中断模式下,再次设置线程的中断状态。 1. 简介 Condition是一个接口,AbstractQueuedSynchronizer 中的ConditionObject内部类实现了这个接口。Condition声明了一组等待/通知的方法,这些方法的功能与Objec...

    李世赞 评论0 收藏0
  • AbstractQueuedSynchronizer 原理分析 - Condition 实现原理

    摘要:实现原理是通过基于单链表的条件队列来管理等待线程的。中断在转移到同步队列期间或之后发生,此时表明有线程正在调用转移节点。在该种中断模式下,再次设置线程的中断状态。 1. 简介 Condition是一个接口,AbstractQueuedSynchronizer 中的ConditionObject内部类实现了这个接口。Condition声明了一组等待/通知的方法,这些方法的功能与Objec...

    bigdevil_s 评论0 收藏0
  • AbstractQueuedSynchronizer的介绍和原理分析

    摘要:同步器拥有三个成员变量队列的头结点队列的尾节点和状态。对于同步器维护的状态,多个线程对其的获取将会产生一个链式的结构。使用将当前线程,关于后续会详细介绍。 简介提供了一个基于FIFO队列,可以用于构建锁或者其他相关同步装置的基础框架。该同步器(以下简称同步器)利用了一个int来表示状态,期望它能够成为实现大部分同步需求的基础。使用的方法是继承,子类通过继承同步器并需要实现它的方法来管理...

    Yuanf 评论0 收藏0

发表评论

0条评论

RiverLi

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<