资讯专栏INFORMATION COLUMN

BlockingQueue学习

xuhong / 2426人阅读

摘要:引言在包中,很好的解决了在多线程中,如何高效安全传输数据的问题。同时,也用于自带线程池的缓冲队列中,了解也有助于理解线程池的工作模型。

引言

java.util.Concurrent包中,BlockingQueue很好的解决了在多线程中,如何高效安全“传输”数据的问题。通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利。同时,BlockingQueue也用于java自带线程池的缓冲队列中,了解BlockingQueue也有助于理解线程池的工作模型。

一 BlockingQueue接口

该接口属于队列,所以继承了Queue接口,该接口最重要的五个方法分别是offer方法poll方法put方法take方法drainTo方法

offer方法和poll方法分别有一个静态重载方法,分别是offer(E e, long timeout, TimeUnit unit)和poll(long timeout, TimeUnit unit)方法。其意义是在限定时间内存入或取出对象,如果不能存入取出则返回false。

put方法会在当队列存储对象达到限定值时阻塞线程,而在队列不为空时唤醒被take方法所阻塞的线程。take方法是相反的。

drainTo方法可批量获取队列中的元素。

二 常见的BlockingQueue实现 一 LinkedBlockingQueue

LinkedBlockingQueue是比较常见的BlockingQueue的实现,他是基于链表的阻塞队列。在创建该对象时如果不指定可存储对象个数大小时,默认为Integer.MAX_VALUE。当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时,才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。

LinkedBlockingQueue内部使用了独立的两把锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

put方法和offer方法:

public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node node = new Node(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
 
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node node = new Node(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
}

这两个方法的区别是put方法在容量达到上限时会阻塞,而offer方法则会直接返回false。

二 ArrayBlockingQueue

ArrayBlockingQueue基于数组的阻塞队列,除了有一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue。
ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。

三 SynchronousQueue

是一种没有缓冲的阻塞队列,在生产者put的同时必须要有一个消费者进行take,否则就会阻塞。声明一个SynchronousQueue有两种不同的方式。公平模式和非公平模式的区别:如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。

四 PriorityBlockingQueue和DelayQueue

PriorityBlockingQueue是基于优先级的阻塞队列,该队列不会阻塞生产者,只会阻塞消费者。

DelayQueue队列存储的对象只有指定的延迟时间到了才能被取出,该队列也不会阻塞生产者。

三 BlockingQueue的使用

在处理多线程生产者消费者问题时的演示代码:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
 
/**
 * Created by gavin on 15-8-30.
 */
public class BlockingQueueTest {
 
    public static void main(String[] args)
    {
        BlockingQueue queue = new ArrayBlockingQueue(1000);
        Thread p1 = new Thread(new Producer(queue),"producer1");
        Thread p2 = new Thread(new Producer(queue),"producer2");
        Thread c1 = new Thread(new Consumer(queue),"consumer1");
        Thread c2 = new Thread(new Consumer(queue),"consumer2");
 
        p1.start();
        p2.start();
        c1.start();
        c2.start();
    }
}
 
 
 
class Producer implements Runnable{
    private BlockingQueue queue;
 
    public Producer(BlockingQueue queue) {
        this.queue = queue;
    }
 
    public void run() {
        int i = 0;
        while (!Thread.currentThread().isInterrupted())
        {
            try {
                queue.put(Thread.currentThread().getName()+" product "+i);
            } catch (InterruptedException e) {
                System.err.println(Thread.currentThread().getName() + " error");
            }
            i++;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
 
            }
        }
    }
}
class Consumer implements Runnable{
    private BlockingQueue queue;
 
    public Consumer(BlockingQueue queue) {
        this.queue = queue;
    }
 
    public void run() {
        int i = 0;
        while (!Thread.currentThread().isInterrupted())
        {
            try {
                String str = queue.take();
                System.out.println(str);
            } catch (InterruptedException e) {
 
            }
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
 
            }
        }
    }
}
四 总结

BlockingQueue在并发编程中扮演着重要的角色,既可以自己用来解决生产者消费者问题,也用于java自带线程池的缓冲队列。

参考:
BlockingQueue

更多文章:http://blog.gavinzh.com

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/64465.html

相关文章

  • 解读 Java 并发队列 BlockingQueue

    摘要:如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除腾出空间,然后唤醒写线程队列的第一个等待线程。数据必须从某个写线程交给某个读线程,而不是写到某个队列中等待被消费。 前言 本文直接参考 Doug Lea 写的 Java doc 和注释,这也是我们在学习 java 并发包时最好的材料了。希望大家能有所思、有所悟,学习 Doug Lea 的代码风格,并将其优雅...

    maochunguang 评论0 收藏0
  • 后端ing

    摘要:当活动线程核心线程非核心线程达到这个数值后,后续任务将会根据来进行拒绝策略处理。线程池工作原则当线程池中线程数量小于则创建线程,并处理请求。当线程池中的数量等于最大线程数时默默丢弃不能执行的新加任务,不报任何异常。 spring-cache使用记录 spring-cache的使用记录,坑点记录以及采用的解决方案 深入分析 java 线程池的实现原理 在这篇文章中,作者有条不紊的将 ja...

    roadtogeek 评论0 收藏0
  • 线程池 execute() 的工作逻辑

    摘要:与最大线程池比较。如果加入成功,需要二次检查线程池的状态如果线程池没有处于,则从移除任务,启动拒绝策略。如果线程池处于状态,则检查工作线程是否为。线程池将如何工作这个问题应该就不难回答了。 原文地址:https://www.xilidou.com/2018/02/09/thread-corepoolsize/ 最近在看《Java并发编程的艺术》回顾线程池的原理和参数的时候发现一个问题,...

    waruqi 评论0 收藏0
  • Java多线程进阶(三一)—— J.U.C之collections框架:BlockingQueue

    摘要:和方法会一直阻塞调用线程,直到线程被中断或队列状态可用和方法会限时阻塞调用线程,直到超时或线程被中断或队列状态可用。 showImg(https://segmentfault.com/img/bVbgyPy?w=1191&h=670); 本文首发于一世流云专栏:https://segmentfault.com/blog... 一、引言 从本节开始,我们将介绍juc-collectio...

    entner 评论0 收藏0
  • 猫头鹰的深夜翻译:BlockingQueue和持续管理

    摘要:我们将使用单个线程管理任务放入队列的操作以及从队列中取出的操作。同时这个线程会持续的管理队列。另一个线程将用来创建,它将一直运行知道服务器终止。此线程永远不会过期,有助于实现持续监控。这些请求将会自动的被获取,并在线程中继续处理。 在Java中,BlockingQueue接口位于java.util.concurrent包下。阻塞队列主要用来线程安全的实现生产者-消费者模型。他们可以使用...

    YanceyOfficial 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<