摘要:对于指的是线程池中正在运行的线程。总结现在就可以知道,大致的线程池实现原理首先,各自存放线程和任务,其中,任务带有阻塞。
前言
文章主要来自:点这里 。这也是博主的博客,主要分享了自己接触过的一些后端技术,有不对的地方希望可以提出。
线程池的相关类我们都知道,所谓线程池,那么就是相当于有一个池子,线程就放在这个池子中进行重复利用,能够减去了线程的创建和销毁所带来的代价。但是这样并不能很好的解释线程池的原理,下面从代码的角度分析一下线程池的实现。
对于原理,在 Java 中,有几个接口,类 值得我们关注:
Executor
ExecutorService
AbstractExecutorService
ThreadPoolExecutor
Executorpublic interface Executor { void execute(Runnable command); }
Executor 接口只有一个 方法,execute,并且需要 传入一个 Runnable 类型的参数。那么它的作用自然是 具体的执行参数传入的任务。
ExecutorServicepublic interface ExecutorService extends Executor { void shutdown(); ListshutdownNow(); boolean isShutdown(); Future submit(Callable task); Future submit(Runnable task, T result); Future> submit(Runnable task); List > invokeAll(Collection extends Callable > tasks) throws InterruptedException; ...... }
ExecutorService 接口继承了 Executor,并且提供了一些其他的方法,比如说:
shutdownNow : 关闭线程池,返回放入了线程池,但是还没开始执行的线程。
submit : 执行的任务 允许拥有返回值。
invokeAll : 运行把任务放进集合中,进行批量的执行,并且能有返回值
这三个方法也可以说是这个接口重点扩展的方法。
Ps:execute 和 submit 区别:
submit 有返回值,execute 没有返回值。 所以说可以根据任务有无返回值选择对应的方法。
submit 方便异常的处理。 如果任务可能会抛出异常,而且希望外面的调用者能够感知这些异常,那么就需要调用 submit 方法,通过捕获 Future.get 抛出的异常。
AbstractExecutorServiceAbstractExecutorService 是一个抽象类,主要完成了 对 submit 方法,invokeAll 方法 的实现。 但是其实它的内部还是调用了 execute 方法,例如:
public Future> submit(Runnable task) { if (task == null) throw new NullPointerException(); RunnableFutureThreadPoolExecutorftask = newTaskFor(task, null); execute(ftask); return ftask; }
ThreadPoolExecutor 继承了 AbstractExecutorService,并且实现了最重要的 execute 方法,是我们主要需要研究的类。另外,整个线程池是如何实现的呢?
在该类中,有两个成员变量 非常的重要:
private final HashSetworkers = new HashSet (); private final BlockingQueue workQueue;
对于 workers 变量,主要存在了线程对象 Worker,Worker 实现了 Runnable 接口。而对于 workQueue 变量,主要存放了需要执行的任务。 这样其实可以猜到, 整个线程池的实现原理应该是 workQueue 中不断的取出需要执行的任务,放在 workers 中进行处理。
另外,当线程池中的线程用完了之后,多余的任务会等待,那么这个等待的过程是 怎么实现的呢? 其实如果熟悉 BlockingQueue,那么就会马上知道,是利用了 BlockingQueue 的take 方法进行处理。
下面具体代码分析:
public void execute(Runnable command) { ...... if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } ...... }
首先,这里需要先理解两个概念。我们在创建线程池的时候,通常会指定两个变量,一个是maximumPoolSize,另外一个是 corePoolSize。
对于 maximumPoolSize:指的是 线程池中最多允许有多少个线程。
对于 corePoolSize: 指的是线程池中正在运行的线程。
在 线程池中,有这样的设定,我们加入一个任务进行执行,
如果现在线程池中正在运行的线程数量大于 corePoolSize 指定的值而 小于maximumPoolSize 指定的值,那么就会创建一个线程对该任务进行执行,一旦一个线程被创建运行。
如果线程池中的线程数量大于corePoolSize,那么这个任务执行完毕后,该线程会被回收;如果 小于corePoolSize,那么该线程即使空闲,也不会被回收。下个任务过来,那么就使用这个空闲线程。
对于上述代码,首先有:
if (workerCountOf(c) < corePoolSize)
也就是说,判断现在的线程数量是否小于corePoolSize,如果小于,那么就创建一个线程执行该任务,也就是执行
addWorker(command, true)
如果大于,那么就把该任务放进队列当中,即
workQueue.offer(command)
那么,addWorker 是干什么的呢?
private boolean addWorker(Runnable firstTask, boolean core) { boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } ...... }
在这里可以看到一些关键代码,例如 w = new Worker(firstTask), 以及 workers.add(w); 从这里 我们就可以看到,创建 线程对象 并且加入到 线程 队列中。但是,我们现在还没有看到具体是怎么执行任务的,继续追踪
w = new Worker(firstTask),如下代码:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { ...... final Thread thread; Runnable firstTask; Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } ......
对于 runWorker 方法:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } ...... }
在这段代码中,就有很多关键的信息,比如说,Runnable task = w.firstTask;如果为空,那么就 执行 task = getTask(),如果不为空,那么就 进行 task.run(); 调用其方法,这里也就是具体的执行的任务。
现在知道了是怎么样执行具体的任务,那么假如任务的数量 大于 线程池的数量,那么是怎么实现等待的呢,这里就需要看到getTask() 的具体实现了,如下:
private Runnable getTask() { for (;;) { ...... try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
这里可以看到, 一个 for 死循环,以及
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
而 workQueue 是 BlockingQueue 类型,也就是带有阻塞的功能。
这就是 线程如何等待执行的。
总结现在就可以知道,大致的线程池实现原理:
首先,各自存放线程和任务,其中,任务带有阻塞。
private final HashSetworkers = new HashSet (); private final BlockingQueue workQueue;
然后,在 execute 方法中 进行 addWorker(command,true),也就是创建一个线程,把任务放进去执行;或者是直接把任务放入到任务队列中。
接着 如果是 addWorker,那么就会 new Worker(task) -》调用其中 run() 方法,在Worker 的run() 方法中,调用 runWorker(this); 方法 -》在该方法中就会具体执行我们的任务 task.run(); 同时这个 runWorker方法相当于是个死循环,正常情况下就会一直取出 任务队列中的任务来执行,这就保证了线程 不会销毁。
所以,这也是为什么常说的线程池可以避免线程的频繁创建和 销毁带来的性能消耗。
写在最后写出来,说出来才知道对不对,知道不对才能改正,改正了才能成长。
在技术方面,希望大家眼里都容不得沙子。如果有不对的地方或者需要改进的地方希望可以指出,万分感谢。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/64900.html
摘要:当活动线程核心线程非核心线程达到这个数值后,后续任务将会根据来进行拒绝策略处理。线程池工作原则当线程池中线程数量小于则创建线程,并处理请求。当线程池中的数量等于最大线程数时默默丢弃不能执行的新加任务,不报任何异常。 spring-cache使用记录 spring-cache的使用记录,坑点记录以及采用的解决方案 深入分析 java 线程池的实现原理 在这篇文章中,作者有条不紊的将 ja...
摘要:进程线程与协程它们都是并行机制的解决方案。选择是任意性的,并在对实现做出决定时发生。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。此线程池支持定时以及周期性执行任务的需求。 并发与并行的概念 并发(Concurrency): 问题域中的概念—— 程序需要被设计成能够处理多个同时(或者几乎同时)发生的事件 并行(Parallel...
摘要:为程序员金三银四精心挑选的余道面试题与答案,欢迎大家向我推荐你在面试过程中遇到的问题我会把大家推荐的问题添加到下面的常用面试题清单中供大家参考。 为Java程序员金三银四精心挑选的300余道Java面试题与答案,欢迎大家向我推荐你在面试过程中遇到的问题,我会把大家推荐的问题添加到下面的常用面试题清单中供大家参考。 前两天写的以下博客,大家比较认可,热度不错,希望可以帮到准备或者正在参加...
摘要:今天给大家总结一下,面试中出镜率很高的几个多线程面试题,希望对大家学习和面试都能有所帮助。指令重排在单线程环境下不会出先问题,但是在多线程环境下会导致一个线程获得还没有初始化的实例。使用可以禁止的指令重排,保证在多线程环境下也能正常运行。 下面最近发的一些并发编程的文章汇总,通过阅读这些文章大家再看大厂面试中的并发编程问题就没有那么头疼了。今天给大家总结一下,面试中出镜率很高的几个多线...
摘要:线程池的工作原理一个线程池管理了一组工作线程,同时它还包括了一个用于放置等待执行任务的任务队列阻塞队列。使用线程池可以对线程进行统一的分配和监控。线程池的注意事项虽然线程池是构建多线程应用程序的强大机制,但使用它并不是没有风险的。 线程池的工作原理一个线程池管理了一组工作线程, 同时它还包括了一个用于放置等待执行 任务的任务队列(阻塞队列) 。 一个线程池管理了一组工作线程, 同时它还...
阅读 3390·2021-09-26 09:46
阅读 2728·2021-09-13 10:23
阅读 3445·2021-09-07 10:24
阅读 2363·2019-08-29 13:20
阅读 2888·2019-08-28 17:57
阅读 3034·2019-08-26 13:27
阅读 1150·2019-08-26 12:09
阅读 475·2019-08-26 10:27