资讯专栏INFORMATION COLUMN

实现Java 阻塞队列

xiaoxiaozi / 490人阅读

摘要:实现阻塞队列在自己实现之前先搞清楚阻塞队列的几个特点基本队列特性先进先出。消费队列空时会阻塞直到写入线程写入了队列数据后唤醒消费线程。

实现Java 阻塞队列
在自己实现之前先搞清楚阻塞队列的几个特点:
基本队列特性:先进先出。
写入队列空间不可用时会阻塞。
获取队列数据时当队列为空时将阻塞。

实现队列的方式多种,总的来说就是数组和链表;其实我们只需要搞清楚其中一个即可,不同的特性主要表现为数组和链表的区别。

这里的 ArrayBlockingQueue 看名字很明显是由数组实现。

我们先根据它这三个特性尝试自己实现试试。

初始化队列

我这里自定义了一个类:ArrayBlockQueue,它的构造函数如下:

//队列参数
    public int size;

    private volatile Object[] items;

    private volatile int pollPoint = 0;

    private volatile int addPoint = 0;

    private volatile int count = 0;

//初始化队列容量
    public ArrayBlockQueue(int size) {
        this.size = size;
        this.items = new Object[size];
    }
写入操作

有几个需要注意的点:

队列满的时候,写入的线程需要被阻塞。

写入过队列的数量大于队列大小时需要从第一个下标开始写。

先看第一个队列满的时候,写入的线程需要被阻塞,先来考虑下如何才能使一个线程被阻塞,看起来的表象线程卡住啥事也做不了。

其实这样的一个特点很容易让我们想到 Java 的等待通知机制来实现线程间通信。
所以我这里的做法是,一旦队列满时就将写入线程调用 addLock.wait() 进入 waiting 状态,直到空间可用时再进行唤醒。
初始化 两个锁
//初始化两个锁
    private Object addLock = new Object();

    private Object pollLock = new Object();
写入操作代码
public void add(Object item) {

        synchronized (addLock) {
            //队列满则阻塞
            while (count >= size) {
                try {
                    addLock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            items[addPoint] = item;
            addPoint = (addPoint + 1) % size;
            count++;

            //当队列从0个元素添加到1个元素,则说明从空状态转非空状态可以通知一次取元素线程
            if (count == 1)
                synchronized (pollLock) {
                    pollLock.notifyAll();
                }
        }
    }

所以这里声明了两个对象用于队列满、空情况下的互相通知作用。

在写入数据成功后需要使用 pollLock.notifyAll(),这样的目的是当获取队列为空时,一旦写入数据成功就可以把消费队列的线程唤醒。

这里的 wait 和 notify 操作都需要对各自的对象使用 synchronized 方法块,这是因为 wait 和 notifyAll 都需要获取到各自的锁。
消费队列

上文也提到了:当队列为空时,获取队列的线程需要被阻塞,直到队列中有数据时才被唤醒。
代码和写入的非常类似,也很好理解;只是这里的等待、唤醒恰好是相反的。
总的来说就是:

写入队列满时会阻塞直到获取线程消费了队列数据后唤醒写入线程。

消费队列空时会阻塞直到写入线程写入了队列数据后唤醒消费线程。

public Object poll() {

        synchronized (pollLock) {
            //队列空则阻塞
            while (count == 0) {
                try {
                    pollLock.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            Object value = items[pollPoint];
            pollPoint = (pollPoint + 1) % size;
            count--;

            //当队列从满到非满,可以通知一次增添元素的线程
            if (count == (size - 1))
                synchronized (addLock) {
                    addLock.notifyAll();
                }
            return value;
        }
    }
测试

每一秒出队1个

//消费者
//每一秒出队1个
class Counsum extends Thread {

    private ArrayBlockQueue queue;

    public Counsum(ArrayBlockQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {

        while (true) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " 出队一个数据:   " + queue.poll());
        }
    }
}

每5秒入队2个,可见生产明显慢于消费

//生产者
//每5秒入队2个
class Product extends Thread {

    private ArrayBlockQueue queue;

    public Product(ArrayBlockQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {

        int i = 0;
        while (true) {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            for (int j = 0; j < 2; j++, i++) {

                queue.add(i);
                System.out.println(Thread.currentThread().getName() + "【入队一个数据:   " + i+"】");
            }
        }
    }
}
main
public class Test extends Object {

    public static void main(String[] args) throws CloneNotSupportedException {
        
        ArrayBlockQueue queue = new ArrayBlockQueue(5);
        
        Thread counsum = new Counsum(queue);
        Thread counsum1 = new Counsum(queue);
        Thread counsum2 = new Counsum(queue);

        Thread product = new Product(queue);
        Thread product1 = new Product(queue);
        Thread product2 = new Product(queue);
        counsum.start();
        product.start();
        counsum1.start();
        product1.start();
        counsum2.start();
        product2.start();
    }
}
//output
消费者线程-1出队一个数据:   0
生成者线程-3入队一个数据:    0
消费者线程-3出队一个数据:   0
消费者线程-2出队一个数据:   0
生成者线程-2入队一个数据:    0
生成者线程-1入队一个数据:    0
生成者线程-2入队一个数据:    1
生成者线程-3入队一个数据:    1
生成者线程-1入队一个数据:    1
消费者线程-3出队一个数据:   1
消费者线程-1出队一个数据:   1
消费者线程-2出队一个数据:   1
消费者线程-2出队一个数据:   2
生成者线程-2入队一个数据:    2
消费者线程-1出队一个数据:   2
生成者线程-3入队一个数据:    2
生成者线程-1入队一个数据:    2
消费者线程-3出队一个数据:   2
生成者线程-1入队一个数据:    3
生成者线程-3入队一个数据:    3
生成者线程-2入队一个数据:    3
消费者线程-1出队一个数据:   3
消费者线程-3出队一个数据:   3
消费者线程-2出队一个数据:   3

引用

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/75935.html

相关文章

  • 什么是阻塞队列?如何使用阻塞队列实现生产者-消费者模型?

    摘要:什么是阻塞队列阻塞队列是一个在队列基础上又支持了两个附加操作的队列。阻塞队列的应用场景阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。由链表结构组成的无界阻塞队列。 什么是阻塞队列? 阻塞队列是一个在队列基础上又支持了两个附加操作的队列。 2个附加操作: 支持阻塞的插入方法:队列满时,队列会阻塞插入元素的线程,直到队列不满。 支持阻塞的...

    jemygraw 评论0 收藏0
  • java 队列

    摘要:是基于链接节点的线程安全的队列。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。队列内部仅允许容纳一个元素。该队列的头部是延迟期满后保存时间最长的元素。 队列简述 Queue: 基本上,一个队列就是一个先入先出(FIFO)的数据结构Queue接口与List、Set同一级别,都是继承了Collection接口。LinkedList实现了Deque接 口。...

    goji 评论0 收藏0
  • Java知识点总结(Java容器-Queue)

    摘要:知识点总结容器知识点总结容器接口与是在同一级别,都是继承了接口。另一种队列则是双端队列,支持在头尾两端插入和移除元素,主要包括。一个由链表结构组成的无界阻塞队列。是一个阻塞的线程安全的队列,底层实现也是使用链式结构。 Java知识点总结(Java容器-Queue) @(Java知识点总结)[Java, Java容器] Queue Queue接口与List、Set是在同一级别,都是继承了...

    hedzr 评论0 收藏0
  • Java阻塞队列实现

    摘要:尽管中已经包含了阻塞队列的官方实现,但是熟悉其背后的原理还是很有帮助的。阻塞队列的实现阻塞队列的实现类似于带上限的的实现。下面是阻塞队列的一个简单实现必须注意到,在和方法内部,只有队列的大小等于上限或者下限时,才调用方法。 阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,或者当队列是满时,往队列里添加元素的操作会被阻塞。试图从空的阻塞队列中获取元素的线程...

    付伦 评论0 收藏0
  • Java 队列

    摘要:队列中有元素时,就说明有过期了,线程继续执行,然后元素出队,根据相应的移除缓存。所以严格来说,虽然实现了队列接口,但是它的目的却并不是队列,而是将生产者消费者线程配对。转移队列链式转移队列。 引言 本周在编写短信验证码频率限制切面的时候,经潘老师给的实现思路,使用队列进行实现。 看了看java.util包下的Queue接口,发现还从来没用过呢! Collection集合类接口,由它派生...

    Pocher 评论0 收藏0
  • Java并发编程笔记(一)

    摘要:并发编程实战水平很高,然而并不是本好书。一是多线程的控制,二是并发同步的管理。最后,使用和来关闭线程池,停止其中的线程。当线程调用或等阻塞时,对这个线程调用会使线程醒来,并受到,且线程的中断标记被设置。 《Java并发编程实战》水平很高,然而并不是本好书。组织混乱、长篇大论、难以消化,中文翻译也较死板。这里是一篇批评此书的帖子,很是贴切。俗话说:看到有这么多人骂你,我就放心了。 然而知...

    cnsworder 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<