Condition必须要和独占锁一起使用,独占锁代替了原来的synchronized,Condition代替了原来的Object中的监视器方法(wait, notify and notifyAll);一个Lock可以对应多个Condition,这样线程之间可以按照条件唤醒指定的线程,而不是简单的notifyAll多有的线程,使得我们多线程编程的时候可以灵活的控制线程。
/* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull;
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //当队列的元素数量为0时,调用notEmpty.await,阻塞当前的消费线程 while (count == 0) notEmpty.await(); //dequeue中调用了notFull.signal(),通知生产者队列还没满,可以生产 return dequeue(); } finally { lock.unlock(); } }
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //当队列满时,调用notFull.await(),阻塞当前生产线程,停止生产 while (count == items.length) notFull.await(); //enqueue中调用了notEmpty.signal(),通知消费者队列里有元素,可以消费 enqueue(e); } finally { lock.unlock(); } }Condition的await
/** First node of condition queue. */ private transient Node firstWaiter; /** Last node of condition queue. */ private transient Node lastWaiter;
public final void await() throws InterruptedException { //如果线程设置中断标志,抛出中断异常 if (Thread.interrupted()) throw new InterruptedException(); //往队列添加node Node node = addConditionWaiter(); //完全释放锁,head的后继节点将被唤醒,然后被移出sync队列 int savedState = fullyRelease(node); int interruptMode = 0; //判断当前节点是否在sync队列中(当condition调用signal是会将该节点放入Sync队列),如果不在就park当前线程,线程在这里开始等待被signal while (!isOnSyncQueue(node)) { LockSupport.park(this); //发送中断时(唤醒了线程)break;checkInterruptWhileWaiting中调用了transferAfterCancelledWait(贴在下面),这个方法时检测中断是发生在signal之前还是之后 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //当前线程被signal后,调用acquireQueued抢占锁,如果interruptMode不为抛出异常,设置为REINTERRUPT if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled //从头到尾移除取消的节点 unlinkCancelledWaiters(); if (interruptMode != 0) //继续中断还是抛出异常 reportInterruptAfterWait(interruptMode); } final boolean transferAfterCancelledWait(Node node) { //首先CAS设置node状态为0,如果成功说明中断发生在signal之前(因为signal会将node状态设置为0) if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { //将node入sync队列 enq(node); return true; } /* * If we lost out to a signal(), then we can"t proceed * until it finishes its enq(). Cancelling during an * incomplete transfer is both rare and transient, so just * spin. */ //如果node不在sync队列中,yield,让出cpu while (!isOnSyncQueue(node)) Thread.yield(); //中断发生在signal后 return false; }
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. //如果最后一个node被取消,清除node if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } //新建一个node,持有当前线程,状态为CONDITION Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) //如果尾节点为null,说明condition队列还是空的,将新建的node作为头节点 firstWaiter = node; else //如果condition队列已经存在,将新建的node作为尾节点的next t.nextWaiter = node; //将新建node设置为尾节点 lastWaiter = node; //返回新建的node return node; }
private void unlinkCancelledWaiters() { Node t = firstWaiter; Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; if (trail == null) firstWaiter = next; else trail.nextWaiter = next; if (next == null) lastWaiter = trail; } else trail = t; t = next; } }
final int fullyRelease(Node node) { boolean failed = true; try { //保存当前的state int savedState = getState(); //release(savedState)尝试释放锁,这也是为什么叫fullyRelease if (release(savedState)) { failed = false; //返回之前保存的state return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) //如果失败,将当前node设置为取消状态 node.waitStatus = Node.CANCELLED; } }
public final boolean release(int arg) { //尝试释放锁,这里调用的是ReentrantLock实现的tryRelease,传入的arg是当前的state,所以会释放成功,即state为0 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //唤醒后继节点 unparkSuccessor(h); return true; } return false; }
final boolean isOnSyncQueue(Node node) { //如果当前节点状态为CONDITION或者节点前驱为null,说明该节点已经在CONDITION队列中,不在Syc队列里 if (node.waitStatus == Node.CONDITION || node.prev == null) return false; //如果节点后继不是null,那该节点一定在Syc队列中 if (node.next != null) // If has successor, it must be on queue return true; /* * node.prev can be non-null, but not yet on queue because * the CAS to place it on queue can fail. So we have to * traverse from tail to make sure it actually made it. It * will always be near the tail in calls to this method, and * unless the CAS failed (which is unlikely), it will be * there, so we hardly ever traverse much. */ //此时节点入列的CAS动作可能失败,所以要从尾部往前查找该节点再次确认 return findNodeFromTail(node); }Condition的signal
public final void signal() { //如果当前线程不是当前的独占线程,抛出异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) //signal Condition队列的第一个节点 doSignal(first); } private void doSignal(Node first) { //如果transferForSignal失败(即当前节点取消)且下一个节点存在,while继续loop do { //设置第一个节点的next为firstWaiter,此时如果firstWaiter为null,说明队列空了,将lastWaiter也设置为null if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; //设置第一个节点next为null,help GC first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ //如果为node设置状态失败,说明node被取消,返回false if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ //将当前node入列sync队列,返回node的前继 Node p = enq(node); int ws = p.waitStatus; //如果前继的状态为取消或者设置前继状态为SIGNAL失败,当前node线程unpark if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
