摘要:最后一直调用函数判断节点是否被转移到队列上,也就是中等待获取锁的队列。这样的话,函数中调用函数就会返回,导致函数进入最后一步重新获取锁的状态。函数其实就做了一件事情,就是不断尝试调用函数,将队首的一个节点转移到队列中,直到转移成功。
我在前段时间写了一篇关于AQS源码解析的文章
AbstractQueuedSynchronizer超详细原理解析,在文章里边我说JUC包中的大部分多线程相关的类都和AQS相关,今天我们就学习一下依赖于AQS来实现的阻塞队列BlockingQueue的实现原理。本文中的源码未加说明即来自于以ArrayBlockingQueue。
相信大多数同学在学习线程池时会了解阻塞队列的概念,熟记各种类型的阻塞队列对线程池初始化的影响。当从阻塞队列获取元素但是队列为空时,当前线程会阻塞直到另一个线程向阻塞队列中添加一个元素;类似的,当向一个阻塞队列加入元素时,如果队列已经满了,当前线程也会阻塞直到另外一个线程从队列中读取一个元素。阻塞队列一般都是先进先出的,用来实现生产者和消费者模式。当发生上述两种情况时,阻塞队列有四种不同的处理方式,这四种方式分别为抛出异常,返回特殊值(null或在是false),阻塞当前线程直到执行结束,最后一种是只阻塞固定时间,到时后还无法执行成功就放弃操作。这些方法都总结在下边这种表中了。
我们就只分析put和take方法。
put和take函数我们都知道,使用同步队列可以很轻松的实现生产者-消费者模式,其实,同步队列就是按照生产者-消费者的模式来实现的,我们可以将put函数看作生产者的操作,take是消费者的操作。
我们首先看一下ArrayListBlock的构造函数。它初始化了put和take函数中使用到的关键成员变量,分别是ReentrantLock和Condition。
public ArrayBlockingQueue(int capacity, boolean fair) { this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
ReentrantLock是AQS的子类,其newCondition函数返回的Condition接口实例是定义在AQS类内部的ConditionObject实现类。它可以直接调用AQS相关的函数。
put函数会在队列末尾添加元素,如果队列已经满了,无法添加元素的话,就一直阻塞等待到可以加入为止。函数的源码如下所示。
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //先获得锁 try { while (count == items.length) //如果队列满了,就NotFull这个Condition对象上进行等待 notFull.await(); enqueue(e); } finally { lock.unlock(); } } private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; //这里可以注意的是ArrayBlockingList实际上使用Array实现了一个环形数组, //当putIndex达到最大时,就返回到起点,继续插入, //当然,如果此时0位置的元素还没有被取走, //下次put时,就会因为cout == item.length未被阻塞。 if (++putIndex == items.length) putIndex = 0; count++; //因为插入了元素,通知等待notEmpty事件的线程。 notEmpty.signal(); }
我们会发现put函数使用了wait/notify的机制。与一般生产者-消费者的实现方式不同,同步队列使用ReentrantLock和Condition相结合的先获得锁,再等待的机制;而不是Synchronized和Object.wait的机制。这里的区别我们下一节再详细讲解。
看完了生产者相关的put函数,我们再来看一下消费者调用的take函数。take函数在队列为空时会被阻塞,一直到阻塞队列加入了新的元素。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) //如果队列为空,那么在notEmpty对象上等待, //当put函数调用时,会调用notEmpty的notify进行通知。 notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } private E dequeue() { E x = (E) items[takeIndex]; items[takeIndex] = null; //取出takeIndex位置的元素 if (++takeIndex == items.length) //如果到了尾部,将指针重新调整到头部 takeIndex = 0; count--; .... //通知notFull对象上等待的线程 notFull.signal(); return x; }await操作
我们发现ArrayBlockingList并没有使用Object.wait,而是使用的Condition.await,这是为什么呢?其中又有哪些原因呢?
Condition对象可以提供和Object的wait和notify一样的行为,但是后者必须先获取synchronized这个内置的monitor锁,才能调用;而Condition则必须先获取ReentrantLock。这两种方式在阻塞等待时都会将相应的锁释放掉,但是Condition的等待可以中断,这是二者唯一的区别。
我们先来看一下Condition的wait函数,wait函数的流程大致如下图所示。
wait函数主要有三个步骤。一是调用addConditionWaiter 函数,在condition wait queue队列中添加一个节点,代表当前线程在等待一个消息。然后调用fullyRelease函数,将持有的锁释放掉,调用的是AQS的函数,不清楚的同学可以查看本篇开头的介绍的文章。最后一直调用isOnSyncQueue函数判断节点是否被转移到sync queue队列上,也就是AQS中等待获取锁的队列。如果没有,则进入阻塞状态,如果已经在队列上,则调用acquireQueued函数重新获取锁。
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //在condition wait队列上添加新的节点 Node node = addConditionWaiter(); //释放当前持有的锁 int savedState = fullyRelease(node); int interruptMode = 0; //由于node在之前是添加到condition wait queue上的,现在判断这个node //是否被添加到Sync的获得锁的等待队列上,Sync就是AQS的子类 //node在condition queue上说明还在等待事件的notify, //notify函数会将condition queue 上的node转化到Sync的队列上。 while (!isOnSyncQueue(node)) { //node还没有被添加到Sync Queue上,说明还在等待事件通知 //所以调用park函数来停止线程执行 LockSupport.park(this); //判断是否被中断,线程从park函数返回有两种情况,一种是 //其他线程调用了unpark,另外一种是线程被中断 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //代码执行到这里,已经有其他线程调用notify函数,或则被中断,该线程可以继续执行,但是必须先 //再次获得调用await函数时的锁.acquireQueued函数在AQS文章中做了介绍. if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; .... } final int fullyRelease(Node node) { //AQS的方法,当前已经在锁中了,所以直接操作 boolean failed = true; try { int savedState = getState(); //获取state当前的值,然后保存,以待以后恢复 // release函数是AQS的函数,不清楚的同学请看开头介绍的文章。 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } private int checkInterruptWhileWaiting(Node node) { //中断可能发生在两个阶段中,一是在等待signa时,另外一个是在获得signal之后 return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } final boolean transferAfterCancelledWait(Node node) { //这里要和下边的transferForSignal对应着看,这是线程中断进入的逻辑.那边是signal的逻辑 //两边可能有并发冲突,但是成功的一方必须调用enq来进入acquire lock queue中. if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } //如果失败了,说明transferForSignal那边成功了,等待node 进入acquire lock queue while (!isOnSyncQueue(node)) Thread.yield(); return false; }signal操作
signal函数将condition wait queue队列中队首的线程节点转移等待获取锁的sync queue队列中。这样的话,wait函数中调用isOnSyncQueue函数就会返回true,导致wait函数进入最后一步重新获取锁的状态。
我们这里来详细解析一下condition wait queue和sync queue两个队列的设计原理。condition wait queue是等待消息的队列,因为阻塞队列为空而进入阻塞状态的take函数操作就是在等待阻塞队列不为空的消息。而sync queue队列则是等待获取锁的队列,take函数获得了消息,就可以运行了,但是它还必须等待获取锁之后才能真正进行运行状态。
signal函数的示意图如下所示。
signal函数其实就做了一件事情,就是不断尝试调用transferForSignal 函数,将condition wait queue队首的一个节点转移到sync queue队列中,直到转移成功。因为一次转移成功,就代表这个消息被成功通知到了等待消息的节点。
public final void signal() { if (!isHeldExclusively()) //如果当前线程没有获得锁,抛出异常 throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) //将Condition wait queue中的第一个node转移到acquire lock queue中. doSignal(first); } private void doSignal(Node first) { do { //由于生产者的signal在有消费者等待的情况下,必须要通知 //一个消费者,所以这里有一个循环,直到队列为空 //把first 这个node从condition queue中删除掉 //condition queue的头指针指向node的后继节点,如果node后续节点为null,那么也将尾指针也置为null if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); //transferForSignal将node转而添加到Sync的acquire lock 队列 } final boolean transferForSignal(Node node) { //如果设置失败,说明该node已经被取消了,所以返回false,让doSignal继续向下通知其他未被取消的node if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //将node添加到acquire lock queue中. Node p = enq(node); int ws = p.waitStatus; //需要注意的是这里的node进行了转化 //ws>0代表canceled的含义所以直接unpark线程 //如果compareAndSetWaitStatus失败,所以直接unpark,让线程继续执行await中的 //进行isOnSyncQueue判断的while循环,然后进入acquireQueue函数. //这里失败的原因可能是Lock其他线程释放掉了锁,同步设置p的waitStatus //如果compareAndSetWaitStatus成功了呢?那么该node就一直在acquire lock queue中 //等待锁被释放掉再次抢夺锁,然后再unpark if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }后记
后边一篇文章主要讲解如何自己使用AQS来创建符合自己业务需求的锁,请大家继续关注我的文章啦.一起进步偶。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/73031.html
摘要:线程通信传统的线程通信方法概述方法导致当前线程等待,直到其他线程调用该同步监视器的方法或方法来唤醒该线程。运行结果如下线程组和未处理的异常表示线程组,可以对一批线程进行分类管理。对线程组的控制相当于同时控制这批线程。 线程通信 传统的线程通信 方法概述: wait方法:导致当前线程等待,直到其他线程调用该同步监视器的notify()方法或notifyAll()方法来唤醒该线程。 w...
摘要:如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除腾出空间,然后唤醒写线程队列的第一个等待线程。数据必须从某个写线程交给某个读线程,而不是写到某个队列中等待被消费。 前言 本文直接参考 Doug Lea 写的 Java doc 和注释,这也是我们在学习 java 并发包时最好的材料了。希望大家能有所思、有所悟,学习 Doug Lea 的代码风格,并将其优雅...
摘要:创建一个阻塞队列生产者生产,目前总共有消费者消费,目前总共有原文链接更多教程 原文链接 更多教程 本文概要 生产者和消费者问题是线程模型中老生常谈的问题,也是面试中经常遇到的问题。光在Java中的实现方式多达数十种,更不用说加上其他语言的实现方式了。那么我们该如何学习呢? 本文会通过精讲wait()和notify()方法实现生产者-消费者模型,来学习生产者和消费者问题的原理。 目的...
摘要:一和并发包中的和主要解决的是线程的互斥和同步问题,这两者的配合使用,相当于的使用。写锁与读锁之间互斥,一个线程在写时,不允许读操作。的注意事项不支持重入,即不可反复获取同一把锁。没有返回值,也就是说无法获取执行结果。 一、Lock 和 Condition Java 并发包中的 Lock 和 Condition 主要解决的是线程的互斥和同步问题,这两者的配合使用,相当于 synchron...
阅读 3314·2021-11-10 11:36
阅读 3198·2021-10-08 10:21
阅读 2810·2021-09-29 09:35
阅读 2354·2021-09-22 16:06
阅读 3870·2021-09-09 09:33
阅读 1276·2019-08-30 15:44
阅读 3133·2019-08-30 10:59
阅读 2942·2019-08-29 15:32