资讯专栏INFORMATION COLUMN

聊聊并发(五)——线程池

xiaochao / 1977人阅读

摘要:线程池一种线程使用模式。线程池不仅能够保证内核的充分利用,还能防止过分调度。相关起提供了线程池相关顶级接口,及子接口和工具类。线程池的最大线程数,要大于。可扩容创建一个可根据需要线程数,创建新的线程的线程池。

一、概述

1、介绍

  在使用线程时,需要new一个,用完了又要销毁,这样频繁的创建和销毁很耗资源,所以就提供了线程池。道理和连接池差不多,连接池是为了避免频繁的创建和释放连接,所以在连
接池中就有一定数量的连接,要用时从连接池拿出,用完归还给连接池,线程池也一样。
  线程池:一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多
个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。

  脑图:https://www.processon.com/view/link/61849ba4f346fb2ecc4546e5

2、简单使用

  线程池用法很简单, 分为三步。首先用工具类Executors创建线程池,然后给线程池分配任务,最后关闭线程池就行了。

 1 public class ThreadPoolTest { 2     public static void main(String[] args) throws Exception { 3  4         // 1.创建一个 10 个线程数的线程池 5         ExecutorService service = Executors.newFixedThreadPool(10); 6  7         // 2.执行一个Runnable 8         service.execute(new Number1()); 9 10         // 2.提交一个Callable11         Future future = service.submit(new Number2());12         Integer integer = future.get();13         System.out.println("result = " + integer);14 15         // 关闭线程池16         service.shutdown();17     }18 }19 20 class Number1 implements Runnable {21 22     @Override23     public void run() {24         System.out.println("----打印Runnable----");25     }26 }27 28 // 10以内数求和29 class Number2 implements Callable {30     private int sum = 0;31 32     @Override33     public Integer call() {34         for (int i = 0; i <= 10; i++) {35             if (i % 2 == 0) {36                 sum += i;37             }38         }39         return sum;40     }41 }

  注意:线程用完,要关闭线程池,否则程序依然在运行中

3、相关API

  JDK 5.0 起提供了线程池相关API:顶级接口Executor,及子接口 ExecutorService 和工具类Executors。

  JUC包描述:图片来源API文档

  Executors:工具类,线程池的工厂类,用于创建并返回不同类型的线程池。

 1 // 一池N线程:创建一个固定(可重用)线程数的线程池。 2 ExecutorService Executors.newFixedThreadPool(int nThreads) 3  4 // 一池一线程:创建一个只有一个线程的线程池。 5 ExecutorService Executors.newSingleThreadExecutor() 6  7 // 可扩容:创建一个可根据需要线程数,创建新的线程的线程池。 8 ExecutorService Executors.newCachedThreadPool() 9 10 // 可用于调度:创建一个线程池,它可安排在给定延迟后运行命令或者定期的执行。11 ScheduledExecutorService Executors.newScheduledThreadPool(int corePoolSize)

  ExecutorService:

 1 // 执行任务/命令,没有返回值,一般用来执行Runnable 2 void execute(Runnable command) 3  4 // 可用于提交一个Runnable,但没有返回值 5 Future submit(Runnable task) 6  7 // 执行任务,有返回值,一般用来执行Callable 8  Future submit(Callable task) 9 10 // 关闭连接池11 void shutdown()

4、使用举例

  代码示例:创建固定 5 个线程的线程池为 10 个任务服务。

 1 public class ThreadPoolTest { 2     public static void main(String[] args) { 3         // 1.创建一个 10 个线程数的线程池 4         ExecutorService service = Executors.newFixedThreadPool(5); 5  6         try { 7             for (int i = 0; i < 10; i++) { 8                 int finalI = i; 9                 service.execute(() -> {10 11                     System.out.println(Thread.currentThread().getName() + " 为客户 " + finalI + " 办理业务~");12 13 //                    try {14 //                        Thread.sleep(1000_000);15 //                    } catch (InterruptedException e) {16 //                        e.printStackTrace();17 //                    }18 19                 });20             }21         } finally {22             service.shutdown();23         }24     }25 }26 27 // 可能的一种结果28 pool-1-thread-1 为客户 0 办理业务~29 pool-1-thread-2 为客户 1 办理业务~30 pool-1-thread-2 为客户 6 办理业务~31 pool-1-thread-2 为客户 7 办理业务~32 pool-1-thread-2 为客户 8 办理业务~33 pool-1-thread-1 为客户 5 办理业务~34 pool-1-thread-2 为客户 9 办理业务~35 pool-1-thread-3 为客户 2 办理业务~36 pool-1-thread-4 为客户 3 办理业务~37 pool-1-thread-5 为客户 4 办理业务~

  可以看到,银行 5 个窗口为 10 个客户相继服务。若前面服务时间长(打开注释),线程池便没有新的线程来执行任务了。程序会陷入等待中。
  代码示例:创建单个线程的线程池为 10 个线程服务。代码同上,只修改:

1 ExecutorService service = Executors.newSingleThreadExecutor();

  代码示例:创建可扩容线程的线程池为 10 个线程服务。代码同上,只修改:

1 ExecutorService service = Executors.newCachedThreadPool();

5、线程池好处

  为什么要用线程池管理线程呢?当然是为了线程复用。
  背景:经常创建和销毁、使用量特别大的资源,比如并发情况下的线程,对性能影响很大。
  思路:提前创建好多个线程,放入线程池中,使用时直接获取,使用完放回池中。可以避免频繁的创建和销毁,实现重复利用。类似生活中的公共交通工具。
  好处:提高响应速度(减少了创建新线程的时间);降低资源消耗(重复利用线程池中线程,不需要每次都创建);便于线程管理。

二、线程池设计与实现

1、介绍

  前面介绍了三种(固定数、单一的、可变的)创建线程池的方式,实际工作用哪一个呢?都不使用!为什么呢?
  《阿里巴巴Java开发手册》明确规定:线程池不允许使用Executors创建,而是通过ThreadPoolExecutor的方式,规避资源耗尽风险。

  查看源码,可以看到,用Executors创建线程池的三种方式中,都 new 了一个 ThreadPoolExecutor。所以,实际生产一般通过 ThreadPoolExecutor 的 7 个参数,自定义线程池。
  源码示例:7 个参数的构造器。

 1 public ThreadPoolExecutor(int corePoolSize, 2                           int maximumPoolSize, 3                           long keepAliveTime, 4                           TimeUnit unit, 5                           BlockingQueue workQueue, 6                           ThreadFactory threadFactory, 7                           RejectedExecutionHandler handler) { 8     if (corePoolSize < 0 || 9         maximumPoolSize <= 0 ||10         maximumPoolSize < corePoolSize ||11         keepAliveTime < 0)12         throw new IllegalArgumentException();13     if (workQueue == null || threadFactory == null || handler == null)14         throw new NullPointerException();15     this.corePoolSize = corePoolSize;16     this.maximumPoolSize = maximumPoolSize;17     this.workQueue = workQueue;18     this.keepAliveTime = unit.toNanos(keepAliveTime);19     this.threadFactory = threadFactory;20     this.handler = handler;21 }

2、银行服务

  介绍线程池之前,先来看一个生活中的案例。银行业务办理流程,如图:

  某银行一共有 5 个服务窗口,但平时一般只开放两个,另外三个不开放。大厅中还有 10 个等待服务的座位。某天:
  (1)客人1(用Thread1表示)来办理业务,他就直接去开放的窗口1办理(假设他需要服务的时间很长,一直在服务中,后面的也一样)。
  (2)Thread2来办理业务,由于窗口1在服务中,所以他去了开放的窗口2办理。
  (3)Thread3来办理业务,由于窗口1和窗口2都在服务中,所以他去了大厅的等待服务座位上排队等待。
  (4)Thread4~Thread12 同理Thread3。
  (5)Thread13来办理业务,由于窗口1和窗口2都在服务中,且此时大厅的等待座位上也已满。银行经理便将关闭的窗口3打开来为Thread13服务。注意:这里并不是Thread13去大厅排队,然后队列中队头元素Thread3出队接受服务。而是直接为Thread13服务。
  (6)Thread14,Thread15来办理业务,会开放窗口4为Thread14服务,开放窗口5为Thread15服务。
  (7)Thread16来办理业务,此时,已无可用窗口,且大厅的等待座位上也已满。银行便拒绝再为 Thread16 服务。
  说明:若 Thread13、Thread14、Thread15 业务办理完毕后,没有新的客人来银行办理业务。那么窗口3、窗口4、窗口5会在一定时间后又关闭起来。

3、核心参数(重要)

  下面介绍 ThreadPoolExecutor 构造器中的 7 个核心参数。

  corePoolSize:线程池的核心线程数。
  maximumPoolSize:线程池的最大线程数,要大于corePoolSize。
  keepAliveTime:非核心线程闲置下来最多存活的时间。
  unit:线程池中非核心线程保持存活的时间单位,与keepAliveTime一起使用。
  workQueue:用来保存提交后,等待执行任务的阻塞队列。
  threadFactory:创建线程的工厂类。
  handler:拒绝策略。

  在理解上一节"银行服务"的过程后,就不难理解上面 7 个参数的含义。

  corePoolSize = 2:窗口1 + 窗口2。
  maximumPoolSize = 5:窗口1 + 窗口2 + 窗口3 + 窗口4 + 窗口5。
  workQueue = 10:银行大厅排队队列的大小。关于阻塞队列 BlockingQueue workQueue 请看这篇。
  keepAliveTime + unit:"窗口3、窗口4、窗口5会在一定时间后又关闭起来"的时间。
  handler:"银行便拒绝再为 Thread16 服务"的拒绝方式。

  在了解 ThreadPoolExecutor 7个核心参数的作用后,再看Executors创建的三种线程池的源码,就不难理解他们的作用。也就明白为什么《阿里巴巴Java开发手册》中禁止使用Executors创建,而要使用ThreadPoolExecutor自定义线程池。
  源码示例:
  一池N线程:创建一个固定(可重用)线程数的线程池。

 1 ExecutorService Executors.newFixedThreadPool(int nThreads); 2  3 public static ExecutorService newFixedThreadPool(int nThreads) { 4     return new ThreadPoolExecutor(nThreads, nThreads, 5                                   0L, TimeUnit.MILLISECONDS, 6                                   new LinkedBlockingQueue()); 7 } 8  9 public LinkedBlockingQueue() {10     this(Integer.MAX_VALUE);11 }

  一池一线程:创建一个只有一个线程的线程池。

 1 ExecutorService Executors.newSingleThreadExecutor(); 2  3 public static ExecutorService newSingleThreadExecutor() { 4     return new FinalizableDelegatedExecutorService 5         (new ThreadPoolExecutor(1, 1, 6                                 0L, TimeUnit.MILLISECONDS, 7                                 new LinkedBlockingQueue())); 8 } 9 10 public LinkedBlockingQueue() {11     this(Integer.MAX_VALUE);12 }

  可扩容:创建一个可根据需要线程数,创建新的线程的线程池。

1 ExecutorService Executors.newCachedThreadPool();2 3 public static ExecutorService newCachedThreadPool() {4     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,5                                   60L, TimeUnit.SECONDS,6                                   new SynchronousQueue());7 }

4、工作流程(重要)

  在理解"银行服务"的过程后,其实也就说清楚了线程池的工作流程。只是一些细节没有说,比如:
  (1)窗口3为Thread13服务完成后,Thread14才来,情况如何?
  (2)……
  代码示例:银行 2+3 个窗口为陆续来的 16 个客户服务。

 1 public class ThreadPoolDemo { 2     public static void main(String[] args) throws Exception { 3         // 模拟上图中 2 + 3 + 10(大厅排队长度) 的线程池 4         ThreadPoolExecutor executor = new ThreadPoolExecutor( 5                 2, 5 6                 , 6, TimeUnit.SECONDS 7                 , new ArrayBlockingQueue<>(10) 8                 , Executors.defaultThreadFactory() 9                 , new ThreadPoolExecutor.AbortPolicy());10 11         // 创建16个线程模拟16个客户12         final int num = 16;13         for (int i = 1; i <= num; i++) {14             int finalI = i;15 16             // 这里主要为了给线程起名字.17             Thread thread = new Thread(() -> {18                 System.out.println(Thread.currentThread().getName() + "=====" + finalI + " 号客人开始服务");19 20                 // 假设 13 和 16 服务很快.21                 if (finalI != 16 && finalI != 13) {22                     try {23                         // 正在为 finalI 号客户服务中24                         Thread.sleep(1000_000);25                     } catch (InterruptedException e) {26                         e.printStackTrace();27                     }28                 }29 30                 System.out.println(Thread.currentThread().getName() + "=====" + finalI + " 号客人服务结束");31             }, "------" + i);32 33             System.out.println(thread.getName() + " 号客人来了");34             executor.execute(thread);35 36             // 让主线程休息一下,保证上面开启的线程先执行.37             Thread.sleep(200);38         }39 40         // 保证上面的线程执行完41         Thread.sleep(1_000);42 43         System.out.println("===核心线程数===" + executor.getCorePoolSize());44         System.out.println("===总任务数=====" + executor.getTaskCount());45 46         final BlockingQueue queue = executor.getQueue();47         System.out.println("===正在排队====" + queue.size());48 49         System.out.println("===最大线程数===" + executor.getMaximumPoolSize());50         System.out.println("==============" + executor.getPoolSize());51         System.out.println("===完成任务数===" + executor.getCompletedTaskCount());52         System.out.println("==============" + executor.getLargestPoolSize());53 54         executor.shutdown();55     }56 }57 58 // 结果(程序未停止)59 ------1 号客人来了60 pool-1-thread-1=====1 号客人开始服务61 ------2 号客人来了62 pool-1-thread-2=====2 号客人开始服务63 ------3 号客人来了64 ------4 号客人来了65 ------5 号客人来了66 ------6 号客人来了67 ------7 号客人来了68 ------8 号客人来了69 ------9 号客人来了70 ------10 号客人来了71 ------11 号客人来了72 ------12 号客人来了 // 到这里都不难理解73 ------13 号客人来了74 pool-1-thread-3=====13 号客人开始服务75 pool-1-thread-3=====13 号客人服务结束 // 开放窗口3为客户13立刻服务完毕.76 pool-1-thread-3=====3 号客人开始服务 // 阻塞队列,队头 客户3 出队接受窗口3的服务77 ------14 号客人来了 // 加入阻塞队列队尾78 ------15 号客人来了79 pool-1-thread-4=====15 号客人开始服务80 ------16 号客人来了81 pool-1-thread-5=====16 号客人开始服务82 pool-1-thread-5=====16 号客人服务结束83 pool-1-thread-5=====4 号客人开始服务84 ===核心线程数===285 ===总任务数=====1686 ===正在排队====987 ===最大线程数===588 ==============589 ===完成任务数===290 ==============5

  其他的情况,可通过修改代码示例中相关参数进行测试,自然就理解。

5、如何配置线程数

  线程在Java中属于稀缺资源,线程池不是越大越好,也不是越小越好。那么,线程池的参数要如何设置才合理呢?
  任务分为CPU密集型、IO密集型、混合型。
  CPU密集型:大部分都在用CPU跟内存,加密,逻辑操作,业务处理等。
  IO密集型:数据库链接,网络通讯传输等。
  CPU密集型:一般推荐线程池不要过大,一般是CPU数 + 1,+1是因为可能存在页缺失(就是可能存在有些数据在硬盘中需要多来一个线程将数据读入内存)。如果线程池数太大,可能会频繁的进行线程上下文切换跟任务调度。
  获得当前CPU核心数代码如下:

1 Runtime.getRuntime().availableProcessors();

  IO密集型:线程数适当大一点,机器的CPU核心数*2。
  混合型:可以考虑根绝情况将它拆分成CPU密集型和IO密集型任务,如果执行时间相差不大,拆分可以提升吞吐量,反之没有必要。

三、拒绝策略

1、介绍

  当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,就会调用这个接口里的这个方法。也就是"银行便拒绝再为 Thread16 服务"的拒绝方式。

1 public interface RejectedExecutionHandler {2     void rejectedExecution(Runnable r, ThreadPoolExecutor executor);3 }

2、4种拒绝策略

  ThreadPoolExecutor 提供了四种拒绝策略,分别是
  AbortPolicy:直接抛出异常,这也是默认策略。
  CallerRunsPolicy:返回给调用者处理。用调用者所在线程来运行任务。
  DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务。
  DiscardPolicy:不处理,直接丢弃当前任务。

  代码示例:4种拒绝策略,代码同上,只需修改:

 1 // 1.去掉这个if条件 2 if (finalI != 16 && finalI != 13) {} 3  4  5  6 // 2.1 线程池创建时拒绝策略为: 7 new ThreadPoolExecutor.AbortPolicy() 8 // 2.1 结果,直接抛出了异常.(只截取了最后一点) 9 ------16 号客人来了10 Exception in thread "main" java.util.concurrent.RejectedExecutionException11 12 13 14 // 2.2 线程池创建时拒绝策略为:15 new ThreadPoolExecutor.CallerRunsPolicy()16 // 2.2 结果,返回给调用者main处理了.(只截取了最后一点)17 ------16 号客人来了18 main=====16 号客人开始服务19 20 21 22 // 2.3 线程池创建时拒绝策略为:23 new ThreadPoolExecutor.DiscardOldestPolicy()24 // 2.3 结果,见下图25 26 27 28 // 2.4 线程池创建时拒绝策略为:29 new ThreadPoolExecutor.DiscardPolicy()30 // 2.4 结果(丢弃了任务,没有任何处理)

 

  通过debug断点的方式,可以查看到:DiscardOldestPolicy策略中,此时阻塞队列中是客户4~客户16。也就是客户3 出队,被抛弃,客户16入队等待。

3、自定义拒绝策略

  如果不使用线程池提供的4种拒绝策略,也可以自己实现拒绝策略的接口,实现对这些超出数量的任务的处理。比如:为被拒绝的任务开启一个新的线程执行,如下。

 1 // 线程池创建时拒绝策略为: 2 new MyRejectedExecutionHandler() 3 // 结果 4 ------16 号客人来了 5 ---开启新线程处理任务---=====16 号客人开始服务 6  7  8 // 自定义的策略拒绝 9 class MyRejectedExecutionHandler implements RejectedExecutionHandler {10 11     @Override12     public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {13         new Thread(r, "---开启新线程处理任务---").start();14     }15 }

  参考文档:https://www.matools.com/api/java8

  《阿里巴巴Java开发手册》百度网盘:https://pan.baidu.com/s/1FZ9DNr0sF1mAc6Nq_6QZmg    密码: 4sw1

作者:Craftsman-L

本博客所有文章仅用于学习、研究和交流目的,版权归作者所有,欢迎非商业性质转载。

如果本篇博客给您带来帮助,请作者喝杯咖啡吧!点击下面打赏,您的支持是我最大的动力!

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/123754.html

相关文章

  • 聊聊面试中关于并发问题的应对方案

    摘要:这里呢,我直接给出高并发场景通常都会考虑的一些解决思路和手段结尾如何有效的准备面试中并发类问题,我已经给出我的理解。 showImg(https://segmentfault.com/img/bV7Viy?w=550&h=405); 主题 又到面试季了,从群里,看到许多同学分享了自己的面试题目,我也抽空在网上搜索了一些许多公司使用的面试题,目前校招和社招的面试题基本都集中在几个大方向上...

    xzavier 评论0 收藏0
  • 并发 - 收藏集 - 掘金

    摘要:在中一般来说通过来创建所需要的线程池,如高并发原理初探后端掘金阅前热身为了更加形象的说明同步异步阻塞非阻塞,我们以小明去买奶茶为例。 AbstractQueuedSynchronizer 超详细原理解析 - 后端 - 掘金今天我们来研究学习一下AbstractQueuedSynchronizer类的相关原理,java.util.concurrent包中很多类都依赖于这个类所提供的队列式...

    levius 评论0 收藏0
  • 并发 - 收藏集 - 掘金

    摘要:在中一般来说通过来创建所需要的线程池,如高并发原理初探后端掘金阅前热身为了更加形象的说明同步异步阻塞非阻塞,我们以小明去买奶茶为例。 AbstractQueuedSynchronizer 超详细原理解析 - 后端 - 掘金今天我们来研究学习一下AbstractQueuedSynchronizer类的相关原理,java.util.concurrent包中很多类都依赖于这个类所提供的队列式...

    fantix 评论0 收藏0
  • 后端ing

    摘要:当活动线程核心线程非核心线程达到这个数值后,后续任务将会根据来进行拒绝策略处理。线程池工作原则当线程池中线程数量小于则创建线程,并处理请求。当线程池中的数量等于最大线程数时默默丢弃不能执行的新加任务,不报任何异常。 spring-cache使用记录 spring-cache的使用记录,坑点记录以及采用的解决方案 深入分析 java 线程池的实现原理 在这篇文章中,作者有条不紊的将 ja...

    roadtogeek 评论0 收藏0
  • 技术经理:求求你,别再乱改数据库连接的大小了!

    摘要:你仅仅需要一个大小为数据库连接池,然后让剩下的业务线程都在队列里等待就可以了。你应该经常会看到一些用户量不是很大的应用中,为应付大约十来个的并发,却将数据库连接池设置成,的情况。请不要过度配置您的数据库连接池的大小。 文章翻译整理自: https://github.com/brettwooldridge/HikariCP/wiki/About-Pool-Sizing欢迎关注个人微信公众...

    darkbug 评论0 收藏0

发表评论

0条评论

xiaochao

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<