资讯专栏INFORMATION COLUMN

浅谈Java并发编程系列(六) —— 线程池的使用

Vicky / 2415人阅读

摘要:线程池的作用降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的资源浪费。而高位的部分,位表示线程池的状态。当线程池中的线程数达到后,就会把到达的任务放到中去线程池的最大长度。默认情况下,只有当线程池中的线程数大于时,才起作用。

线程池的作用

降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的资源浪费。

提高响应速度。当任务到达时,不需要等到线程创建就能立即执行。

方便管理线程。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以对线程进行统一的分配,优化及监控。

设置线程池大小

设置线程池的大小可以从以下几个方面分析入手:

系统中有多少个cpu?

多大的内存?

任务是计算密集型、I/O密集集还是二者皆可?

是否需要像JDBC连接这样的稀缺资源?

对于计算密集型的任务,在拥有N个cpu的机器上,通常将线程池大小设置为N+1时,能够实现最优的利用率。 对于包含I/O操作或者其他阻塞操作的任务,由于线程并不会一直执行,因此线程池的规模应该更大。
可通过如下公式进行估计:
$$N_{threads} = N_{cpu}*U_{cpu}*(1+frac{W}{C})$$
其中:
$$ U_{cpu} = target CPU utilization, 0 le U_{cpu} le 1$$
$$ frac{W}{C} = ration of wait time to compute time$$
可以通过Rumtime来获得CUP的数目:

int N_CPUS = Runtime.getRuntime().availableProcessor();

当然,CPU周期并不是唯一影响线程池大小的资源,还包括内存、文件句柄、套接字句柄和数据库连接等。计算方法:计算每个任务对该资源的需求量,然后用该资源的可用总量除以每个任务的需要量,所得结果就是线程池的大小上限。

线程池的实现原理 ThreadPoolExecutor

Java的线程池针对不同应用的场景,主要有固定长度类型、可变长度类型以及定时执行等几种。针对这几种类型的创建,java中有一个专门的Executors类提供了一系列的方法封装了具体的实现。这些功能和用途不一样的线程池主要依赖ThreadPoolExecutor, ScheduledThreadPoolExecutor等几个类。如前面文章讨论所说,这些类和相关类的主要结构如下:

ThreadPoolExecutor是实现线程池最核心的类之一。在分析ThreadPoolExecutor的实现原理之前,让来看看实现线程池需要考虑的点:
从线程池本身的定义来看,它是将一组事先创建好的线程放在一个资源池里,当需要的时候就将该线程分配给具体的任务来执行。那么,这个池子的大小如何确定?线程池肯定要面临多个线程资源访问的情况,是不是本身的结构要保证线程安全呢?如果线程池创建好之后后续有若干任务使用了线程资源,当池里面的资源使用完之后要如何安排?是给线程扩容,创建更多的线程资源,还是增加一个队列,让一些任务先在里面排队呢?在一些极端的情况下,比如任务数量实在太多线程池处理不过来,对于这些任务怎么处理呢?线程执行的时候会碰到异常或都错误的情况,这些异常要如何处理?如何保证这些异常的处理不会导致线程池其他任务的正常运行不出错呢?

总结一下,这些问题可以归纳为如下几点:

线程池的结构;

线程池的任务分配策略;

线程池的异常和错误处理机制;

下面结合ThreadPoolExecutor的实现源码来详细分析一下。

线程数量和线程池状态
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

ctl是线程池的主要控制状态,它是一个AtomicInteger的数值,它表示了两部分内容:

workerCount, 表示有效线程数;

runState, 表示线程池状态,是否运行,停止等。

ctl是用一个integer(32)来包含线程池状态和数量的表示,高三位为线程池的状态,后(2^29)-1为线程数限制。 这就是为什么前面用一个Integer.SIZE-3来作为位数。这样这个整数的0-28位表示的就是线程的数目。而高位的部分,29-31位表示线程池的状态。这里定义的主要有5种状态,分别对应值是从-1到3. 他们对应着线程的running, shutdown, stop, tidying, terminated这几个状态。

线程池的结构

除了以上部分外,线程池里还有如下成员:

    private final BlockingQueue workQueue;
    
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * Set containing all worker threads in pool. Accessed only when holding mainLock.
     */
    private final HashSet workers = new HashSet();

    /**
     * Wait condition to support awaitTermination
     */
    private final Condition termination = mainLock.newCondition();

    /**
     * Tracks largest attained pool size. Accessed only under
     * mainLock.
     */
    private int largestPoolSize;

    /**
     * Counter for completed tasks. Updated only on termination of
     * worker threads. Accessed only under mainLock.
     */
    private long completedTaskCount;

    private volatile ThreadFactory threadFactory;

    /**
     * Handler called when saturated or shutdown in execute.
     */
    private volatile RejectedExecutionHandler handler;

    /**
     * Timeout in nanoseconds for idle threads waiting for work.
     * Threads use this timeout when there are more than corePoolSize
     * present or if allowCoreThreadTimeOut. Otherwise they wait
     * forever for new work.
     */
    private volatile long keepAliveTime;

    /**
     * If false (default), core threads stay alive even when idle.
     * If true, core threads use keepAliveTime to time out waiting
     * for work.
     */
    private volatile boolean allowCoreThreadTimeOut;

    /**
     * Core pool size is the minimum number of workers to keep alive
     * (and not allow to time out etc) unless allowCoreThreadTimeOut
     * is set, in which case the minimum is zero.
     */
    private volatile int corePoolSize;

    /**
     * Maximum pool size. Note that the actual maximum is internally
     * bounded by CAPACITY.
     */
    private volatile int maximumPoolSize;

workerQueue: 一个BlockingQueue队列,本身的结构可以保证访问的线程安全。相当于一个排队等待队列。当线程池的线程数达到corePoolSize的时候,一些需要等待执行的线程就放到这个队列里等待。
worker: 一个HashSet 集合。线程池里所有可以立即执行的线程都放在这个集合里。
mainLock: 一个访问workers所需要使用的锁。从前面的workQueue,workers这两个结构可以看出,如果要往线程池里面增加执行任务或者执行完毕一个任务,都要访问这两个结构。所以大多数情况下为了保证线程安全,就需要使用mainLock这个锁。
corePoolSize:处于活跃状态的最少worker数目。每个worker会创建一个新的线程去执行任务。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务来才创建线程去执行,除非调用了prestartAllCoreThreads()或prestartCoreThread()方法。当线程池中的线程数达到corePoolSize后,就会把到达的任务放到workerQueue中去;
maximumPoolSize: 线程池的最大长度。当线程池里面的线程数达到这个数字时就不能再往里面加了,此时会根据设置的handler参数,即拒绝处理任务策略来处理新到来的任务。
keepAliveTime: 表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才起作用。当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程数为0。
threadFactory: 线程工厂,主要用来创建线程;
largestPoolSize: 用来记录线程池中曾经有过的最大线程数,跟线程池的容易没有任何关系。
handler: 表示当拒绝处理任务时的策略,有以四种取值:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 
任务的执行者——Worker
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/66069.html

相关文章

  • 我的阿里之路+Java面经考点

    摘要:我的是忙碌的一年,从年初备战实习春招,年三十都在死磕源码,三月份经历了阿里五次面试,四月顺利收到实习。因为我心理很清楚,我的目标是阿里。所以在收到阿里之后的那晚,我重新规划了接下来的学习计划,将我的短期目标更新成拿下阿里转正。 我的2017是忙碌的一年,从年初备战实习春招,年三十都在死磕JDK源码,三月份经历了阿里五次面试,四月顺利收到实习offer。然后五月怀着忐忑的心情开始了蚂蚁金...

    姘搁『 评论0 收藏0
  • 浅谈Java并发编程系列(一)—— 如何保证线程安全

    摘要:比如需要用多线程或分布式集群统计一堆用户的相关统计值,由于用户的统计值是共享数据,因此需要保证线程安全。如果类是无状态的,那它永远是线程安全的。参考探索并发编程二写线程安全的代码 线程安全类 保证类线程安全的措施: 不共享线程间的变量; 设置属性变量为不可变变量; 每个共享的可变变量都使用一个确定的锁保护; 保证线程安全的思路: 1. 通过架构设计 通过上层的架构设计和业务分析来避...

    mylxsw 评论0 收藏0
  • 浅谈Java并发编程系列(四)—— 原子性、可见性与有序性

    摘要:内存模型是围绕着在并发过程中如何处理原子性可见性和有序性这个特征来建立的,我们来看下哪些操作实现了这个特性。可见性可见性是指当一个线程修改了共享变量的值,其他线程能够立即得知这个修改。 Java内存模型是围绕着在并发过程中如何处理原子性、可见性和有序性这3个特征来建立的,我们来看下哪些操作实现了这3个特性。 原子性(atomicity): 由Java内存模型来直接保证原子性变量操作包括...

    tianren124 评论0 收藏0
  • 浅谈Java并发编程系列(二)—— Java内存模型

    摘要:物理计算机并发问题在介绍内存模型之前,先简单了解下物理计算机中的并发问题。基于高速缓存的存储交互引入一个新的问题缓存一致性。写入作用于主内存变量,把操作从工作内存中得到的变量值放入主内存的变量中。 物理计算机并发问题 在介绍Java内存模型之前,先简单了解下物理计算机中的并发问题。由于处理器的与存储设置的运算速度有几个数量级的差距,所以现代计算机加入一层读写速度尽可能接近处理器的高速缓...

    Edison 评论0 收藏0
  • 浅谈Java并发编程系列(三)—— volatile型变量

    摘要:禁止指令重排序优化。只有当线程对变量执行的前一个动作是时,才能对执行动作并且,只有当对变量执行的后一个动作是时,线程才能对变量执行动作。变量不需要与其他的状态变量共同参与不变约束。在某些情况下,的同步机制性要优于锁。 当一个变量定义为volatile之后,它具备两种特性: 保证此变量对所有线程的可见性,这里的可见性是指当一条线程修改了这个变量的值,新值对于其他线程来说是可以立即得知的...

    zxhaaa 评论0 收藏0

发表评论

0条评论

Vicky

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<