摘要:用于限定中线程数的最大值。该线程池中的任务队列维护着等待执行的对象。线程池和消息队列笔者在实际工程应用中,使用过多线程和消息队列处理过异步任务。以上是笔者在学习实践之后对于多线程和消息队列的粗浅认识,初学者切莫混淆两者的作用。
多线程编程很难,难点在于多线程代码的执行不是按照我们直觉上的执行顺序。所以多线程编程必须要建立起一个宏观的认识。
线程池是多线程编程中的一个重要概念。为了能够更好地使用多线程,学习好线程池当然是必须的。
为什么要使用线程池?平时我们在使用多线程的时候,通常都是架构师配置好了线程池的 Bean,我们需要使用的时候,提交一个线程即可,不需要过多关注其内部原理。
在学习一门新的技术之前,我们还是先了解下为什么要使用它,使用它能够解决什么问题:
创建/销毁线程伴随着系统开销,过于频繁的创建/销毁线程,会很大程度上影响处理效率
例如:
记创建线程消耗时间T1,执行任务消耗时间T2,销毁线程消耗时间T3
如果T1+T3>T2,那么是不是说开启一个线程来执行这个任务太不划算了!
正好,线程池缓存线程,可用已有的闲置线程来执行新任务,避免了T1+T3带来的系统开销
线程并发数量过多,抢占系统资源从而导致阻塞
我们知道线程能共享系统资源,如果同时执行的线程过多,就有可能导致系统资源不足而产生阻塞的情况
运用线程池能有效的控制线程最大并发数,避免以上的问题
对线程进行一些简单的管理
比如:延时执行、定时循环执行的策略等
运用线程池都能进行很好的实现
创建一个线程池在 Java 中,新建一个线程池对象非常简单,Java 本身提供了工具类java.util.concurrent.Executors,可以使用如下代码创建一个固定数量线程的线程池:
ExecutorService service = Executors.newFixedThreadPool(10);
注意:以上代码用来测试还可以,实际使用中最好能够显示地指定相关参数。
我们可以看下其内部源码实现:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }
在阿里巴巴代码规范中,建议我们自己指定线程池的相关参数,为的是让开发人员能够自行理解线程池创建中的每个参数,根据实际情况,创建出合理的线程池。接下来,我们来剖析下java.util.concurrent.ThreadPoolExecutor的构造方法参数。
ThreadPoolExecutor 浅析java.util.concurrent.ThreadPoolExecutor有多个构造方法,我们拿参数最多的构造方法来举例,以下是阿里巴巴代码规范中给出的创建线程池的范例:
ThreadPoolExecutor service = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy());
贴一张IDEA中的图更方便看:
首先最重要的几个参数,可能就是:corePoolSize,maximumPoolSize,workQueue了,先看下这几个参数的解释:
corePoolSize
用于设定 thread pool 需要时刻保持的最小 core threads 的数量,即便这些 core threads 处于空闲状态啥事都不做也不会将它们回收掉,当然前提是你没有设置 allowCoreThreadTimeOut 为 true。至于 pool 是如何做到保持这些个 threads 不死的,我们稍后再说。
maximumPoolSize
用于限定 pool 中线程数的最大值。如果你自己构造了 pool 且传入了一个 Unbounded 的 queue 且没有设置它的 capacity,那么不好意思,最大线程数会永远 <= corePoolSize,maximumPoolSize 变成了无效的。
workQueue
该线程池中的任务队列:维护着等待执行的 Runnable 对象。当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务
由于本文是初步了解线程池,所以先理解这几个参数,上文对于这三个参数的解释,基本上跟JDK源码中的注释一致(java.util.concurrent.ThreadPoolExecutor#execute里的代码)。
我们编写个程序来方便理解:
// 创建线程池 ThreadPoolExecutor service = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy()); // 等待执行的runnable Runnable runnable = () -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } }; // 启动的任务数量 int counts = 1224; for (int i = 0; i < counts; i++) { service.execute(runnable); } // 监控线程池执行情况的代码 ThreadPoolExecutor tpe = ((ThreadPoolExecutor) service); while (true) { System.out.println(); int queueSize = tpe.getQueue().size(); System.out.println("当前排队线程数:" + queueSize); int activeCount = tpe.getActiveCount(); System.out.println("当前活动线程数:" + activeCount); long completedTaskCount = tpe.getCompletedTaskCount(); System.out.println("执行完成线程数:" + completedTaskCount); long taskCount = tpe.getTaskCount(); System.out.println("总线程数:" + taskCount); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } }
线程池的容量与我们启动的任务数量息息相关。
已知:
corePoolSize = 5
maximumPoolSize = 200
workQueue.size() = 1024
我们修改同时 execute 添加到线程池的 Runnable 数量 counts:
counts <= corePoolSize:所有的任务均为核心线程执行,没有任何 Runnable 被添加到 workQueue中
当前排队线程数:0 当前活动线程数:3 执行完成线程数:0 总线程数:3
corePoolSize < counts <= corePoolSize + workQueue.size():所有任务均为核心线程执行,当核心线程处于繁忙状态,则将任务添加到 workQueue 中等待
当前排队线程数:15 当前活动线程数:5 执行完成线程数:0 总线程数:20
corePoolSize + workQueue.size() < counts <= maximumPoolSize + workQueue.size():corePoolSize 个线程由核心线程执行,超出队列长度 workQueue.size() 的任务,将另启动非核心线程执行
当前排队线程数:1024 当前活动线程数:105 执行完成线程数:0 总线程数:1129
counts > maximumPoolSize + workQueue.size():将会报异常java.util.concurrent.RejectedExecutionException
java.util.concurrent.RejectedExecutionException: Task com.bwjava.util.ExecutorServiceUtilTest$$Lambda$1/314265080@725bef66 rejected from java.util.concurrent.ThreadPoolExecutor@2aaf7cc2[Running, pool size = 200, active threads = 200, queued tasks = 1024, completed tasks = 0]线程池踩坑:线程嵌套导致阻塞
这次的踩坑才是我写这篇文章的初衷,借此机会好好了解下线程池的各个概念。本身这段时间在研究爬虫,为了尽量提高爬虫的效率,用到了多线程处理。由于代码写得比较随性,所以遇到了一个阻塞的问题,研究了一下才搞明白,模拟的代码如下:
ThreadPoolExecutor service = new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1024), new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build(), new ThreadPoolExecutor.AbortPolicy()); @Test public void testBlock() { Runnable runnableOuter = () -> { try { Runnable runnableInner1 = () -> { try { TimeUnit.SECONDS.sleep(3); // 模拟比较耗时的爬虫操作 } catch (InterruptedException e) { e.printStackTrace(); } }; Future> submit = service.submit(runnableInner1); submit.get(); // 实际业务中,runnableInner2需要用到此处返回的参数,所以必须get Runnable runnableInner2 = () -> { try { TimeUnit.SECONDS.sleep(5); // 模拟比较耗时的爬虫操作 } catch (InterruptedException e) { e.printStackTrace(); } }; Future> submit2 = service.submit(runnableInner2); submit2.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } }; for (int i = 0; i < 20; i++) { service.execute(runnableOuter); } ThreadPoolExecutor tpe = ((ThreadPoolExecutor) service); while (true) { System.out.println(); int queueSize = tpe.getQueue().size(); System.out.println("当前排队线程数:" + queueSize); int activeCount = tpe.getActiveCount(); System.out.println("当前活动线程数:" + activeCount); long completedTaskCount = tpe.getCompletedTaskCount(); System.out.println("执行完成线程数:" + completedTaskCount); long taskCount = tpe.getTaskCount(); System.out.println("总线程数:" + taskCount); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } }
线程池是前文的线程池,参数完全不变。线程的监控代码也一致。当我们运行这个单元测试的时候,会发现打印出来的结果一直是如下:
当前排队线程数:15 当前活动线程数:5 执行完成线程数:0 总线程数:20 当前排队线程数:20 当前活动线程数:5 执行完成线程数:0 总线程数:25 当前排队线程数:20 当前活动线程数:5 执行完成线程数:0 总线程数:25 ……略
根本问题是 Runnable 内部还嵌套了 Runnable ,且他们都提交到了一个线程池。下面分步骤说明问题:
runnableOuter 被提交到了线程池
runnableOuter 开始执行,runnableInner1 被提交到线程池,对 runnableInner1 的结果进行 get,导致runnableOuter 被阻塞
于此同时,更多的 runnableOuter 被提交到线程池,核心线程被 runnableOuter 和 runnableInner1 占满,多余的线程 runnableInner2 被加入 workQueue 中等待执行
runnableInner2 被提交到线程池,但是因为核心线程已满,被提交到了 workQueue ,也处于阻塞状态,此时对 runnableInner2 的结果进行 get,导致 runnableOuter 被阻塞
runnableOuter 被阻塞,无法释放核心线程资源,而 runnableInner2 又因为无法得到核心线程资源,只能呆在 workQueue 里,导致整个程序卡死,无法返回。(有点类似死锁,互相占有了资源,对方不释放,我也不释放)
用图表示大概为:
既然明白了出错的原因,那么解决起来就简单了。这个案例告诉我们,设计一个多线程程序,一定要自顶向下有一个良好的设计,然后再开始编码,不能够盲目地使用多线程、线程池,这样只会导致程序出现莫名其妙的错误。
动态修改 corePoolSize & maximumPoolSize其实这个我没怎么关注过,曾经在一次面试中被问到过。很简单,java.util.concurrent.ThreadPoolExecutor提供了Setter方法,可以直接设置相关参数。按我目前的实践经验,几乎没有用到过,但是知道这个聊胜于无吧。特定的复杂场景下应该很有用。
线程池和消息队列笔者在实际工程应用中,使用过多线程和消息队列处理过异步任务。很多新手工程师往往弄不清楚这两者的区别。按笔者的浅见:
多线程是用来充分利用多核 CPU 以提高程序性能的一种开发技术,线程池可以维持一个队列保存等待处理的多线程任务,但是由于此队列是内存控制的,所以断电或系统故障后未执行的任务会丢失。
消息队列是为消息处理而生的一门技术。其根据消费者的自身消费能力进行消费的特性使其广泛用于削峰的高并发任务处理。此外利用其去耦合的特性也可以实现代码上的解耦。消息队列大多可以对其消息进行持久化,即使断电也能够恢复未被消费的任务并继续处理。
以上是笔者在学习实践之后对于多线程和消息队列的粗浅认识,初学者切莫混淆两者的作用。
参考文献:
Deep thinking in Java thread pool
线程池,这一篇或许就够了
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/73575.html
摘要:参数说明,线程池保留的最小线程数。,线程池中允许拥有的最大线程数。,线程池的运行状态。除非线程池状态发生了变化,发退回到外层循环重新执行,判断线程池的状态。是线程池的核心控制状态,包含的线程池运行状态和有效线程数。 Java是一门多线程的语言,基本上生产环境的Java项目都离不开多线程。而线程则是其中最重要的系统资源之一,如果这个资源利用得不好,很容易导致程序低效率,甚至是出问题。 有...
摘要:前言最近发现很多小伙伴对于线程池的原理不是特别的理解,所以想通过这篇文章来让大家更好的认识线程池的原理,了解到其是如何工作的讲解下面我会将线程池比作一个公司的一个部门,介绍线程池如何工作的,同时介绍其中的一些关键组件和参数。 前言 最近发现很多小伙伴对于Java线程池ThreadPoolExecutor的原理不是特别的理解,所以想通过这篇文章来让大家更好的认识线程池的原理,了解到其是如...
摘要:深入理解线程池线程池初探所谓线程池,就是将多个线程放在一个池子里面所谓池化技术,然后需要线程的时候不是创建一个线程,而是从线程池里面获取一个可用的线程,然后执行我们的任务。最后的的意思是需要确保线程池已经被启动起来了。 深入理解Java线程池 线程池初探 所谓线程池,就是将多个线程放在一个池子里面(所谓池化技术),然后需要线程的时候不是创建一个线程,而是从线程池里面获取一个可用的线程...
摘要:本文是作者自己对中线程的状态线程间协作相关使用的理解与总结,不对之处,望指出,共勉。当中的的数目而不是已占用的位置数大于集合番一文通版集合番一文通版垃圾回收机制讲得很透彻,深入浅出。 一小时搞明白自定义注解 Annotation(注解)就是 Java 提供了一种元程序中的元素关联任何信息和着任何元数据(metadata)的途径和方法。Annotion(注解) 是一个接口,程序可以通过...
阅读 3516·2021-09-27 13:35
阅读 3554·2019-08-29 17:09
阅读 2423·2019-08-26 11:30
阅读 696·2019-08-26 10:32
阅读 530·2019-08-26 10:23
阅读 1192·2019-08-26 10:20
阅读 3147·2019-08-23 15:26
阅读 3546·2019-08-23 14:33