资讯专栏INFORMATION COLUMN

ThreadPoolExecutor浅谈

garfileo / 724人阅读

摘要:耐心看完的你或多或少会有收获的解释在了解线程池之前,希望你已经了解了内存模型和前位表示运行状态,后面位存储当前运行最大容量实际线程池大小还是由决定以下为线程池的几个状态官方注释在最上方接受新的任务不接受新的任务,但是已在

耐心看完的你或多或少会有收获!

ThreadPoolExecutor field 的解释

在了解线程池之前,希望你已经了解了 Java内存模型 和 AQS CAS

     /**                    
     * The runState provides the main lifecycle control, taking on values:
     *
     *   RUNNING:  Accept new tasks and process queued tasks
     *   SHUTDOWN: Don"t accept new tasks, but process queued tasks
     *   STOP:     Don"t accept new tasks, don"t process queued tasks,
     *             and interrupt in-progress tasks
     *   TIDYING:  All tasks have terminated, workerCount is zero,
     *             the thread transitioning to state TIDYING
     *             will run the terminated() hook method
     *   TERMINATED: terminated() has completed
     *
     * The numerical order among these values matters, to allow
     * ordered comparisons. The runState monotonically increases over
     * time, but need not hit each state. The transitions are:
     *
     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
     * STOP -> TIDYING
     *    When pool is empty
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed terminated()
     */
    
    // 前 3 位表示运行状态,后面 29 位存储当前运行 workerCount
    private static final int COUNT_BITS = Integer.SIZE - 3; // 32 - 3
    
    // 最大容量
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 00011111111111111111111111111111
   
    /**
     * Maximum pool size. Note that the actual maximum is internally
     * bounded by CAPACITY. 实际线程池大小还是由 CAPACITY 决定
     */
    private volatile int maximumPoolSize;
    
    // 以下为线程池的几个状态 官方注释在最上方
    // 接受新的任务
    private static final int RUNNING    = -1 << COUNT_BITS; // 11100000000000000000000000000000
    
    // 不接受新的任务,但是已在队列中的任务,还会继续处理
    private static final int SHUTDOWN   =  0 << COUNT_BITS; // 00000000000000000000000000000000
    
    // 不接受,不处理新的任务,且中断正在进行中的任务
    private static final int STOP       =  1 << COUNT_BITS; // 00100000000000000000000000000000
    
    // 所有任务已停止,workerCount 清零,注意 workerCount 是由 workerCountOf(int c) 计算得出的
    private static final int TIDYING    =  2 << COUNT_BITS; // 01000000000000000000000000000000
    
    // 所有任务已完成
    private static final int TERMINATED =  3 << COUNT_BITS; // 01100000000000000000000000000000
    
    // 线程池运行状态和已工作的 workerCount 初始化为 RUNNING 和 0
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
    // 计算当前 state
    // ~CAPACITY 为 11100000000000000000000000000000 & c(假如前三位为 000 说明线程池已经 SHUTDOWN)
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    
    // 同时拿到 state workerCount
    private static int ctlOf(int rs, int wc) { return rs | wc; }
    
    // 可以计算出当前工作的 workerCount
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    
    // 线程入列
    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
       
            // 获得当前 state 和 workerCount
            // 判断是否满足加入核心线程
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                // 以核心线程的方式加入队列
                if (addWorker(command, true))
                    return;
                // 添加失败 获取最新的线程池 state 和 workerCount
                c = ctl.get();
            }
            // 在运行且成功加入队列
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                // 再检查一次,不在运行就拒绝任务
                if (!isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    // 加入一个 null
                    addWorker(null, false);
            }
            // 加入失败就拒绝任务
            else if (!addWorker(command, false))
                reject(command);
        }
    
    // 实际的操作
    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                // 获得当前 state 和 workerCount
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // 大于 SHUTDOWN 即 STOP TIDYING TERMINATED
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    // 计算 workerCount
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    // 成功了就退出
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        // 走到这一步说明 rs 为 RUNNING 或 SHUTDOWN 可以重新尝试加入
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                // 统一线程的名字
                // 设置 daemon 和 priority
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        // 异常检查
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    // 添加成功 启动线程
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                // 加入失败 
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
        
    // 加入失败 做一些扫尾清理
    private void addWorkerFailed(Worker w) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (w != null)
                    workers.remove(w);
                // workerCount-1
                decrementWorkerCount();
                // 尝试更新状态 何为尝试,即需要满足一定条件,而不是冒然去做某事
                tryTerminate();
            } finally {
                mainLock.unlock();
            }
        }
总结一下

写得好的源码,注释一定要好好看一遍

线程池的状态和工作线程数量用 32 位二进制数表示,然后通过二进制的位运算获取状态和数量,这种设计实在是太过精妙

膜拜大师

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/74933.html

相关文章

  • 浅谈Java并发编程系列(六) —— 线程池的使用

    摘要:线程池的作用降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的资源浪费。而高位的部分,位表示线程池的状态。当线程池中的线程数达到后,就会把到达的任务放到中去线程池的最大长度。默认情况下,只有当线程池中的线程数大于时,才起作用。 线程池的作用 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的资源浪费。 提高响应速度。当任务到达时,不需要等到线程创建就能立即执行...

    Vicky 评论0 收藏0
  • ThreadPoolExecutor策略配置以及应用场景

    摘要:支持通过调整构造参数来配置不同的处理策略,本文主要介绍常用的策略配置方法以及应用场景。对于这种场景,我们可以设置使用带有长度限制的队列以及限定最大线程个数的线程池,同时通过设置处理任务被拒绝的情况。 ThreadPoolExecutor 是用来处理异步任务的一个接口,可以将其理解成为一个线程池和一个任务队列,提交到 ExecutorService 对象的任务会被放入任务队或者直接被线程...

    tuantuan 评论0 收藏0
  • (十七)java多线程之ThreadPoolExecutor

    摘要:本人邮箱欢迎转载转载请注明网址代码已经全部托管有需要的同学自行下载引言在之前的例子我们要创建多个线程处理一批任务的时候我是通过创建线程数组或者使用线程集合来管理的但是这样做不太好因为这些线程没有被重复利用所以这里要引入线程池今天我们就讲线程 本人邮箱: 欢迎转载,转载请注明网址 http://blog.csdn.net/tianshi_kcogithub: https://github...

    wpw 评论0 收藏0
  • 使用 Executors,ThreadPoolExecutor,创建线程池,源码分析理解

    摘要:源码分析创建可缓冲的线程池。源码分析使用创建线程池源码分析的构造函数构造函数参数核心线程数大小,当线程数,会创建线程执行最大线程数,当线程数的时候,会把放入中保持存活时间,当线程数大于的空闲线程能保持的最大时间。 之前创建线程的时候都是用的 newCachedThreadPoo,newFixedThreadPool,newScheduledThreadPool,newSingleThr...

    Chiclaim 评论0 收藏0
  • 一看就懂的Java线程池分析详解

    摘要:任务性质不同的任务可以用不同规模的线程池分开处理。线程池在运行过程中已完成的任务数量。如等于线程池的最大大小,则表示线程池曾经满了。线程池的线程数量。获取活动的线程数。通过扩展线程池进行监控。框架包括线程池,,,,,,等。 Java线程池 [toc] 什么是线程池 线程池就是有N个子线程共同在运行的线程组合。 举个容易理解的例子:有个线程组合(即线程池,咱可以比喻为一个公司),里面有3...

    Yangder 评论0 收藏0

发表评论

0条评论

garfileo

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<