资讯专栏INFORMATION COLUMN

一起学并发编程 - 简易线程池实现

Harriet666 / 1075人阅读

摘要:并且,线程池在某些情况下还能动态调整工作线程的数量,以平衡资源消耗和工作效率。同时线程池还提供了对池中工作线程进行统一的管理的相关方法。

开发中经常会遇到各种池(如:连接池,线程池),它们的作用就是为了提高性能及减少开销,在JDK1.5以后的java.util.concurrent包中内置了很多不同使用场景的线程池,为了更好的理解它们,自己手写一个线程池,加深印象。

概述

1.什么是池

它的基本思想是一种对象池,程序初始化的时候开辟一块内存空间,里面存放若干个线程对象,池中线程执行调度由池管理器来处理。当有线程任务时,从池中取一个,执行完成后线程对象归池,这样可以避免反复创建线程对象所带来的性能开销,节省系统的资源。

2.使用线程池的好处

合理的使用线程池可以重复利用已创建的线程,这样就可以减少在创建线程和销毁线程上花费的时间和资源。并且,线程池在某些情况下还能动态调整工作线程的数量,以平衡资源消耗和工作效率。同时线程池还提供了对池中工作线程进行统一的管理的相关方法。这样就相当于我们一次创建,就可以多次使用,大量的节省了系统频繁的创建和销毁线程所需要的资源。

简易版实现

包含功能:

1.创建线程池,销毁线程池,添加新任务

2.没有任务进入等待,有任务则处理掉

3.动态伸缩,扩容

4.拒绝策略

介绍了线程池的原理以及主要组件之后,就让我们来手动实现一个自己的线程池,以加深理解和深入学习。因为自己实现的简易版本所以不建议生产中使用,生产中使用java.util.concurrent会更加健壮和优雅(后续文章会介绍)

代码

以下线程池相关代码均在SimpleThreadPoolExecutor.java中,由于为了便于解读因此以代码块的形式呈现

维护一个内部枚举类,用来标记当前任务线程状态,在Thread中其实也有.

private enum TaskState {
    FREE, RUNNABLE, BLOCKED, TERMINATED;
}

定义拒绝策略接口,以及默认实现

static class DiscardException extends RuntimeException {
    private static final long serialVersionUID = 8827362380544575914L;

    DiscardException(String message) {
        super(message);
    }
}

interface DiscardPolicy {//拒绝策略接口
    void discard() throws DiscardException;
}

任务线程具体实现

1.继承Thread,重写run方法。

2.this.taskState == TaskState.FREE && TASK_QUEUE.isEmpty() 如果当前线程处于空闲状态且没有任何任务了就将它wait住,让出CPU执行权

3.如果有任务就去执行FIFO(先进先出)策略

4.定义close方法,关闭线程,当然这里不能暴力关闭,所以这里有需要借助interrupt

public static class WorkerTask extends Thread {
    // 线程状态
    private TaskState taskState;
    // 线程编号
    private static int threadInitNumber;
    /**
     * 生成线程名,参考Thread.nextThreadNum();
     *
     * @return
     */
    private static synchronized String nextThreadName() {
        return THREAD_NAME_PREFIX + (++threadInitNumber);
    }

    WorkerTask() {
        super(THREAD_GROUP, nextThreadName());
    }

    @Override
    public void run() {
        Runnable target;
        //说明该线程处于空闲状态
        OUTER:
        while (this.taskState != TaskState.TERMINATED) {
            synchronized (TASK_QUEUE) {
                while (this.taskState == TaskState.FREE && TASK_QUEUE.isEmpty()) {
                    try {
                        this.taskState = TaskState.BLOCKED;//此处标记
                        //没有任务就wait住,让出CPU执行权
                        TASK_QUEUE.wait();
                        //如果被打断说明当前线程执行了 shutdown() 方法  线程状态为 TERMINATED 直接跳到 while 便于退出
                    } catch (InterruptedException e) {
                        break OUTER;
                    }
                }
                target = TASK_QUEUE.removeFirst();//遵循FIFO策略
            }
            if (target != null) {
                this.taskState = TaskState.RUNNABLE;
                target.run();//开始任务了
                this.taskState = TaskState.FREE;
            }
        }
    }

    void close() {//优雅关闭线程
        this.taskState = TaskState.TERMINATED;
        this.interrupt();
    }
}

简易版线程池,主要就是维护了一个任务队列线程集,为了动态扩容,自己也继承了Thread去做监听操作,对外提供submit()提交执行任务shutdown()等待所有任务工作完毕,关闭线程池

public class SimpleThreadPoolExecutor extends Thread {

    // 线程池大小
    private int threadPoolSize;
    // 最大接收任务
    private int queueSize;
    // 拒绝策略
    private DiscardPolicy discardPolicy;
    // 是否被销毁
    private volatile boolean destroy = false;

    private final static int DEFAULT_MIN_THREAD_SIZE = 2;// 默认最小线程数
    private final static int DEFAULT_ACTIVE_THREAD_SIZE = 5;// 活跃线程
    private final static int DEFAULT_MAX_THREAD_SIZE = 10;// 最大线程
    private final static int DEFAULT_WORKER_QUEUE_SIZE = 100;// 最多执行多少任务
    private final static String THREAD_NAME_PREFIX = "MY-THREAD-NAME-";//线程名前缀
    private final static String THREAD_POOL_NAME = "SIMPLE-POOL";//线程组的名称
    private final static ThreadGroup THREAD_GROUP = new ThreadGroup(THREAD_POOL_NAME);//线程组
    private final static List WORKER_TASKS = new ArrayList<>();// 线程容器
    // 任务队列容器,也可以用Queue 遵循 FIFO 规则
    private final static LinkedList TASK_QUEUE = new LinkedList<>();
    // 拒绝策略
    private final static DiscardPolicy DEFAULT_DISCARD_POLICY = () -> {
        throw new DiscardException("[拒绝执行] - [任务队列溢出...]");
    };

    private int minSize;//最小线程
    private int maxSize;//最大线程
    private int activeSize;//活跃线程

    SimpleThreadPoolExecutor() {
        this(DEFAULT_MIN_THREAD_SIZE, DEFAULT_ACTIVE_THREAD_SIZE, DEFAULT_MAX_THREAD_SIZE, DEFAULT_WORKER_QUEUE_SIZE, DEFAULT_DISCARD_POLICY);
    }

    SimpleThreadPoolExecutor(int minSize, int activeSize, int maxSize, int queueSize, DiscardPolicy discardPolicy){
        this.minSize = minSize;
        this.activeSize = activeSize;
        this.maxSize = maxSize;
        this.queueSize = queueSize;
        this.discardPolicy = discardPolicy;
        initPool();
    }

    void submit(Runnable runnable) {
        if (destroy) {
            throw new IllegalStateException("线程池已销毁...");
        }
        synchronized (TASK_QUEUE) {
            if (TASK_QUEUE.size() > queueSize) {//如果当前任务队超出队列限制,后续任务拒绝执行
                discardPolicy.discard();
            }
            // 1.将任务添加到队列
            TASK_QUEUE.addLast(runnable);
            // 2.唤醒等待的线程去执行任务
            TASK_QUEUE.notifyAll();
        }
    }

    void shutdown() throws InterruptedException {
        int activeCount = THREAD_GROUP.activeCount();
        while (!TASK_QUEUE.isEmpty() && activeCount > 0) {
            // 如果还有任务,那就休息一会
            Thread.sleep(100);
        }
        int intVal = WORKER_TASKS.size();//如果线程池中没有线程,那就不用关了
        while (intVal > 0) {
            for (WorkerTask task : WORKER_TASKS) {
                //当任务队列为空的时候,线程状态才会为 BLOCKED ,所以可以打断掉,相反等任务执行完在关闭
                if (task.taskState == TaskState.BLOCKED) {
                    task.close();
                    intVal--;
                } else {
                    Thread.sleep(50);
                }
            }
        }
        this.destroy = true;
        //资源回收
        TASK_QUEUE.clear();
        WORKER_TASKS.clear();
        this.interrupt();
        System.out.println("线程关闭");
    }

    private void createWorkerTask() {
        WorkerTask task = new WorkerTask();
        //刚创建出来的线程应该是未使用的
        task.taskState = TaskState.FREE;
        WORKER_TASKS.add(task);
        task.start();
    }

    /**
     * 初始化操作
     */
    private void initPool() {
        for (int i = 0; i < this.minSize; i++) {
            this.createWorkerTask();
        }
        this.threadPoolSize = minSize;
        this.start();//自己启动自己
    }

    @Override
    public void run() {
        while (!destroy) {
            try {
                Thread.sleep(5_000L);
                if (TASK_QUEUE.size() > activeSize && threadPoolSize < activeSize) { // 第一次扩容到 activeSize 大小
                    for (int i = threadPoolSize; i < activeSize; i++) {
                        createWorkerTask();
                    }
                    this.threadPoolSize = activeSize;
                    System.out.println("[初次扩充] - [" + toString() + "]");
                } else if (TASK_QUEUE.size() > maxSize && threadPoolSize < maxSize) {// 第二次扩容到最大线程
                    System.out.println();
                    for (int i = threadPoolSize; i < maxSize; i++) {
                        createWorkerTask();
                    }
                    this.threadPoolSize = maxSize;
                    System.out.println("[再次扩充] - [" + toString() + "]");
                } else {
                    //防止线程在submit的时候,其他线程获取到锁干坏事
                    synchronized (WORKER_TASKS) {
                        int releaseSize = threadPoolSize - activeSize;
                        Iterator iterator = WORKER_TASKS.iterator();// List不允许在for中删除集合元素,所以这里需要使用迭代器
                        while (iterator.hasNext()) {
                            if (releaseSize <= 0) {
                                break;
                            }
                            WorkerTask task = iterator.next();
                            //不能回收正在运行的线程,只回收空闲线程
                            if (task.taskState == TaskState.FREE) {
                                task.close();
                                iterator.remove();
                                releaseSize--;
                            }
                        }
                        System.out.println("[资源回收] - [" + toString() + "]");
                    }
                    threadPoolSize = activeSize;
                }
            } catch (InterruptedException e) {
                System.out.println("资源释放");
            }
        }
    }

    @Override
    public String toString() {
        return "SimpleThreadPoolExecutor{" +
                "threadPoolSize=" + threadPoolSize +
                ", taskQueueSize=" + TASK_QUEUE.size() +
                ", minSize=" + minSize +
                ", maxSize=" + maxSize +
                ", activeSize=" + activeSize +
                "}";
    }
}
测试一把

创建一个测试类

public class SimpleExecutorTest {

    public static void main(String[] args) throws InterruptedException {

        SimpleThreadPoolExecutor executor = new SimpleThreadPoolExecutor();
        IntStream.range(0, 30).forEach(i ->
                executor.submit(() -> {
                    System.out.printf("[线程] - [%s] 开始工作...
", Thread.currentThread().getName());
                    try {
                        Thread.sleep(2_000L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.printf("[线程] - [%s] 工作完毕...
", Thread.currentThread().getName());
                })
        );
        //executor.shutdown();如果放开注释即会执行完所有任务关闭线程池
    }
}

日志分析: 从日志中可以看到,初始化的时候是2个线程在工作,执行速度较为缓慢,当经过第一次扩容后,会观察到线程池里线程个数增加了,执行任务的速度就越来越快了,本文一共扩容了2次,第一次是扩容到activeSize的大小,第二次是扩容到maxSize,在执行任务的过程中,当线程数过多的时候就会触发回收机制...

[线程] - [MY-THREAD-NAME-1] 开始工作...
[线程] - [MY-THREAD-NAME-2] 开始工作...
[线程] - [MY-THREAD-NAME-1] 工作完毕...
[线程] - [MY-THREAD-NAME-1] 开始工作...
[线程] - [MY-THREAD-NAME-2] 工作完毕...
[线程] - [MY-THREAD-NAME-2] 开始工作...
[线程] - [MY-THREAD-NAME-1] 工作完毕...
[线程] - [MY-THREAD-NAME-1] 开始工作...
[线程] - [MY-THREAD-NAME-2] 工作完毕...
[线程] - [MY-THREAD-NAME-2] 开始工作...
[初次扩充] - [SimpleThreadPoolExecutor{threadPoolSize=5, taskQueueSize=44, minSize=2, maxSize=10, activeSize=5}]
[线程] - [MY-THREAD-NAME-3] 开始工作...
...
[线程] - [MY-THREAD-NAME-6] 开始工作...
[线程] - [MY-THREAD-NAME-7] 开始工作...
[再次扩充] - [SimpleThreadPoolExecutor{threadPoolSize=10, taskQueueSize=30, minSize=2, maxSize=10, activeSize=5}]
[线程] - [MY-THREAD-NAME-10] 开始工作...
...
[线程] - [MY-THREAD-NAME-5] 开始工作...
[资源回收] - [SimpleThreadPoolExecutor{threadPoolSize=10, taskQueueSize=4, minSize=2, maxSize=10, activeSize=5}]
[线程] - [MY-THREAD-NAME-4] 工作完毕...
...
[线程] - [MY-THREAD-NAME-7] 工作完毕...
[资源回收] - [SimpleThreadPoolExecutor{threadPoolSize=5, taskQueueSize=0, minSize=2, maxSize=10, activeSize=5}]
[资源回收] - [SimpleThreadPoolExecutor{threadPoolSize=5, taskQueueSize=0, minSize=2, maxSize=10, activeSize=5}]
总结

通过本文,大致可以了解线程池的工作原理和实现方式,学习的过程中,就是要知其然知其所以然。这样才能更好地驾驭它,更好地去理解和使用,也能更好地帮助我们触类旁通,后面的文章中会详细介绍java.util.concurrent中的线程池

- 说点什么

全文代码:https://gitee.com/battcn/battcn-concurent/tree/master/Chapter1-1/battcn-thread/src/main/java/com/battcn/chapter12

个人QQ:1837307557

battcn开源群(适合新手):391619659

微信公众号:battcn(欢迎调戏)

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/67758.html

相关文章

  • 一起并发编程 - 处理异常中止的线程

    摘要:在之前,不能为线程单独设置或指定一个默认的,为了设置,需要继承并覆写方法。幸运的是后线程提供了一个方法,用来捕获并处理因线程中抛出的未知异常,以避免程序终止。 在单线程的开发过程中,通常采用try-catch的方式进行异常捕获,但是这种方式在多线程环境中会显得无能为力,而且还有可能导致一些问题的出现,比如发生异常的时候不能及时回收系统资源,或者无法及时关闭当前的连接... 概述 Ja...

    zacklee 评论0 收藏0
  • 一起并发编程 - 等待与通知

    摘要:如果有其它线程调用了相同对象的方法,那么处于该对象的等待池中的线程就会全部进入该对象的锁池中,从新争夺锁的拥有权。 wait,notify 和 notifyAll,这些在多线程中被经常用到的保留关键字,在实际开发的时候很多时候却并没有被大家重视,而本文则是对这些关键字的使用进行描述。 存在即合理 在java中,每个对象都有两个池,锁池(monitor)和等待池(waitset),每个...

    Meathill 评论0 收藏0
  • java并发编程习之线程-ThreadPoolExecutor(三)

    摘要:是所有线程池实现的父类,我们先看看构造函数构造参数线程核心数最大线程数线程空闲后,存活的时间,只有线程数大于的时候生效存活时间的单位任务的阻塞队列创建线程的工程,给线程起名字当线程池满了,选择新加入的任务应该使用什么策略,比如抛异常丢弃当前 ThreadPoolExecutor ThreadPoolExecutor是所有线程池实现的父类,我们先看看构造函数 构造参数 corePool...

    阿罗 评论0 收藏0
  • Python

    摘要:最近看前端都展开了几场而我大知乎最热语言还没有相关。有关书籍的介绍,大部分截取自是官方介绍。但从开始,标准库为我们提供了模块,它提供了和两个类,实现了对和的进一步抽象,对编写线程池进程池提供了直接的支持。 《流畅的python》阅读笔记 《流畅的python》是一本适合python进阶的书, 里面介绍的基本都是高级的python用法. 对于初学python的人来说, 基础大概也就够用了...

    dailybird 评论0 收藏0
  • 一起并发编程 - 守护线程

    摘要:的作用是为其他线程的运行提供服务,比如说线程。在某些平台上,指定一个较高的参数值可能使线程在抛出之前达到较大的递归深度。参数的值与最大递归深度和并发程度之间的关系细节与平台有关。 今天研究了下Java线程基础知识,发现以前太多知识知识略略带过了,比较说Java的线程机制,在Java中有两类线程:User Thread(用户线程)、Daemon Thread(守护线程),以及构造器中的s...

    junnplus 评论0 收藏0

发表评论

0条评论

Harriet666

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<