资讯专栏INFORMATION COLUMN

生产者消费者之Java简单实现

HollisChuang / 1271人阅读

摘要:为了解决这个问题于是引入了生产者和消费者模式。代码实现多生产者和多消费者实现阻塞队列,将生产者和消费者解耦。已经满了等待用使用于多个生产者的情况说明中有元素可以取用使用于多个消费者的情况。

为什么要使用生产者和消费者模式

在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。

在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

什么是生产者消费者模式

生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。

代码实现(多生产者 和 多消费者)

QueueBuffer : 实现阻塞队列,将生产者和消费者解耦。它底层是一个数组,构造的时候指定数组的大小。由于实现的时多生产这和多消费者的模型,所以注意一下 put 和 get 中对阻塞条件的描述用的是while循环,这是为了生产者之间或者消费者之间他们的内部竞争所造成的数组越界异常。

package concurrency;

public class QueueBuffer {
    private final int SIZE;
    private int count = 0;
    private int[] buffer;
    public QueueBuffer(int size){
        this.SIZE = size;
        buffer = new int[SIZE];
    }

    public int getSIZE(){
        return SIZE;
    }

    public synchronized void put(int value){
        while (count == SIZE){ //buffer已经满了 等待get   ,用while使用于多个生产者的情况
            try {
                wait();
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }

        notifyAll(); //说明buffer中有元素 可以取
        buffer[count++] = value;
        System.out.println("Put "+value+" current size = "+count);
    }

    public synchronized int get(){
        while(count == 0){//用while使用于多个消费者的情况。
            try {
                wait();//buffer为空,需要等到put进元素
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
//        notify() 只是去通知其他的线程,但是synchronized 方法里面的代码还是会执行完毕的。
//        synchronized方法本来就加了锁。代码的执行跟你的notify()也无关,代码的执行是跟你的
//        synchronized绑定一起而已。

        notifyAll(); //说明刚刚从buffer中取出了元素 有空位可以加进新的元素
        int result = buffer[--count];
        System.out.println("Get "+result+" current size = "+count);
        return result;
    }
}

class Test{
    public static void main(String[] args){
        QueueBuffer q = new QueueBuffer(10);
        new Producer(q);
        new Producer(q);
        new Producer(q);
        new Consumer(q);
        new Consumer(q);
        new Consumer(q);
        System.out.println("Press Control-C to stop.");
    }
}

Producer

package concurrency;

import java.util.Random;

public class Producer implements Runnable {

    Random rand = new Random(47);

    private QueueBuffer q;

    Producer(QueueBuffer q) {
        this.q = q;
        new Thread(this, "Producer").start();
    }

    public void run() {
        while (true) {
            q.put(rand.nextInt(q.getSIZE()));
            Thread.yield();
        }
    }
}

Consumer

package concurrency;

public class Consumer implements Runnable {
    private QueueBuffer q;

    Consumer(QueueBuffer q) {
        this.q = q;
        new Thread(this, "Consumer").start();
    }

    public void run() {
        while (true){
            q.get();
            Thread.yield();
        }
    }
}

注意事项

调用obj的wait(), notify()方法前,必须获得obj对象的锁,也就是必须写在synchronized(obj) {…} 代码段内。

调用obj.wait()后,线程A就释放了obj的锁,否则线程B无法获得obj锁,也就无法在synchronized(obj) {…} 代码段内唤醒A。

当obj.wait()方法返回后,线程A需要再次获得obj锁,才能继续执行。

如果A1,A2,A3都在obj.wait(),则B调用obj.notify()只能唤醒A1,A2,A3中的一个(具体哪一个由JVM决定)。

obj.notifyAll()则能全部唤醒A1,A2,A3,但是要继续执行obj.wait()的下一条语句,必须获得obj锁,因此,A1,A2,A3只有一个有机会获得锁继续执行,例如A1,其余的需要等待A1释放obj锁之后才能继续执行。

当B调用obj.notify/notifyAll的时候,B正持有obj锁,因此,A1,A2,A3虽被唤醒,但是仍无法获得obj锁。直到B退出synchronized块,释放obj锁后,A1,A2,A3中的一个才有机会获得锁继续执行。这一点很重要,并不是调用 notify 或者 notifyAll 之后马上释放锁,而是执行完相应的synchronized代码段。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/70641.html

相关文章

  • java高并发从零到放弃(五)

    摘要:前言这篇主要来讲解多线程中一个非常经典的设计模式包括它的基础到拓展希望大家能够有所收获生产者消费者模式简述此设计模式中主要分两类线程生产者线程和消费者线程生产者提供数据和任务消费者处理数据和任务该模式的核心就是数据和任务的交互点共享内存缓 前言 这篇主要来讲解多线程中一个非常经典的设计模式包括它的基础到拓展希望大家能够有所收获 生产者-消费者模式简述 此设计模式中主要分两类线程:生产者...

    meislzhua 评论0 收藏0
  • 分布式服务框架远程通讯技术及原理分析

    摘要:微软的虽然引入了事件机制,可以在队列收到消息时触发事件,通知订阅者。由微软作为主要贡献者的,则对以及做了进一层包装,并能够很好地实现这一模式。 在分布式服务框架中,一个最基础的问题就是远程服务是怎么通讯的,在Java领域中有很多可实现远程通讯的技术,例如:RMI、MINA、ESB、Burlap、Hessian、SOAP、EJB和JMS等,这些名词之间到底是些什么关系呢,它们背后到底是基...

    sorra 评论0 收藏0
  • 分布式服务框架远程通讯技术及原理分析

    摘要:微软的虽然引入了事件机制,可以在队列收到消息时触发事件,通知订阅者。由微软作为主要贡献者的,则对以及做了进一层包装,并能够很好地实现这一模式。 在分布式服务框架中,一个最基础的问题就是远程服务是怎么通讯的,在Java领域中有很多可实现远程通讯的技术,例如:RMI、MINA、ESB、Burlap、Hessian、SOAP、EJB和JMS等,这些名词之间到底是些什么关系呢,它们背后到底是基...

    0xE7A38A 评论0 收藏0
  • javaBlockingQueue实现产者费者

    摘要:直接上代码注意在使用实现生产者消费者模型时候,泛型使用若等对象时候会发现消费者出现异常,这是由于传值和传引用的区别,而由于的自动装箱不会出现此类问题,具体可自行尝试生产者消费者获取失败主线程 直接上代码注意在使用blockingqueue实现生产者消费者模型时候,BlockingQueue泛型使用若atomic等对象时候会发现消费者出现异常,这是由于传值和传引用的区别,而Integer...

    李义 评论0 收藏0
  • Java并发编程线程间通讯(下)-产者费者

    摘要:前文回顾上一篇文章重点唠叨了中协调线程间通信的机制,它有力的保证了线程间通信的安全性以及便利性。所以同一时刻厨师线程和服务员线程不会同时在等待队列中。对于在操作系统中线程的阻塞状态,语言中用和这三个状态分别表示。 前文回顾 上一篇文章重点唠叨了java中协调线程间通信的wait/notify机制,它有力的保证了线程间通信的安全性以及便利性。本篇将介绍wait/notify机制的一个应用...

    lufficc 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<