资讯专栏INFORMATION COLUMN

Java中的队列

hankkin / 1078人阅读

摘要:当队列满时,存储元素的线程会等待队列可用。分别是一个由数组结构组成的有界阻塞队列。一个支持优先级排序的无界阻塞队列。此队列的默认和最大长度为。此队列按照先进先出的原则对元素进行排序。

最近在看数据结构的时候,看到了队列这里,在实际的开发中我们很少会手动的去实现一个队列,甚至很少直接用到队列,但是在Java的包中有一些具有特殊属性的队列应用的比较广泛,例如:阻塞队列&并发队列.

阻塞队列

阻塞队列(BlockingQueue)是一个额外支持两种操作的队列。这两个附加的操作是:
1、在队列为空时,获取元素的线程会等待队列变为非空。
2、当队列满时,存储元素的线程会等待队列可用。
阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

阻塞队列提供了四种处理方法:

抛出异常

add(e):在添加元素的时候如果队列已满,那么直接抛出异常。
remove(e):移除元素,如果队列为空,那么抛出异常。
element():检查方法。

public static void test() {
   ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(10);
   for (int i=0; i

返回特殊值

1、offer(e) 入队的时候返回特殊值,在不同的阻塞队列中实现有一定的差别
2、poll() 出队的时候返回特殊的值
3、peek() 测试出队能否成功

一直阻塞

1、put(e) 如果队列已满,那么会一直阻塞,直到成功
2、take()  如果队列为空,那么出队会一直阻塞,直到成功

阻塞,超时退出

1、offer(e,time,unit)
2、poll(time,unit)

Java中的阻塞队列
JDK7提供了7个阻塞队列。分别是

ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。

LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。

PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。

DelayQueue:一个使用优先级队列实现的无界阻塞队列。

SynchronousQueue:一个不存储元素的阻塞队列。

LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。

LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

下面具体看一下每一种阻塞队列的实现方式以及使用场景:

1. ArrayBlockingQueue
特性:用数组实现的实现的有界阻塞队列,默认情况下不保证线程公平的访问队列(按照阻塞的先后顺序访问队列),队列可用的时候,阻塞的线程都可以争夺队列的访问资格,当然也可以使用以下的构造方法创建一个公平的阻塞队列。
ArrayBlockingQueue blockingQueue2 = new ArrayBlockingQueue<>(10, true);
下面通过源码探究以下,这个阻塞队列是如何实现的?如果实现公平与非公平的控制。

构造过程

public ArrayBlockingQueue(int capacity) {
    this(capacity, false);
}

public ArrayBlockingQueue(int capacity, boolean fair) {
    if (capacity <= 0)
        throw new IllegalArgumentException();
    //基于数组实现
    this.items = new Object[capacity];
    /**公平与非公平是通过可重入锁来实现的*/
    lock = new ReentrantLock(fair);
    notEmpty = lock.newCondition();
    notFull =  lock.newCondition();
}
/**阻塞队列的公平与非公平是通过可重入锁来实现的,关于为什么可重入锁可以实现线程访问的公平非公平特性,我们晚一点分析一下ReentrantLock的实现原理。

【关于ReentrantLock的实现原理】https://segmentfault.com/a/11...

add(E) 操作,如果add失败,那么抛出异常

public boolean add(E e) {
    return super.add(e);
}
/**AbstractQueue 父类的add方法*/
public boolean add(E e) {
    if (offer(e))
        return true;
    else
        throw new IllegalStateException("Queue full");
}
/**通过多态调用自己的offer(e)实现*/
public boolean offer(E e) {
    checkNotNull(e);
    //加锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        //如果队列满了,那么返回false
        if (count == items.length)
            return false;
        else {
            //入队
            enqueue(e);
            return true;
        }
    } finally {
        lock.unlock();
    }
}
private void enqueue(E var1) {
    Object[] var2 = this.items;
    //putIndex可以认为是队列的队尾后的一个位置,数据入队对应的位置,如果队列满了,那么putIndex设置为0
    var2[this.putIndex] = var1;
    if (++this.putIndex == var2.length) {
        this.putIndex = 0;
    }

    ++this.count;
    //唤醒一个等待在condition上的线程
    this.notEmpty.signal();
}

put(E e) put操作,如果队列已满,那么会一直阻塞,直到put成功

public void put(E var1) throws InterruptedException {
    checkNotNull(var1);
    ReentrantLock var2 = this.lock;
    //加锁,可被线程中断返回
    var2.lockInterruptibly();
    try {
        //如果队列已经满了,那么阻塞
        while(this.count == this.items.length) {
            this.notFull.await();
        }
        //进行入队操作
        this.enqueue(var1);
    } finally {
        var2.unlock();
    }
}
/**
 * 在队列满的情况put操作被阻塞,那么什么时候该操作可以被唤醒呢?很显然是队列中出现空地的情况下,才会被唤醒在notFull这种条件下 
 * 阻塞的操作:
 * 所以在发生以下操作的时候,会被唤醒进行入队的操作
 * 1、dequeue()操作 2、removeAt(int var1)操作 3、clear() 4、drainTo
 */

take() 出队操作,如果队列为空,那么阻塞,直到队列中包含对应元素唤醒

/**实现比较容易,和上面的操作异曲同工*/
public E take() throws InterruptedException {
    ReentrantLock var1 = this.lock;
    var1.lockInterruptibly();

    Object var2;
    try {
        while(this.count == 0) {
            this.notEmpty.await();
        }

        var2 = this.dequeue();
    } finally {
        var1.unlock();
    }

    return var2;
}
个人总结:实现阻塞操作和核心在于线程挂起以及线程的唤醒,在Java中提供了两种线程等待以及线程唤醒的方式。一是基于对象监视器的wait(),notify()方法。 二是通过Condition.await()和signal()方法。
2. LinkedBlockingQueue
基于链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。这个队列的实现原理和ArrayBlockingQueue实现基本相同。可以看一下队列的定义:

队列的定义

/**默认的构造函数*/
public LinkedBlockingQueue() {
    this(2147483647);
}

public LinkedBlockingQueue(int var1) {
    this.count = new AtomicInteger();
    this.takeLock = new ReentrantLock();
    this.notEmpty = this.takeLock.newCondition();
    this.putLock = new ReentrantLock();
    this.notFull = this.putLock.newCondition();
    if (var1 <= 0) {
        throw new IllegalArgumentException();
    } else {
        this.capacity = var1;
        //链表的头结点和尾节点,默认是空
        this.last = this.head = new LinkedBlockingQueue.Node((Object)null);
    }
}
3、PriorityBlockingQueue
一个支持优先级的无界队列。默认情况下元素采取自然顺序排列,也可以通过比较器comparator来指定元素的排序规则。元素按照升序排列。具体是如何实现的?

队列的定义以及构造方法

/**定义和通常的阻塞队列相同,AbstractQueue中定义了队列的基本操作,BlockingQueue中定义可阻塞队列的相关操作定义*/
public class PriorityBlockingQueue extends AbstractQueue implements BlockingQueue
/**构造方法,默认的无参构造方法,调用的是另一个构造方法,默认定义了一个队列的容量,那为什么说他是无界队列呢?接着向下*/
public PriorityBlockingQueue() {
    this(DEFAULT_INITIAL_CAPACITY, null);
}
/**所有的构造方法最后调用的构造方法, comparator是一个比较器,通过比较器可以确定队列中元素的排列顺序*/
public PriorityBlockingQueue(int initialCapacity, Comparator comparator) {
    if (initialCapacity < 1)
        throw new IllegalArgumentException();
    this.lock = new ReentrantLock();
    this.notEmpty = lock.newCondition();
    this.comparator = comparator;
    /**队列是基于数组实现的*/
    this.queue = new Object[initialCapacity];
}

add 操作以及offer操作

public boolean add(E e) {
    return offer(e);
}
/**
 * offer操作
 */
public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    //如果队列已满,那么尝试进行扩容(个人感觉这里使用 >= 并不是很合理)
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
    try {
        Comparator cmp = comparator;
        if (cmp == null)
            //使用默认的比较方法将e放到队列中
            siftUpComparable(n, e, array);
        else
            //使用指定的比较顺序将数据插入到队列中
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        //激活一个在notEmpty这个condition上等待的线程
        notEmpty.signal();
    } finally {
        lock.unlock();
    }
    return true;
}
/**tryGrow()实现*/
private void tryGrow(Object[] array, int oldCap) {
    //这里先释放了锁,最后需要重新获取锁,那么这个时候所有的add操作都会执行下面的代码段
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0;
        }
    }
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    lock.lock();
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
}
并发队列

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/74576.html

相关文章

  • [Java并发-6]“管程”-java管程初探

    摘要:语言在之前,提供的唯一的并发原语就是管程,而且之后提供的并发包,也是以管程技术为基础的。但是管程更容易使用,所以选择了管程。线程进入条件变量的等待队列后,是允许其他线程进入管程的。并发编程里两大核心问题互斥和同步,都可以由管程来帮你解决。 并发编程这个技术领域已经发展了半个世纪了。有没有一种核心技术可以很方便地解决我们的并发问题呢?这个问题, 我会选择 Monitor(管程)技术。Ja...

    Steve_Wang_ 评论0 收藏0
  • 想进大厂?50个多线程面试题,你会多少?(一)

    摘要:下面是线程相关的热门面试题,你可以用它来好好准备面试。线程安全问题都是由全局变量及静态变量引起的。持有自旋锁的线程在之前应该释放自旋锁以便其它线程可以获得自旋锁。 最近看到网上流传着,各种面试经验及面试题,往往都是一大堆技术题目贴上去,而没有答案。 不管你是新程序员还是老手,你一定在面试中遇到过有关线程的问题。Java语言一个重要的特点就是内置了对并发的支持,让Java大受企业和程序员...

    wow_worktile 评论0 收藏0
  • Java中的Queue与Deque

    摘要:最小初始化容量。它作为堆栈队列双端队列的操作和的操作是一致的,只是内部的实现不同。根据元素内容查找和删除的效率比较低,为。但是接口有对应的并发实现类类。 Queue接口的实现类 Queue接口作为队列数据结构,java在实现的时候,直接定义了Deque接口(双端队列)来继承Queue接口,并且只实现Deque接口。这样java中的双端队列就囊括了队列、双端队列、堆栈(Deque接口又定...

    zhangrxiang 评论0 收藏0
  • Java 性能调优指南之 Java 集合概览

    摘要:单线程集合本部分将重点介绍非线程安全集合。非线程安全集合框架的最新成员是自起推出的。这是标准的单线程阵营中唯一的有序集合。该功能能有效防止运行时造型。检查个集合之间不存在共同的元素。基于自然排序或找出集合中的最大或最小元素。 【编者按】本文作者为拥有十年金融软件开发经验的 Mikhail Vorontsov,文章主要概览了所有标准 Java 集合类型。文章系国内 ITOM 管理平台 O...

    gnehc 评论0 收藏0
  • Java 集合 Queue

    摘要:除此之外,还有一个接口,代表一个双端队列,双端队列可以同时从两端删除添加元素,因此的实现类既可当成队列使用,也可当成栈使用。相当于栈方法将一个元素进该双端队列所表示的栈的栈顶。 Queue用于模拟队列这种数据结构,队列通常是指先进先出(FIFO)的容器。队列的头部保存在队列中存放时间最长的元素,队列的尾部保存在队列中存放时间最短的元素。新元素插入(offer)到队列的尾部,访问元素(p...

    bang590 评论0 收藏0

发表评论

0条评论

hankkin

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<