摘要:和方法会一直阻塞调用线程,直到线程被中断或队列状态可用和方法会限时阻塞调用线程,直到超时或线程被中断或队列状态可用。
本文首发于一世流云专栏:https://segmentfault.com/blog...一、引言
从本节开始,我们将介绍juc-collections框架中的“阻塞队列”部分。阻塞队列在实际应用中非常广泛,许多消息中间件中定义的队列,通常就是一种“阻塞队列”。
那么“阻塞队列”和我们之前讨论过的ConcurrentLinkedQueue、ConcurrentLinkedDeque有什么不同呢?
ConcurrentLinkedQueue和ConcurrentLinkedDeque是以非阻塞算法实现的高性能队列,其使用场景一般是在并发环境下,需要“队列”/“栈”这类数据结构时才会使用;而“阻塞队列”通常利用了“锁”来实现,也就是会阻塞调用线程,其使用场景一般是在“生产者-消费者”模式中,用于线程之间的数据交换或系统解耦。
在Java多线程基础(七)——Producer-Consumer模式中,我们曾简要的谈到过“生产者-消费者”这种模式。在这种模式中,“生产者”和“消费者”是相互独立的,两者之间的通信需要依靠一个队列。这个队列,其实就是本文中的“阻塞队列”。
引入“阻塞队列”的最大好处就是解耦,在软件工程中,“高内聚,低耦合”是进行模块设计的准则之一,这样“生产者”和“消费者”其实是互不影响的,将来任意一方需要升级时,可以保证系统的平滑过渡。
二、BlockingQueue简介BlockingQueue是在JDK1.5时,随着J.U.C引入的一个接口:
BlockingQueue继承了Queue接口,提供了一些阻塞方法,主要作用如下:
当线程向队列中插入元素时,如果队列已满,则阻塞线程,直到队列有空闲位置(非满);
当线程从队列中取元素(删除队列元素)时,如果队列未空,则阻塞线程,直到队列有元素;
既然BlockingQueue是一种队列,所以也具备队列的三种基本方法:插入、删除、读取:
操作类型 | 抛出异常 | 返回特殊值 | 阻塞线程 | 超时 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
删除 | remove() | poll() | take() | poll(time, unit) |
读取 | element() | peek() | / | / |
可以看到,对于每种基本方法,“抛出异常”和“返回特殊值”的方法定义和Queue是完全一样的。BlockingQueue只是增加了两类和阻塞相关的方法:put(e)、take();offer(e, time, unit)、poll(time, unit)。
put(e)和take()方法会一直阻塞调用线程,直到线程被中断或队列状态可用;
offer(e, time, unit)和poll(time, unit)方法会限时阻塞调用线程,直到超时或线程被中断或队列状态可用。
public interface BlockingQueueextends Queue { /** * 插入元素e至队尾, 如果队列已满, 则阻塞调用线程直到队列有空闲空间. */ void put(E e) throws InterruptedException; /** * 插入元素e至队列, 如果队列已满, 则限时阻塞调用线程,直到队列有空闲空间或超时. */ boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; /** * 从队首删除元素,如果队列为空, 则阻塞调用线程直到队列中有元素. */ E take() throws InterruptedException; /** * 从队首删除元素,如果队列为空, 则限时阻塞调用线程,直到队列中有元素或超时. */ E poll(long timeout, TimeUnit unit) throws InterruptedException; // ... }
除此之外,BlockingQueue还具有以下特点:
BlockingQueue队列中不能包含null元素;
BlockingQueue接口的实现类都必须是线程安全的,实现类一般通过“锁”保证线程安全;
BlockingQueue 可以是限定容量的。remainingCapacity()方法用于返回剩余可用容量,对于没有容量限制的BlockingQueue实现,该方法总是返回Integer.MAX_VALUE 。
三、再谈“生产者-消费者”模式最后,我们来看下如何利用BlockingQueue来实现生产者-消费者模式。在生产者-消费者模式中,一共有四类角色:生产者、消费者、消息队列、消息体。我们利用BlockingQueue来实现消息队列,其余部分没有什么变化。
Producer(生产者)生产者生产消息体(Data),并将消息体(Data)传递给通道(Channel)。
/** * 生产者 */ public class Producer implements Runnable { private Channel channel; public Producer(Channel channel) { this.channel = channel; } @Override public void run() { while (true) { String v = String.valueOf(ThreadLocalRandom.current().nextInt()); Data data = new Data(v); try { channel.put(data); System.out.println(Thread.currentThread().getName() + " produce :" + data); } catch (InterruptedException e) { e.printStackTrace(); } Thread.yield(); } } }Consumer(消费者)
消费者从通道(Channel)中获取数据,进行处理。
/** * 消费者 */ public class Consumer implements Runnable { private final Channel channel; public Consumer(Channel channel) { this.channel = channel; } @Override public void run() { while (true) { try { Object obj = channel.take(); System.out.println(Thread.currentThread().getName() + " consume :" + obj.toString()); } catch (InterruptedException e) { e.printStackTrace(); } Thread.yield(); } } }Channel(通道)
相当于消息的队列,对消息进行排队,控制消息的传输。
/** * 通道类 */ public class Channel { private final BlockingQueue blockingQueue; public Channel(BlockingQueue blockingQueue) { this.blockingQueue = blockingQueue; } public Object take() throws InterruptedException { return blockingQueue.take(); } public void put(Object o) throws InterruptedException { blockingQueue.put(o); } }Data(消息体/数据)
Data代表了实际生产或消费的数据。
/** * 数据/消息 */ public class Dataimplements Serializable { private T data; public Data(T data) { this.data = data; } public T getData() { return data; } public void setData(T data) { this.data = data; } @Override public String toString() { return "Data{" + "data=" + data + "}"; } }
调用如下:
public class Main { public static void main(String[] args) { BlockingQueue blockingQueue = new SomeQueueImplementation(); Channel channel = new Channel(blockingQueue); Producer p = new Producer(channel); Consumer c1 = new Consumer(channel); Consumer c2 = new Consumer(channel); new Thread(p).start(); new Thread(c1).start(); new Thread(c2).start(); } }
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/77009.html
摘要:接口截止目前为止,我们介绍的阻塞队列都是实现了接口。该类在构造时一般需要指定容量,如果不指定,则最大容量为。另外,由于内部通过来保证线程安全,所以的整体实现时比较简单的。另外,双端队列相比普通队列,主要是多了队尾出队元素队首入队元素的功能。 showImg(https://segmentfault.com/img/bVbgZ7j?w=770&h=514); 本文首发于一世流云专栏:ht...
摘要:整个包,按照功能可以大致划分如下锁框架原子类框架同步器框架集合框架执行器框架本系列将按上述顺序分析,分析所基于的源码为。后,根据一系列常见的多线程设计模式,设计了并发包,其中包下提供了一系列基础的锁工具,用以对等进行补充增强。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首发于一世流云专栏:https...
摘要:在章节中,我们说过,维护了一把全局锁,无论是出队还是入队,都共用这把锁,这就导致任一时间点只有一个线程能够执行。入队锁对应的是条件队列,出队锁对应的是条件队列,所以每入队一个元素,应当立即去唤醒可能阻塞的其它入队线程。 showImg(https://segmentfault.com/img/bVbgCD9?w=1920&h=1080); 本文首发于一世流云专栏:https://seg...
摘要:在队尾插入指定元素,如果队列已满,则阻塞线程加锁队列已满。这里必须用,防止虚假唤醒在队列上等待之所以这样做,是防止线程被意外唤醒,不经再次判断就直接调用方法。 showImg(https://segmentfault.com/img/bVbgCD0?w=768&h=512); 本文首发于一世流云专栏:https://segmentfault.com/blog... 一、ArrayBl...
摘要:初始状态对应二叉树结构将顶点与最后一个结点调换即将顶点与最后一个结点交换,然后将索引为止置。 showImg(https://segmentfault.com/img/bVbgOtL?w=1600&h=800); 本文首发于一世流云专栏:https://segmentfault.com/blog... 一、PriorityBlockingQueue简介 PriorityBlockin...
阅读 3275·2021-11-24 09:39
阅读 2784·2021-10-12 10:20
阅读 1858·2019-08-30 15:53
阅读 3056·2019-08-30 14:14
阅读 2580·2019-08-29 15:36
阅读 1091·2019-08-29 14:11
阅读 1923·2019-08-26 13:51
阅读 3386·2019-08-26 13:23