资讯专栏INFORMATION COLUMN

使用 Executors,ThreadPoolExecutor,创建线程池,源码分析理解

Chiclaim / 2350人阅读

摘要:源码分析创建可缓冲的线程池。源码分析使用创建线程池源码分析的构造函数构造函数参数核心线程数大小,当线程数,会创建线程执行最大线程数,当线程数的时候,会把放入中保持存活时间,当线程数大于的空闲线程能保持的最大时间。

之前创建线程的时候都是用的 newCachedThreadPoo,newFixedThreadPool,newScheduledThreadPool,newSingleThreadExecutor 这四个方法。
当然 Executors 也是用不同的参数去 new ThreadPoolExecutor 实现的,本文先分析前四种线程创建方式,后在分析 new ThreadPoolExecutor 创建方式

使用 Executors 创建线程池 1.newFixedThreadPool()

由于使用了LinkedBlockingQueue所以maximumPoolSize没用,当corePoolSize满了之后就加入到LinkedBlockingQueue队列中。
每当某个线程执行完成之后就从LinkedBlockingQueue队列中取一个。
所以这个是创建固定大小的线程池。

源码分析

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(
            nThreads,
            nThreads,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue());
}
2.newSingleThreadPool()

创建线程数为1的线程池,由于使用了LinkedBlockingQueue所以maximumPoolSize 没用,corePoolSize为1表示线程数大小为1,满了就放入队列中,执行完了就从队列取一个。

源码分析

public static ExecutorService newSingleThreadExecutor() {
    return new Executors.FinalizableDelegatedExecutorService
            (
                    new ThreadPoolExecutor(
                            1,
                            1,
                            0L,
                            TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue())
            );
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), defaultHandler);
}
3.newCachedThreadPool()

创建可缓冲的线程池。没有大小限制。由于corePoolSize为0所以任务会放入SynchronousQueue队列中,SynchronousQueue只能存放大小为1,所以会立刻新起线程,由于maxumumPoolSizeInteger.MAX_VALUE所以可以认为大小为2147483647。受内存大小限制。

源码分析

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(
            0,
            Integer.MAX_VALUE,
            60L,
            TimeUnit.SECONDS,
            new SynchronousQueue());
}

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), defaultHandler);
}
使用 ThreadPoolExecutor 创建线程池

源码分析 ,ThreadPoolExecutor 的构造函数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
构造函数参数

1、corePoolSize 核心线程数大小,当线程数 < corePoolSize ,会创建线程执行 runnable

2、maximumPoolSize 最大线程数, 当线程数 >= corePoolSize的时候,会把 runnable 放入 workQueue中

3、keepAliveTime 保持存活时间,当线程数大于corePoolSize的空闲线程能保持的最大时间。

4、unit 时间单位

5、workQueue 保存任务的阻塞队列

6、threadFactory 创建线程的工厂

7、handler 拒绝策略

任务执行顺序

1、当线程数小于 corePoolSize时,创建线程执行任务。

2、当线程数大于等于 corePoolSize并且 workQueue 没有满时,放入workQueue

3、线程数大于等于 corePoolSize并且当 workQueue 满时,新任务新建线程运行,线程总数要小于 maximumPoolSize

4、当线程总数等于 maximumPoolSize 并且 workQueue 满了的时候执行 handlerrejectedExecution。也就是拒绝策略。

JDK7提供了7个阻塞队列。(也属于并发容器)

ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。

LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。

PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。

DelayQueue:一个使用优先级队列实现的无界阻塞队列。

SynchronousQueue:一个不存储元素的阻塞队列。

LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。

LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

什么是阻塞队列?

阻塞队列是一个在队列基础上又支持了两个附加操作的队列。

2个附加操作:

支持阻塞的插入方法:队列满时,队列会阻塞插入元素的线程,直到队列不满。
支持阻塞的移除方法:队列空时,获取元素的线程会等待队列变为非空。

阻塞队列的应用场景

阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。简而言之,阻塞队列是生产者用来存放元素、消费者获取元素的容器。

几个方法

在阻塞队列不可用的时候,上述2个附加操作提供了四种处理方法

方法处理方式 抛出异常 返回特殊值 一直阻塞 超时退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
检查方法 element() peek() 不可用 不可用
JAVA里的阻塞队列

JDK 7 提供了7个阻塞队列,如下

1、ArrayBlockingQueue 数组结构组成的有界阻塞队列。

此队列按照先进先出(FIFO)的原则对元素进行排序,但是默认情况下不保证线程公平的访问队列,即如果队列满了,那么被阻塞在外面的线程对队列访问的顺序是不能保证线程公平(即先阻塞,先插入)的。

2、LinkedBlockingQueue一个由链表结构组成的有界阻塞队列

此队列按照先出先进的原则对元素进行排序

3、PriorityBlockingQueue支持优先级的无界阻塞队列

4、DelayQueue支持延时获取元素的无界阻塞队列,即可以指定多久才能从队列中获取当前元素

5、SynchronousQueue不存储元素的阻塞队列,每一个put必须等待一个take操作,否则不能继续添加元素。并且他支持公平访问队列。

6、LinkedTransferQueue由链表结构组成的无界阻塞TransferQueue队列。相对于其他阻塞队列,多了tryTransfer和transfer方法

transfer方法

如果当前有消费者正在等待接收元素(take或者待时间限制的poll方法),transfer可以把生产者传入的元素立刻传给消费者。如果没有消费者等待接收元素,则将元素放在队列的tail节点,并等到该元素被消费者消费了才返回。

tryTransfer方法

用来试探生产者传入的元素能否直接传给消费者。,如果没有消费者在等待,则返回false。和上述方法的区别是该方法无论消费者是否接收,方法立即返回。而transfer方法是必须等到消费者消费了才返回。

7、LinkedBlockingDeque链表结构的双向阻塞队列,优势在于多线程入队时,减少一半的竞争。

四个拒绝策略

ThreadPoolExecutor默认有四个拒绝策略:

1、ThreadPoolExecutor.AbortPolicy() 直接抛出异常RejectedExecutionException

2、ThreadPoolExecutor.CallerRunsPolicy() 直接调用run方法并且阻塞执行

3、ThreadPoolExecutor.DiscardPolicy() 直接丢弃后来的任务

4、ThreadPoolExecutor.DiscardOldestPolicy() 丢弃在队列中队首的任务

当然可以自己继承RejectedExecutionHandler来写拒绝策略.

TestThreadPoolExecutor 示例 TestThreadPoolExecutor.java
package io.ymq.thread.TestThreadPoolExecutor;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 描述:
 *
 * @author yanpenglei
 * @create 2017-10-12 15:39
 **/
public class TestThreadPoolExecutor {
    public static void main(String[] args) {

        long currentTimeMillis = System.currentTimeMillis();

        // 构造一个线程池
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 6, 3,
                TimeUnit.SECONDS, new ArrayBlockingQueue(3)
        );

        for (int i = 1; i <= 10; i++) {
            try {
                String task = "task=" + i;
                System.out.println("创建任务并提交到线程池中:" + task);
                threadPool.execute(new ThreadPoolTask(task));

                Thread.sleep(100);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        try {
            //等待所有线程执行完毕当前任务。
            threadPool.shutdown();

            boolean loop = true;
            do {
                //等待所有线程执行完毕当前任务结束
                loop = !threadPool.awaitTermination(2, TimeUnit.SECONDS);//等待2秒
            } while (loop);

            if (loop != true) {
                System.out.println("所有线程执行完毕");
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            System.out.println("耗时:" + (System.currentTimeMillis() - currentTimeMillis));
        }


    }
}
ThreadPoolTask.java
package io.ymq.thread.TestThreadPoolExecutor;

import java.io.Serializable;

/**
 * 描述:
 *
 * @author yanpenglei
 * @create 2017-10-12 15:40
 **/
public class ThreadPoolTask implements Runnable, Serializable {

    private Object attachData;

    ThreadPoolTask(Object tasks) {
        this.attachData = tasks;
    }

    public void run() {

        try {

            System.out.println("开始执行任务:" + attachData + "任务,使用的线程池,线程名称:" + Thread.currentThread().getName());

            System.out.println();

        } catch (Exception e) {
            e.printStackTrace();
        }
        attachData = null;
    }

}

遇到java.util.concurrent.RejectedExecutionException

第一

你的线程池 ThreadPoolExecutor 显示的 shutdown() 之后,再向线程池提交任务的时候。 如果你配置的拒绝策略是 AbortPolicy 的话,这个异常就会抛出来。

第二

当你设置的任务缓存队列过小的时候,或者说, 你的线程池里面所有的线程都在干活(线程数== maxPoolSize),并且你的任务缓存队列也已经充满了等待的队列, 这个时候,你再向它提交任务,则会抛出这个异常。

响应

可以看到线程 pool-1-thread-1 到5 循环使用

创建任务并提交到线程池中:task=1
开始执行任务:task=1任务,使用的线程池,线程名称:pool-1-thread-1

创建任务并提交到线程池中:task=2
开始执行任务:task=2任务,使用的线程池,线程名称:pool-1-thread-2

创建任务并提交到线程池中:task=3
开始执行任务:task=3任务,使用的线程池,线程名称:pool-1-thread-3

创建任务并提交到线程池中:task=4
开始执行任务:task=4任务,使用的线程池,线程名称:pool-1-thread-4

创建任务并提交到线程池中:task=5
开始执行任务:task=5任务,使用的线程池,线程名称:pool-1-thread-5

创建任务并提交到线程池中:task=6
开始执行任务:task=6任务,使用的线程池,线程名称:pool-1-thread-1

创建任务并提交到线程池中:task=7
开始执行任务:task=7任务,使用的线程池,线程名称:pool-1-thread-2

创建任务并提交到线程池中:task=8
开始执行任务:task=8任务,使用的线程池,线程名称:pool-1-thread-3

创建任务并提交到线程池中:task=9
开始执行任务:task=9任务,使用的线程池,线程名称:pool-1-thread-4

创建任务并提交到线程池中:task=10
开始执行任务:task=10任务,使用的线程池,线程名称:pool-1-thread-5

所有线程执行完毕
耗时:1015
测试代码

github https://github.com/souyunku/ymq-example/tree/master/ymq-thread

Contact

作者:鹏磊

出处:http://www.ymq.io

Email:admin@souyunku.com

版权归作者所有,转载请注明出处

Wechat:关注公众号,搜云库,专注于开发技术的研究与知识分享

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/67766.html

相关文章

  • 线程源码分析

    摘要:线程池的作用线程池能有效的处理多个线程的并发问题,避免大量的线程因为互相强占系统资源导致阻塞现象,能够有效的降低频繁创建和销毁线程对性能所带来的开销。固定的线程数由系统资源设置。线程池的排队策略与有关。线程池的状态值分别是。 线程池的作用 线程池能有效的处理多个线程的并发问题,避免大量的线程因为互相强占系统资源导致阻塞现象,能够有效的降低频繁创建和销毁线程对性能所带来的开销。 线程池的...

    enda 评论0 收藏0
  • 线程,这一篇或许就够了

    摘要:创建方法最大线程数即源码单线程化的线程池有且仅有一个工作线程执行任务所有任务按照指定顺序执行,即遵循队列的入队出队规则创建方法源码还有一个结合了和,就不介绍了,基本不用。 *本篇文章已授权微信公众号 guolin_blog (郭霖)独家发布 为什么用线程池 创建/销毁线程伴随着系统开销,过于频繁的创建/销毁线程,会很大程度上影响处理效率 >例如: > >记创建线程消耗时间T1,执行...

    UsherChen 评论0 收藏0
  • Java线程简单总结

    摘要:本文主要内容为简单总结中线程池的相关信息。方法簇方法簇用于创建固定线程数的线程池。三种常见线程池的对比上文总结了工具类创建常见线程池的方法,现对三种线程池区别进行比较。 概述 线程可认为是操作系统可调度的最小的程序执行序列,一般作为进程的组成部分,同一进程中多个线程可共享该进程的资源(如内存等)。在单核处理器架构下,操作系统一般使用分时的方式实现多线程;在多核处理器架构下,多个线程能够...

    CoorChice 评论0 收藏0
  • Java ThreadPoolExecutor 线程源码分析

    摘要:线程池常见实现线程池一般包含三个主要部分调度器决定由哪个线程来执行任务执行任务所能够的最大耗时等线程队列存放并管理着一系列线程这些线程都处于阻塞状态或休眠状态任务队列存放着用户提交的需要被执行的任务一般任务的执行的即先提交的任务先被执行调度 线程池常见实现 线程池一般包含三个主要部分: 调度器: 决定由哪个线程来执行任务, 执行任务所能够的最大耗时等 线程队列: 存放并管理着一系列线...

    greatwhole 评论0 收藏0
  • 从0到1玩转线程

    摘要:提交任务当创建了一个线程池之后我们就可以将任务提交到线程池中执行了。提交任务到线程池中相当简单,我们只要把原来传入类构造器的对象传入线程池的方法或者方法就可以了。 我们一般不会选择直接使用线程类Thread进行多线程编程,而是使用更方便的线程池来进行任务的调度和管理。线程池就像共享单车,我们只要在我们有需要的时候去获取就可以了。甚至可以说线程池更棒,我们只需要把任务提交给它,它就会在合...

    darkerXi 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<