摘要:前言这篇主要来讲解多线程中一个非常经典的设计模式包括它的基础到拓展希望大家能够有所收获生产者消费者模式简述此设计模式中主要分两类线程生产者线程和消费者线程生产者提供数据和任务消费者处理数据和任务该模式的核心就是数据和任务的交互点共享内存缓
前言
这篇主要来讲解多线程中一个非常经典的设计模式
包括它的基础到拓展
希望大家能够有所收获
此设计模式中主要分两类线程:生产者线程和消费者线程
生产者提供数据和任务
消费者处理数据和任务
该模式的核心就是数据和任务的交互点:共享内存缓存区
下面给出简单易懂的一张图:
使用BlockingQueue来做缓冲区是非常合适的
通过BlockingQueue来理解生产者消费者模式
首先我们要知道BlockingQueue是什么?
它是一个实现接口,有很多实现类,比如:
ArrayBlockingQueue:前面讲过,这个队列适合做有界队列,固定线程数
LinkedBlockingQueue:它适合做无界队列
......
以ArrayBlockingQueue为例
它在内部放置了一个对象数组:
final Object[] items;
通过items数组来进行元素的存取
1(存).向队列中压入一个元素:
.offer():如果队列满了,返回false
.put():将元素压入队列末尾,如果队列满了,它就会一直等待
2(取).向队列中弹出元素(从头部弹出):
.poll():如果队列为空,返回null
.take():如果队列为空,继续等待,知道队列中有元素
了解了上面这些基础后,我们来看下实际操作是怎样的
在开始之前我们要有一个Entity类,只存一个long类型的value值进去:
public class MyData { private long value; public long getValue() { return value; } public void setValue(long value) { this.value = value; } }
有了这个数据模型,看下最后的执行main方法:
public static void main(String[] args) throws InterruptedException { ExecutorService executor = Executors.newCachedThreadPool(); //建立线程池 BlockingQueueblockingQueue = new ArrayBlockingQueue (10); //建立缓存队列 for (int i=0;i<3;i++){ Producer i = new Producer(queue); executor.execute(i); } //制造三个生产线程 for (int j=0;j<3;j++){ Consumer j = new Consumer(queue); executor.execute(j); } //制造三个消费线程 Thread.sleep(10000); for (int i=0;i<3;i++){ i.stop(); } //停止生产 Thread.sleep(5000); executor.shutdown(); }
这里只给出Main,大家可以通过代码简单理解使用BlockingQueue做缓冲区的过程
没有给出生产者和消费者的具体线程实现类,除了博主比较懒之外,还有是因为使用BlockingQueue做缓冲区并不推荐使用
虽然BlockingQueue是个不错的选择,但它使用了锁和阻塞来保证线程间的同步,并不具备良好的并发性能
下面讲解一种具有高性能的共享缓冲区
我们知道BlockingQueue队列的性能不是特别优越
而之前讲到过ConcurrentLinkedQueue是一个高性能队列,因为它使用了大量的CAS操作
同理,如果我们利用CAS操作实现生产者-消费者模式,性能就可以得到客观的提升
但是大量的CAS操作自己实现起来非常困难
所以推荐使用Disruptor框架
实际工作还是得使用成熟的框架,Disruptor是一款高效的无锁内存队列
它不像传统队列有head和tail指针来操控入列和出列
而是实现了一个固定大小的环形队列(RingBuffer),来看下实际模型图:
生产者向缓冲区写入数据,消费者从缓冲区读取数据,大家都使用了CAS操作
而且由于是环形队列的原因,可以做到完全的内存复用
从而大大减少系统分配空间以及回收空间的额外开销
那么这个框架怎么使用呢?
1.导入包(博主使用了Maven依赖,不同版本大同小异):
com.lmax disruptor 3.3.2
2.依旧创建一个entity类:
public class MyData { private long value; public long getValue() { return value; } public void setValue(long value) { this.value = value; } }
3.还要写一个Factory类,细心的同学会看到环形队列是固定大小的
这个Factory会在Disruptor实例对象构造时,构造所有缓冲区中的对象实例
public class DataFactory implements EventFactory{ @Override public Object newInstance() { return new MyData(); } }
4.生产者(具体每行代码的作用都已经注释):
public class Producers { private final RingBufferringBuffer; //创建环形队列(环形缓冲区) public Producers(RingBuffer ringBuffer) { this.ringBuffer = ringBuffer; //将ringBuffer与Producers绑定 } public void putData(ByteBuffer byteBuffer){ //此方法将产生的数据推入缓冲区 long sequeue = ringBuffer.next(); //通过.next()方法得到ringBuffer的下一个节点,并且赋值给sequeue MyData event = ringBuffer.get(sequeue); //将mydata数据存入到下一个节点 event.setValue(byteBuffer.getLong(0)); //mydata的值有ByteBuffer参数带入 ringBuffer.publish(sequeue); //将sequeue节点内的数据发布 } }
5.消费者:
public class Consumers implements WorkHandler{ @Override public void onEvent(MyData myData) throws Exception { System.out.println("当前线程为:"+Thread.currentThread().getId()+"线程,它处理的数据是:"+myData.getValue()); } }
6.执行函数:
public class RunTest { public static void main(String[] args) throws InterruptedException { Executor executor = Executors.newCachedThreadPool(); //创建线程池 DataFactory dataFactory = new DataFactory(); //创建Factory实例 int bufferSize = 1024; //设置缓存区大小为1024(必须是2的整数次幂) Disruptordisruptor = new Disruptor ( dataFactory, bufferSize, executor, ProducerType.MULTI, new BlockingWaitStrategy() ); disruptor.handleEventsWithWorkerPool( new Consumers(), new Consumers(), new Consumers(), new Consumers() ); disruptor.start(); //Disruptor启动 RingBuffer ringBuffer = disruptor.getRingBuffer(); //实例化环形队列并与Disruptor绑定 Producers producers = new Producers(ringBuffer); //实例化生产者并绑定ringBuffer ByteBuffer byteBuffe = ByteBuffer.allocate(8); //创建一个容量为256字节的ByteBuffer for (long n = 0;true;n++){ byteBuffe.putLong(0,n); producers.putData(byteBuffe); Thread.sleep(100); System.out.println("add data "+n); } } }
我们来看下执行结果:
当前线程为:13线程,它处理的数据是:1059 add data 1059 当前线程为:11线程,它处理的数据是:1060 add data 1060 当前线程为:10线程,它处理的数据是:1061 add data 1061 当前线程为:12线程,它处理的数据是:1062 add data 1062 当前线程为:13线程,它处理的数据是:1063 add data 1063 当前线程为:11线程,它处理的数据是:1064 add data 1064 当前线程为:10线程,它处理的数据是:1065
可以看出,因为我无限的让生产线程生产数据,而RingBuffer中那十几条消费线程不停的消费数据
此外Disruptor不止CAS操作,还提供了四种等待策略让消费者监控缓冲区的信息:
1.BlockingWaitStrategy:默认策略,最节省CPU,但在高并发下性能表现最糟糕
2.SleepingWaitStrategy:等待数据时自旋等待,不成功会使用LockSupport方法阻塞自己,通常用于异步日志
3.YieldWaitStrategy:用于低延时场合,在内部执行Thread.yield()死循环
4.BusySpinWaitStrategy:消费线程进行死循环监控缓冲区,吃掉所有CPU资源
除了CAS操作,消费者等待策略,Disruptor还使用CPU Cache的优化来进行优化
根据Disruptor官方报道:Disruptor的性能比BlockingQueuez至少高一倍以上!
以上便是生产者消费者模式的应用
谢谢阅读,记得点关注看更新
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/70719.html
摘要:今天就先到这里,大家可以看看这些内容的拓展记得点关注看更新,谢谢阅读 前言 这是一个长篇博客,希望大家关注我并且一起学习java高并发废话不多说,直接开始 并行和并发 并行:多个线程同时处理多个任务并发:多个线程处理同个任务,不一定要同时 下面用图来描述并行和并发的区别:(实现和虚线表示两个不同的线程) showImg(https://segmentfault.com/img/bVYT...
摘要:可以用代替可以用代替定义的对象的值是不可变的今天就先到这里,大家可以看看这些内容的拓展记得点关注看更新,谢谢阅读 前言 java高并发第二篇讲的是java线程的基础依旧不多说废话 线程和进程 进程是操作系统运行的基础,是一个程序运行的实体,windows上打开任务管理器就能看到进程线程是轻量级的进程,是程序执行的最小单位,是在进程这个容器下进行的 线程基本操作 新建一个线程类有两种方式...
摘要:前言今天讲的多线程的同步控制直接进入正题重入锁重入锁可以完全代替,它需要类来实现下面用一个简单的例子来实现重入锁以上代码打印出来的是,可以说明也实现了线程同步它相比更加灵活,因为重入锁实现了用户自己加锁,自己释放锁记得一定要释放,不然其他线 前言 今天讲的多线程的同步控制直接进入正题 ReentrantLock重入锁 重入锁可以完全代替synchronized,它需要java.util...
摘要:前言本篇主要讲解如何去优化锁机制或者克服多线程因为锁可导致性能下降的问题线程变量有这样一个场景,前面是一大桶水,个人去喝水,为了保证线程安全,我们要在杯子上加锁导致大家轮着排队喝水,因为加了锁的杯子是同步的,只能有一个人拿着这个唯一的杯子喝 前言 本篇主要讲解如何去优化锁机制或者克服多线程因为锁可导致性能下降的问题 ThreadLocal线程变量 有这样一个场景,前面是一大桶水,10个...
阅读 1675·2021-11-12 10:35
阅读 1618·2021-08-03 14:02
阅读 2689·2019-08-30 15:55
阅读 2031·2019-08-30 15:54
阅读 765·2019-08-30 14:01
阅读 2431·2019-08-29 17:07
阅读 2257·2019-08-26 18:37
阅读 3035·2019-08-26 16:51