摘要:中的线程池运用场景非常广泛,几乎所有的一步或者并发执行程序都可以使用。代码中如果执行了方法,线程池会提前创建并启动所有核心线程。线程池最大数量线程池允许创建的线程最大数量。被称为是可重用固定线程数的线程池。
Java中的线程池运用场景非常广泛,几乎所有的一步或者并发执行程序都可以使用。那么线程池有什么好处呢,以及他的实现原理是怎么样的呢?
在开发过程中,合理的使用线程池能够带来以下的一些优势,这些优势同时也是一些其他池化的优势,例如数据库连接池,http连接池等。
降低资源消耗,通过重复利用已经创建的线程降低线程创建和销毁造成的消耗。
提高响应速度,当任务到达时,任务可以不需要等到线程创建就能立即执行。
提高线程的可管理性 线程的比较稀缺的资源,如果无限的创建,不仅会消耗系统资源,还会降低系统的稳定性。
线程池实现原理我们创建线程池完成之后,当把一个任务提交给线程池处理的时候,线程池的处理流程如下:
1)线程池判断核心线程池的任务是否都在执行任务,如果不是,则创建一个新的线程来执行任务,如何核心线程池的线程都在执行任务,则进入下一个流程
2)线程池判断工作队列是否已满,如果工作队列没有满,那么新提交的任务将会存储在工作队列中,如果工作队列满了,那会进入到下一个流程
3)线程池判断线程池中的线程是否都处于工作状态,如果没有,则创建一个新的工作线程来执行任务。如果已经满了,那么使用饱和策略来处理这个任务
创建线程池可以使用java中已经内置的一些默认配置的线程池,也可以使用ThreadPoolExecutor来自己配置参数来创建线程池,阿里代码规约中推荐后者来创建线程池,这样创建的线程池更加符合业务的实际需求。
下面是我创建线程池的一个例子:
ExecutorService executor = new ThreadPoolExecutor(2, 5, 30 , TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.AbortPolicy()); public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
在上面的这段代码里new ThreadPoolExecutor中一共有6个参数,我们来解析一下这些参数的意义,这样下次创建线程池的时候能够更加灵活的使用。
corePoolSize (核心线程的数量):任务提交到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的核心线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池核心数大小时就不再创建。代码中如果执行了prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
maximumPoolSize (线程池最大数量):线程池允许创建的线程最大数量。如果队列满了,并且创建的线程数小于最大线程数,那么线程池会再创建新的线程执行任务,如果使用无界队列,那么这个参数设置了意义不大。
keepAliveTime (线程保持活动时间):线程池的工作线程空闲后,保持存活的时间,如果任务很多,切任务执行时间较长,这个值可以设置的大一点。
TimeUnit (上面的那个时间的单位)
BlockingQueue
ArrayBlockingQueue:基于数组的有界阻塞队列 FIFO
LinkedBlockingQueue:基于链表的有界阻塞队列 FIFO 吞吐量高于ArrayBlockingQueue, FixedThreadPool基于这个实现
SynchronousQueue: 不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入处于阻塞状态,吞吐量高于LinkedBlockingQueue,静态方法:Executors.newCachedThreadPool使用了这个队列
PriorityBlockingQueue 具有优先级的无界阻塞队列
handler (饱和策略):当队列和线程池都满了,说明线程池已经处于饱和状态,那么必须采取一种策略处理新提交的任务,目前提供四种策略
AbortPolicy:直接抛出异常
CallerRunsPolicy: 只用调用者所在线程来运行任务
DiscardOldestPolicy :丢弃队列里最近的一个任务,执行当前任务
DiscardPolicy : 不处理丢弃掉
也可以实现RejectedExecutionHandler接口自定义处理方式。
线程池创建完成之后,就可以处理提交的任务,任务如何提交到线程池呢,线程池提供了两个方法:submit()和execute()方法。
public static void main(String[] args) { //提交没有返回结果的任务 EXECUTOR_SERVICE.execute(new Runnable() { @Override public void run() { System.out.println("hello world"); } }); Future future = EXECUTOR_SERVICE.submit(new Runnable() { @Override public void run() { System.out.println("hello world1"); } }); //提交有返回结果的任务,返回一个future的对象,调用get方法获取返回值,阻塞直到返回 Future future1 = EXECUTOR_SERVICE.submit(new Callable
Java对ExecutorService关闭方式有两种,一是调用shutdown()方法,二是调用shutdownNow()方法。
这两个方法虽然都可以关闭线程池,但是有一些区别
shutdown()
1、调用之后不允许继续往线程池内继续添加线程,如果继续添加会出现RejectedExecutionException异常;
2、线程池的状态变为SHUTDOWN状态;
3、所有在调用shutdown()方法之前提交到ExecutorSrvice的任务都会执行;
4、一旦所有线程结束执行当前任务,ExecutorService才会真正关闭。
shutdownNow()
1、该方法返回尚未执行的 task 的 List;
2、线程池的状态变为STOP状态;
3、阻止所有正在等待启动的任务, 并且停止当前正在执行的任务;
最好的关闭线程池的方法:
package multithread.threadpool; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** * @author pangjianfei * @Date 2019/6/21 * @desc 测试线程池 */ public class TestThreadPoolExecutor { static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(10); public static void main(String[] args) { EXECUTOR_SERVICE.execute( ()-> System.out.println("hello world")); Future future = EXECUTOR_SERVICE.submit(() -> System.out.println("hello world1")); Future future1 = EXECUTOR_SERVICE.submit(() -> {System.out.println("hello world1");return "succ";}); Future future2 = EXECUTOR_SERVICE.submit(() -> {Thread.sleep(5000);System.out.println("hello world2");return "线程执行成功了";}); try { System.out.println(future.get()); System.out.println(future1.get()); System.out.println(future2.get()); } catch (Exception e) { e.printStackTrace(); } Runnable run = () -> { long sum = 0; boolean flag = true; while (flag && !Thread.currentThread().isInterrupted()) { sum += 1; if (sum == Long.MAX_VALUE) { flag = false; } } }; EXECUTOR_SERVICE.execute(run); //先调用shutdown()使线程池状态改变为SHUTDOWN EXECUTOR_SERVICE.shutdown(); try { //2s后检测线程池内的线程是否执行完毕 if (!EXECUTOR_SERVICE.awaitTermination(2, TimeUnit.SECONDS)) { //内部实际通过interrupt来终止线程,所以当调用shutdownNow()时,isInterrupted()会返回true。 EXECUTOR_SERVICE.shutdownNow(); } } catch (InterruptedException e) { EXECUTOR_SERVICE.shutdownNow(); } } }
一般说来,大家认为线程池的大小经验值应该这样设置:(其中N为CPU的个数)
如果是CPU密集型应用,则线程池大小设置为N+1
如果是IO密集型应用,则线程池大小设置为2N+1
IO优化中,更加合理的方式是:最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1)* CPU数目
linux查看CPU的数量:
# 查看物理CPU个数 cat /proc/cpuinfo| grep "physical id"| sort| uniq| wc -l # 查看每个物理CPU中core的个数(即核数) cat /proc/cpuinfo| grep "cpu cores"| uniq # 查看逻辑CPU的个数 cat /proc/cpuinfo| grep "processor"| wc -l
基于下面的方法可以对线程池中的一些数据进行监控:
//当前排队线程数 ((ThreadPoolExecutor)EXECUTOR_SERVICE).getQueue().size(); //当前活动的线程数 ((ThreadPoolExecutor)EXECUTOR_SERVICE).getActiveCount(); //执行完成的线程数 ((ThreadPoolExecutor)EXECUTOR_SERVICE).getCompletedTaskCount(); //总线程数 ((ThreadPoolExecutor)EXECUTOR_SERVICE).getTaskCount();Executor框架
Executor框架的调度模型是一种二级调度模型。
在HotSpot VM 的线程模型中,Java线程会被一对一的映射为本地操作系统的线程。Java线程的启动与销毁都与本地线程同步。操作系统会调度所有线程并将它们分配给可用的CPU。
在上层,我们通常会创建任务,然后将任务提交到调度器,调度器(Executor框架)将这些任务映射为对应数量的线程;底层,操作系统会将这些线程映射到硬件处理器上,这里不是由应用程序控制。模型图如下:
Executor框架主要包括三部分:
任务: 定义的被执行的任务需要实现两个接口之一:Runable、Callable;
任务的执行: 包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口:ThreadPoolExecutor 和 ScheduledThreadPoolExecutor、ForkJoinPool;
任务的异步计算结果: 包括Future接口和实现Future接口的FutureTask类、ForkJoinTask类。
Executor主要包含的类和接口如下:
Executor是一个接口,他是Executor框架的基础,他将任务的提交和执行分离。
ThreadPoolExecutor是线程池的核心类,用来执行被提交的任务
ScheduleThreadPoolExecutor是一个实现类,可以在给定的延迟后执行命令,或者定期执行命令,比Timer更加灵活。
Future接口和实现Future接口的FutureTask类,代表异步计算的结果
Executor框架最核心的类就是ThreadPoolExecutor,他是线程池的实现类,我们在上面看到过创建线程池的例子
ExecutorService executor = new ThreadPoolExecutor(2, 5, 30 , TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.AbortPolicy()); 里面包含了几个核心的参数,通过Executor框架的工具类Executors,可以直接创建3中类型的线程池。
FixedThreadPool被称为是可重用固定线程数的线程池。为什么会这么说,我们看一下他的实现源码:
public static ExecutorService newFixedThreadPool(int nThreads) { /** * 核心线程数和最大线程数相同,线程等待时间为0表示多余的空闲线程会被立刻终止,LinkedBlockingQueue默认容量是Integer.MAX_VALUE,基本类似于无界队列 */ return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }
SingleThreadExecutor是单个Worker线程的Executor.他的实现源码如下:
/** * 可以看到核心线程数和最大线程数都是1,相当于是创建一个单线程顺序的执行队列中的任务 */ public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); }
CachedThreadPool是一个会根据需要创建新线程的线程池,源码如下:
/** * 核心线程数为0,最大线程数为Integer.MAX_VALUE,线程存活时间为1分钟,而且是使用SynchronousQueue没有容量的队列,这种情况下任务提交快的是时候,会创建大量的线程,耗尽CPU的资源 */ public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }
ScheduledThreadPoolExecutor用来在给定的延迟之后运行任务,或者定期执行任务,他比Timer更加灵活,Timer是创建一个后台线程,而这个线程池可以在构造函数中指定多个对应的后台线程。源码如下:
/** * 这里使用的是无界的队列,如果核心线程池已满,那么任务会一直提交到队列 */ public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
ScheduledThreadPoolExecutor的执行主要包含两部分
/**参数的作用:1:执行的线程 2、初始化延时 3、两次开始执行最小时间间隔 4、计时单位*/ /**scheduleAtFixedRate()是按照指定频率执行一个任务,就是前一个任务没有执行完成,但是下次执行时间到,仍然会启动线程执行新的任务*/ scheduledExecutorService.scheduleAtFixedRate(()-> System.out.println("hello"), 10, 10, TimeUnit.SECONDS); /**参数作用同上*/ /**scheduleWithFixedDelay()是按照指定频率间隔执行,本次执行结果之后,在延时10s执行下一次的任务*/ scheduledExecutorService.scheduleWithFixedDelay(()-> System.out.println("hello"), 10, 10, TimeUnit.SECONDS); 上面的两种方法都是向ScheduledThreadPoolExecutor的DelayQueue中添加了一个实现了RunnableScheduleFuture接口的ScheduledTask. public ScheduledFuture> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); //这里对commond进行了封装 ScheduledFutureTasksft = new ScheduledFutureTask (command, null, triggerTime(initialDelay, unit), unit.toNanos(period)); RunnableScheduledFuture t = decorateTask(command, sft); sft.outerTask = t; delayedExecute(t); return t; }
DelayQueue是一个支持优先级的队列,这里默认会按照执行时间进行排序,time小的排在前面。线程在获取到期需要执行的ScheduledTutureTask之后,执行这个任务,执行完成之后会修改time,将其变为下次要执行的时间,修改之后放回到DelayQueue中。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/77839.html
摘要:当活动线程核心线程非核心线程达到这个数值后,后续任务将会根据来进行拒绝策略处理。线程池工作原则当线程池中线程数量小于则创建线程,并处理请求。当线程池中的数量等于最大线程数时默默丢弃不能执行的新加任务,不报任何异常。 spring-cache使用记录 spring-cache的使用记录,坑点记录以及采用的解决方案 深入分析 java 线程池的实现原理 在这篇文章中,作者有条不紊的将 ja...
摘要:任务性质不同的任务可以用不同规模的线程池分开处理。线程池在运行过程中已完成的任务数量。如等于线程池的最大大小,则表示线程池曾经满了。线程池的线程数量。获取活动的线程数。通过扩展线程池进行监控。框架包括线程池,,,,,,等。 Java线程池 [toc] 什么是线程池 线程池就是有N个子线程共同在运行的线程组合。 举个容易理解的例子:有个线程组合(即线程池,咱可以比喻为一个公司),里面有3...
摘要:线程池的工作原理一个线程池管理了一组工作线程,同时它还包括了一个用于放置等待执行任务的任务队列阻塞队列。使用线程池可以对线程进行统一的分配和监控。线程池的注意事项虽然线程池是构建多线程应用程序的强大机制,但使用它并不是没有风险的。 线程池的工作原理一个线程池管理了一组工作线程, 同时它还包括了一个用于放置等待执行 任务的任务队列(阻塞队列) 。 一个线程池管理了一组工作线程, 同时它还...
摘要:高并发系列第篇文章。简单的说,在使用了线程池之后,创建线程变成了从线程池中获取一个空闲的线程,然后使用,关闭线程变成了将线程归还到线程池。如果调用了线程池的方法,线程池会提前把核心线程都创造好,并启动线程池允许创建的最大线程数。 java高并发系列第18篇文章。 本文主要内容 什么是线程池 线程池实现原理 线程池中常见的各种队列 自定义线程创建的工厂 常见的饱和策略 自定义饱和策略 ...
阅读 1672·2021-11-12 10:36
阅读 1588·2021-11-12 10:36
阅读 3394·2021-11-02 14:46
阅读 3747·2019-08-30 15:56
阅读 3420·2019-08-30 15:55
阅读 1420·2019-08-30 15:44
阅读 1019·2019-08-30 14:00
阅读 2708·2019-08-29 18:41