资讯专栏INFORMATION COLUMN

实现生产者消费者模式的四种方式(Synchronized、Lock、Semaphore、Blocki

godiscoder / 3044人阅读

摘要:所谓生产者消费者模式,即个线程进行生产,同时个线程进行消费,两种角色通过内存缓冲区进行通信图片来源下面我们通过四种方式,来实现生产者消费者模式。通过生产者调用,减少数目可以消费的数量。

所谓生产者消费者模式,即N个线程进行生产,同时N个线程进行消费,两种角色通过内存缓冲区进行通信

图片来源https://www.cnblogs.com/chent...

下面我们通过四种方式,来实现生产者消费者模式。

首先是最原始的synchronized方式

定义库存类(即图中缓存区)

class Stock {
    private String name;
    // 标记库存是否有内容
    private boolean hasComputer = false;

    public synchronized void putOne(String name) {
        // 若库存中已有内容,则生产线程阻塞等待
        while (hasComputer) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.name = name;
        System.out.println("生产者...生产了 " + name);
        // 更新标记
        this.hasComputer = true;
        // 这里用notify的话,假设p0执行完毕,此时c0,c1都在wait, 同时唤醒另一个provider:p1,
        // p1判断标记后休眠,造成所有线程都wait的局面,即死锁;
        // 因此使用notifyAll解决死锁问题
        this.notifyAll();
    }

    public synchronized void takeOne() {
        // 若库存中没有内容,则消费线程阻塞等待生产完毕后继续
        while (!hasComputer) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("消费者...消费了 " + name);
        this.hasComputer = false;
        this.notifyAll();
    }
}

定义生产者和消费者(为了节省空间和方便阅读,这里将生产者和消费者定义成了匿名内部类)

public static void main(String[] args) {
    // 用于通信的库存类
    Stock computer = new Stock();
    // 定义两个生产者和两个消费者
    Thread p1 = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                computer.putOne("Dell");
            }
        }
    });
    Thread p2 = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                computer.putOne("Mac");
            }
        }
    });
    
    Thread c1 = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                computer.takeOne();
            }
        }
    });
    Thread c2 = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                computer.takeOne();
            }
        }
    });
    p1.start();
    p2.start();
    c1.start();
    c2.start();
}

运行结果图

第二种方式:Lock

Jdk1.5之后加入了Lock接口,一个lock对象可以有多个Condition类,Condition类负责对lock对象进行wait,notify,notifyall操作

定义库存类

class LockStock {
    final Lock lock = new ReentrantLock();
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    // 加入库存概念,可批量生产和消费
    // 定义最大库存为10
    final String[] stock = new String[10];
    // 写入标记、读取标记、已有商品数量
    int putptr, takeptr, count;

    public void put(String computer) {
        // lock代替synchronized
        lock.lock();
        try {
            // 若库存已满则生产者线程阻塞
            while (count == stock.length)
                notFull.await();
            // 库存中加入商品
            stock[putptr] = computer;
            // 库存已满,指针置零,方便下次重新写入
            if (++putptr == stock.length) putptr = 0;
            ++count;
            System.out.println(computer + " 正在生产数据: -- 库存剩余:" + count);
            notEmpty.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public String take(String consumerName) {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();
            // 从库存中获取商品
            String computer = stock[takeptr];
            if (++takeptr == stock.length) takeptr = 0;
            --count;
            System.out.println(consumerName + " 正在消费数据:" + computer + " -- 库存剩余:" + count);
            notFull.signal();
            return computer;
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

        // 无逻辑作用,放慢速度
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "";
    }
}

以上部分代码摘自java7 API中Condition接口的官方示例

接着还是定义生产者和消费者

public static void main(String[] args) {
    LockStock computer = new LockStock();
    Thread p1 = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                computer.put("Dell");
            }
        }
    });
    Thread p2 = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                computer.put("Mac");
            }
        }
    });

    Thread c1 = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                computer.take("zhangsan");
            }
        }
    });
    Thread c2 = new Thread(new Runnable() {
        @Override
        public void run() {
            while (true) {
                computer.take("李四");
            }
        }
    });
    // 两个生产者两个消费者同时运行
    p1.start();
    p2.start();
    c1.start();
    c2.start();
}

运行结果图:

第三种方式:Semaphore
首先依旧是库存类:

class Stock {
    List stock = new LinkedList();
    // 互斥量,控制共享数据的互斥访问
    private Semaphore mutex = new Semaphore(1);

    // canProduceCount可以生产的总数量。 通过生产者调用acquire,减少permit数目
    private Semaphore canProduceCount = new Semaphore(10);

    // canConsumerCount可以消费的数量。通过生产者调用release,增加permit数目
    private Semaphore canConsumerCount = new Semaphore(0);

    public void put(String computer) {
        try {
            // 可生产数量 -1
            canProduceCount.acquire();
            mutex.acquire();
            // 生产一台电脑
            stock.add(computer);
            System.out.println(computer + " 正在生产数据" + " -- 库存剩余:" + stock.size());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 释放互斥锁
            mutex.release();
            // 释放canConsumerCount,增加可以消费的数量
            canConsumerCount.release();
        }
        // 无逻辑作用,放慢速度
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void get(String consumerName) {
        try {
            // 可消费数量 -1
            canConsumerCount.acquire();
            mutex.acquire();
            // 从库存消费一台电脑
            String removedVal = stock.remove(0);
            System.out.println(consumerName + " 正在消费数据:" + removedVal + " -- 库存剩余:" + stock.size());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            mutex.release();
            // 消费后释放canProduceCount,增加可以生产的数量
            canProduceCount.release();
        }
    }
}

还是生产消费者:

public class SemaphoreTest {
    public static void main(String[] args) {
        // 用于多线程操作的库存变量
        final Stock stock = new Stock();
        // 定义两个生产者和两个消费者
        Thread dellProducer = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    stock.put("Del");
                }
            }
        });
        Thread macProducer = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    stock.put("Mac");
                }
            }
        });
        Thread consumer1 = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    stock.get("zhangsan");
                }
            }
        });
        Thread consumer2 = new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    stock.get("李四");
                }
            }
        });
        dellProducer.start();
        macProducer.start();
        consumer1.start();
        consumer2.start();
    }
}

运行结果图:

第四种方式:BlockingQueue
BlockingQueue的put和take底层实现其实也是使用了第二种方式中的ReentrantLock+Condition,并且帮我们实现了库存队列,方便简洁
1、定义生产者

class Producer implements Runnable {
    // 库存队列
    private BlockingQueue stock;
    // 生产/消费延迟
    private int timeOut;
    private String name;

    public Producer(BlockingQueue stock, int timeout, String name) {
        this.stock = stock;
        this.timeOut = timeout;
        this.name = name;
    }

    @Override
    public void run() {
        while (true) {
            try {
                stock.put(name);
                System.out.println(name + " 正在生产数据" + " -- 库存剩余:" + stock.size());
                TimeUnit.MILLISECONDS.sleep(timeOut);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

2、定义消费者

class Consumer implements Runnable {
    // 库存队列
    private BlockingQueue stock;
    private String consumerName;

    public Consumer(BlockingQueue stock, String name) {
        this.stock = stock;
        this.consumerName = name;
    }

    @Override
    public void run() {
        while (true) {
            try {
                // 从库存消费一台电脑
                String takeName = stock.take();
                System.out.println(consumerName + " 正在消费数据:" + takeName + " -- 库存剩余:" + stock.size());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

3、定义库存并运行

public static void main(String[] args) {
        // 定义最大库存为10
        BlockingQueue stock = new ArrayBlockingQueue<>(10);
        Thread p1 = new Thread(new Producer(stock, 500, "Mac"));
        Thread p2 = new Thread(new Producer(stock, 500, "Dell"));
        Thread c1 = new Thread(new Consumer(stock,"zhangsan"));
        Thread c2 = new Thread(new Consumer(stock, "李四"));

        p1.start();
        p2.start();
        c1.start();
        c2.start();

    }

运行结果图:

感谢阅读~欢迎指正和补充~~~

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/76979.html

相关文章

  • Java 中的 Monitor 机制

    摘要:基本元素机制需要几个元素来配合,分别是临界区对象及锁条件变量以及定义在对象上的,操作。这个外部条件在机制中称为条件变量。提供的机制,其实是,等元素合作形成的,甚至说外部的条件变量也是个组成部分。 monitor的概念 管程,英文是 Monitor,也常被翻译为监视器,monitor 不管是翻译为管程还是监视器,都是比较晦涩的,通过翻译后的中文,并无法对 monitor 达到一个直观的描...

    Jacendfeng 评论0 收藏0
  • 想进大厂?50个多线程面试题,你会多少?(一)

    摘要:下面是线程相关的热门面试题,你可以用它来好好准备面试。线程安全问题都是由全局变量及静态变量引起的。持有自旋锁的线程在之前应该释放自旋锁以便其它线程可以获得自旋锁。 最近看到网上流传着,各种面试经验及面试题,往往都是一大堆技术题目贴上去,而没有答案。 不管你是新程序员还是老手,你一定在面试中遇到过有关线程的问题。Java语言一个重要的特点就是内置了对并发的支持,让Java大受企业和程序员...

    wow_worktile 评论0 收藏0
  • 40道阿里巴巴JAVA研发岗多线程面试题详解,你能答出多少

    摘要:但是单核我们还是要应用多线程,就是为了防止阻塞。多线程可以防止这个问题,多条线程同时运行,哪怕一条线程的代码执行读取数据阻塞,也不会影响其它任务的执行。 1、多线程有什么用?一个可能在很多人看来很扯淡的一个问题:我会用多线程就好了,还管它有什么用?在我看来,这个回答更扯淡。所谓知其然知其所以然,会用只是知其然,为什么用才是知其所以然,只有达到知其然知其所以然的程度才可以说是把一个知识点...

    lpjustdoit 评论0 收藏0
  • Java 并发学习笔记

    摘要:方法可以将当前线程放入等待集合中,并释放当前线程持有的锁。此后,该线程不会接收到的调度,并进入休眠状态。该线程会唤醒,并尝试恢复之前的状态。 并发 最近重新复习了一边并发的知识,发现自己之前对于并发的了解只是皮毛。这里总结以下Java并发需要掌握的点。 使用并发的一个重要原因是提高执行效率。由于I/O等情况阻塞,单个任务并不能充分利用CPU时间。所以在单处理器的机器上也应该使用并发。为...

    DrizzleX 评论0 收藏0

发表评论

0条评论

godiscoder

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<