资讯专栏INFORMATION COLUMN

Java中的线程池

tomato / 1155人阅读

摘要:中的线程池运用场景非常广泛,几乎所有的一步或者并发执行程序都可以使用。代码中如果执行了方法,线程池会提前创建并启动所有核心线程。线程池最大数量线程池允许创建的线程最大数量。被称为是可重用固定线程数的线程池。

Java中的线程池运用场景非常广泛,几乎所有的一步或者并发执行程序都可以使用。那么线程池有什么好处呢,以及他的实现原理是怎么样的呢?

使用线程池的好处

在开发过程中,合理的使用线程池能够带来以下的一些优势,这些优势同时也是一些其他池化的优势,例如数据库连接池,http连接池等。

降低资源消耗,通过重复利用已经创建的线程降低线程创建和销毁造成的消耗。

提高响应速度,当任务到达时,任务可以不需要等到线程创建就能立即执行。

提高线程的可管理性 线程的比较稀缺的资源,如果无限的创建,不仅会消耗系统资源,还会降低系统的稳定性。

线程池实现原理

我们创建线程池完成之后,当把一个任务提交给线程池处理的时候,线程池的处理流程如下:
1)线程池判断核心线程池的任务是否都在执行任务,如果不是,则创建一个新的线程来执行任务,如何核心线程池的线程都在执行任务,则进入下一个流程
2)线程池判断工作队列是否已满,如果工作队列没有满,那么新提交的任务将会存储在工作队列中,如果工作队列满了,那会进入到下一个流程
3)线程池判断线程池中的线程是否都处于工作状态,如果没有,则创建一个新的工作线程来执行任务。如果已经满了,那么使用饱和策略来处理这个任务

线程池使用
创建线程池

创建线程池可以使用java中已经内置的一些默认配置的线程池,也可以使用ThreadPoolExecutor来自己配置参数来创建线程池,阿里代码规约中推荐后者来创建线程池,这样创建的线程池更加符合业务的实际需求。
下面是我创建线程池的一个例子:

ExecutorService executor = new ThreadPoolExecutor(2, 5,  30 , TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.AbortPolicy());

 public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue workQueue,
                          RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
         Executors.defaultThreadFactory(), handler);
}

在上面的这段代码里new ThreadPoolExecutor中一共有6个参数,我们来解析一下这些参数的意义,这样下次创建线程池的时候能够更加灵活的使用。

corePoolSize (核心线程的数量):任务提交到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的核心线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池核心数大小时就不再创建。代码中如果执行了prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。

maximumPoolSize (线程池最大数量):线程池允许创建的线程最大数量。如果队列满了,并且创建的线程数小于最大线程数,那么线程池会再创建新的线程执行任务,如果使用无界队列,那么这个参数设置了意义不大。

keepAliveTime (线程保持活动时间):线程池的工作线程空闲后,保持存活的时间,如果任务很多,切任务执行时间较长,这个值可以设置的大一点。

TimeUnit (上面的那个时间的单位)

BlockingQueue workQueue (任务队列):用于保存等待执行的任务的阻塞队列。可以选择的有以下:

ArrayBlockingQueue:基于数组的有界阻塞队列 FIFO

LinkedBlockingQueue:基于链表的有界阻塞队列 FIFO 吞吐量高于ArrayBlockingQueue, FixedThreadPool基于这个实现

SynchronousQueue: 不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入处于阻塞状态,吞吐量高于LinkedBlockingQueue,静态方法:Executors.newCachedThreadPool使用了这个队列

PriorityBlockingQueue 具有优先级的无界阻塞队列

handler (饱和策略):当队列和线程池都满了,说明线程池已经处于饱和状态,那么必须采取一种策略处理新提交的任务,目前提供四种策略

AbortPolicy:直接抛出异常

CallerRunsPolicy: 只用调用者所在线程来运行任务

DiscardOldestPolicy :丢弃队列里最近的一个任务,执行当前任务

DiscardPolicy : 不处理丢弃掉

也可以实现RejectedExecutionHandler接口自定义处理方式。

向线程池提交任务

线程池创建完成之后,就可以处理提交的任务,任务如何提交到线程池呢,线程池提供了两个方法:submit()和execute()方法。

 public static void main(String[] args) {
    //提交没有返回结果的任务
    EXECUTOR_SERVICE.execute(new Runnable() {
        @Override
        public void run() {
            System.out.println("hello world");
        }
    });
    Future future = EXECUTOR_SERVICE.submit(new Runnable() {
        @Override
        public void run() {
            System.out.println("hello world1");
        }
    });
    //提交有返回结果的任务,返回一个future的对象,调用get方法获取返回值,阻塞直到返回
    Future future1 = EXECUTOR_SERVICE.submit(new Callable() {
        @Override
        public Object call() throws Exception {
            System.out.println("hello world1");
            return "succ";
        }
    });
    try {
        System.out.println(future.get());
        System.out.println(future1.get());
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
    EXECUTOR_SERVICE.shutdownNow();
}
关闭线程池

Java对ExecutorService关闭方式有两种,一是调用shutdown()方法,二是调用shutdownNow()方法。
这两个方法虽然都可以关闭线程池,但是有一些区别

shutdown()

1、调用之后不允许继续往线程池内继续添加线程,如果继续添加会出现RejectedExecutionException异常;
2、线程池的状态变为SHUTDOWN状态;
3、所有在调用shutdown()方法之前提交到ExecutorSrvice的任务都会执行;
4、一旦所有线程结束执行当前任务,ExecutorService才会真正关闭。

shutdownNow()

1、该方法返回尚未执行的 task 的 List;
2、线程池的状态变为STOP状态;
3、阻止所有正在等待启动的任务, 并且停止当前正在执行的任务;

最好的关闭线程池的方法:

package multithread.threadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * @author pangjianfei
 * @Date 2019/6/21
 * @desc 测试线程池
 */
public class TestThreadPoolExecutor {

    static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {
        EXECUTOR_SERVICE.execute( ()-> System.out.println("hello world"));
        Future future = EXECUTOR_SERVICE.submit(() -> System.out.println("hello world1"));
        Future future1 = EXECUTOR_SERVICE.submit(() -> {System.out.println("hello world1");return "succ";});
        Future future2 = EXECUTOR_SERVICE.submit(() -> {Thread.sleep(5000);System.out.println("hello world2");return "线程执行成功了";});
        try {
            System.out.println(future.get());
            System.out.println(future1.get());
            System.out.println(future2.get());
        } catch (Exception e) {
            e.printStackTrace();
        }
        Runnable run = () -> {
            long sum = 0;
            boolean flag = true;
            while (flag && !Thread.currentThread().isInterrupted()) {
                sum += 1;
                if (sum == Long.MAX_VALUE) {
                    flag = false;
                }
            }
        };
        EXECUTOR_SERVICE.execute(run);
        //先调用shutdown()使线程池状态改变为SHUTDOWN
        EXECUTOR_SERVICE.shutdown();
        try {
            //2s后检测线程池内的线程是否执行完毕
            if (!EXECUTOR_SERVICE.awaitTermination(2, TimeUnit.SECONDS)) {
                //内部实际通过interrupt来终止线程,所以当调用shutdownNow()时,isInterrupted()会返回true。
                EXECUTOR_SERVICE.shutdownNow();
            }
        } catch (InterruptedException e) {
            EXECUTOR_SERVICE.shutdownNow();
        }
    }
}
合理配置线程池

一般说来,大家认为线程池的大小经验值应该这样设置:(其中N为CPU的个数)

如果是CPU密集型应用,则线程池大小设置为N+1
如果是IO密集型应用,则线程池大小设置为2N+1
IO优化中,更加合理的方式是:最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1)* CPU数目

linux查看CPU的数量:

# 查看物理CPU个数
cat /proc/cpuinfo| grep "physical id"| sort| uniq| wc -l

# 查看每个物理CPU中core的个数(即核数)
cat /proc/cpuinfo| grep "cpu cores"| uniq

# 查看逻辑CPU的个数
cat /proc/cpuinfo| grep "processor"| wc -l
线程池的监控

基于下面的方法可以对线程池中的一些数据进行监控:

//当前排队线程数
((ThreadPoolExecutor)EXECUTOR_SERVICE).getQueue().size();
//当前活动的线程数
((ThreadPoolExecutor)EXECUTOR_SERVICE).getActiveCount();
//执行完成的线程数
((ThreadPoolExecutor)EXECUTOR_SERVICE).getCompletedTaskCount();
//总线程数
((ThreadPoolExecutor)EXECUTOR_SERVICE).getTaskCount();
Executor框架
Executor框架的调度模型

Executor框架的调度模型是一种二级调度模型。
在HotSpot VM 的线程模型中,Java线程会被一对一的映射为本地操作系统的线程。Java线程的启动与销毁都与本地线程同步。操作系统会调度所有线程并将它们分配给可用的CPU。
在上层,我们通常会创建任务,然后将任务提交到调度器,调度器(Executor框架)将这些任务映射为对应数量的线程;底层,操作系统会将这些线程映射到硬件处理器上,这里不是由应用程序控制。模型图如下:

Executor框架的结构和成员

Executor框架主要包括三部分:

任务: 定义的被执行的任务需要实现两个接口之一:Runable、Callable;

任务的执行: 包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口:ThreadPoolExecutor 和 ScheduledThreadPoolExecutor、ForkJoinPool;

任务的异步计算结果: 包括Future接口和实现Future接口的FutureTask类、ForkJoinTask类。

Executor主要包含的类和接口如下:
Executor是一个接口,他是Executor框架的基础,他将任务的提交和执行分离。
ThreadPoolExecutor是线程池的核心类,用来执行被提交的任务
ScheduleThreadPoolExecutor是一个实现类,可以在给定的延迟后执行命令,或者定期执行命令,比Timer更加灵活。
Future接口和实现Future接口的FutureTask类,代表异步计算的结果

ThreadPoolExecutor详解

Executor框架最核心的类就是ThreadPoolExecutor,他是线程池的实现类,我们在上面看到过创建线程池的例子
ExecutorService executor = new ThreadPoolExecutor(2, 5, 30 , TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.AbortPolicy()); 里面包含了几个核心的参数,通过Executor框架的工具类Executors,可以直接创建3中类型的线程池。

FixedThreadPool

FixedThreadPool被称为是可重用固定线程数的线程池。为什么会这么说,我们看一下他的实现源码:

public static ExecutorService newFixedThreadPool(int nThreads) {
    /**
     * 核心线程数和最大线程数相同,线程等待时间为0表示多余的空闲线程会被立刻终止,LinkedBlockingQueue默认容量是Integer.MAX_VALUE,基本类似于无界队列
     */
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue());
}
SingleThreadExecutor

SingleThreadExecutor是单个Worker线程的Executor.他的实现源码如下:

/**
 * 可以看到核心线程数和最大线程数都是1,相当于是创建一个单线程顺序的执行队列中的任务
 */
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue()));
}
CachedThreadPool

CachedThreadPool是一个会根据需要创建新线程的线程池,源码如下:

/**
 * 核心线程数为0,最大线程数为Integer.MAX_VALUE,线程存活时间为1分钟,而且是使用SynchronousQueue没有容量的队列,这种情况下任务提交快的是时候,会创建大量的线程,耗尽CPU的资源
 */
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue());
}
ScheduledThreadPoolExecutor详解

ScheduledThreadPoolExecutor用来在给定的延迟之后运行任务,或者定期执行任务,他比Timer更加灵活,Timer是创建一个后台线程,而这个线程池可以在构造函数中指定多个对应的后台线程。源码如下:

/**
 * 这里使用的是无界的队列,如果核心线程池已满,那么任务会一直提交到队列
 */
public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

ScheduledThreadPoolExecutor的执行主要包含两部分

/**参数的作用:1:执行的线程  2、初始化延时  3、两次开始执行最小时间间隔  4、计时单位*/
/**scheduleAtFixedRate()是按照指定频率执行一个任务,就是前一个任务没有执行完成,但是下次执行时间到,仍然会启动线程执行新的任务*/
scheduledExecutorService.scheduleAtFixedRate(()-> System.out.println("hello"), 10, 10, TimeUnit.SECONDS);
/**参数作用同上*/
/**scheduleWithFixedDelay()是按照指定频率间隔执行,本次执行结果之后,在延时10s执行下一次的任务*/
scheduledExecutorService.scheduleWithFixedDelay(()-> System.out.println("hello"), 10, 10, TimeUnit.SECONDS);

上面的两种方法都是向ScheduledThreadPoolExecutor的DelayQueue中添加了一个实现了RunnableScheduleFuture接口的ScheduledTask.

 public ScheduledFuture scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    //这里对commond进行了封装
    ScheduledFutureTask sft =
        new ScheduledFutureTask(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

DelayQueue是一个支持优先级的队列,这里默认会按照执行时间进行排序,time小的排在前面。线程在获取到期需要执行的ScheduledTutureTask之后,执行这个任务,执行完成之后会修改time,将其变为下次要执行的时间,修改之后放回到DelayQueue中。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/77839.html

相关文章

  • 后端ing

    摘要:当活动线程核心线程非核心线程达到这个数值后,后续任务将会根据来进行拒绝策略处理。线程池工作原则当线程池中线程数量小于则创建线程,并处理请求。当线程池中的数量等于最大线程数时默默丢弃不能执行的新加任务,不报任何异常。 spring-cache使用记录 spring-cache的使用记录,坑点记录以及采用的解决方案 深入分析 java 线程池的实现原理 在这篇文章中,作者有条不紊的将 ja...

    roadtogeek 评论0 收藏0
  • Java线程

    摘要:中的线程池是运用场景最多的并发框架。才是真正的线程池。存放任务的队列存放需要被线程池执行的线程队列。所以线程池的所有任务完成后,它最终会收缩到的大小。饱和策略一般情况下,线程池采用的是,表示无法处理新任务时抛出异常。 Java线程池 1. 简介 系统启动一个新线程的成本是比较高的,因为它涉及与操作系统的交互,这个时候使用线程池可以提升性能,尤其是需要创建大量声明周期很短暂的线程时。Ja...

    jerry 评论0 收藏0
  • 一看就懂的Java线程分析详解

    摘要:任务性质不同的任务可以用不同规模的线程池分开处理。线程池在运行过程中已完成的任务数量。如等于线程池的最大大小,则表示线程池曾经满了。线程池的线程数量。获取活动的线程数。通过扩展线程池进行监控。框架包括线程池,,,,,,等。 Java线程池 [toc] 什么是线程池 线程池就是有N个子线程共同在运行的线程组合。 举个容易理解的例子:有个线程组合(即线程池,咱可以比喻为一个公司),里面有3...

    Yangder 评论0 收藏0
  • Java线程的工作原理,好处和注意事项

    摘要:线程池的工作原理一个线程池管理了一组工作线程,同时它还包括了一个用于放置等待执行任务的任务队列阻塞队列。使用线程池可以对线程进行统一的分配和监控。线程池的注意事项虽然线程池是构建多线程应用程序的强大机制,但使用它并不是没有风险的。 线程池的工作原理一个线程池管理了一组工作线程, 同时它还包括了一个用于放置等待执行 任务的任务队列(阻塞队列) 。 一个线程池管理了一组工作线程, 同时它还...

    ZweiZhao 评论0 收藏0
  • 跟着阿里p7一起学java高并发 - 第18天:玩转java线程,这一篇就够了

    摘要:高并发系列第篇文章。简单的说,在使用了线程池之后,创建线程变成了从线程池中获取一个空闲的线程,然后使用,关闭线程变成了将线程归还到线程池。如果调用了线程池的方法,线程池会提前把核心线程都创造好,并启动线程池允许创建的最大线程数。 java高并发系列第18篇文章。 本文主要内容 什么是线程池 线程池实现原理 线程池中常见的各种队列 自定义线程创建的工厂 常见的饱和策略 自定义饱和策略 ...

    AdolphLWQ 评论0 收藏0

发表评论

0条评论

tomato

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<