资讯专栏INFORMATION COLUMN

Java 多线程(5):Fork/Join 型线程池与 Work-Stealing 算法

IamDLY / 3199人阅读

摘要:时,标准类库添加了,作为对型线程池的实现。类图用来专门定义型任务完成将大任务分割为小任务以及合并结果的工作。

JDK 1.7 时,标准类库添加了 ForkJoinPool,作为对 Fork/Join 型线程池的实现。Fork 在英文中有 分叉 的意思,而 Join合并 的意思。ForkJoinPool 的功能也是如此:Fork 将大任务分叉为多个小任务,然后让小任务执行,Join 是获得小任务的结果,然后进行合并,将合并的结果作为大任务的结果 —— 并且这会是一个递归的过程 —— 因为任务如果足够大,可以将任务多级分叉直到任务足够小。

由此可见,ForkJoinPool 可以满足 并行 地实现 分治算法(Divide-and-Conquer) 的需要。

ForkJoinPool 的类图如下:

可以看到 ForkJoinPool 实现了 ExecutorService 接口,所以首先 ForkJoinPool 也是一个 ExecutorService线程池)。因而 RunnableCallable 类型的任务,ForkJoinPool 也可以通过 submitinvokeAllinvokeAny 等方法来执行。但是标准类库还为 ForkJoinPool 定义了一种新的任务,它就是 ForkJoinTask

ForkJoinTask 类图:

ForkJoinTask 用来专门定义 Fork/Join 型任务 —— 完成将大任务分割为小任务以及合并结果的工作。一般我们不需要直接使用 ForkJoinTask,而是通过继承它的子类 RecursiveActionRecursiveTask 并实现对应的抽象方法 —— compute ,来定义我们自己的任务。其中,RecursiveAction 是不带返回值的 Fork/Join 型任务,所以使用此类任务并不产生结果,也就不涉及到结果的合并;而 RecursiveTask 是带返回值的 Fork/Join 型任务,使用此类任务的话,在任务结束前,我们需要进行结果的合并。其中,通过 ForkJoinTaskfork 方法,我们可以产生子任务并执行;通过 join 方法,我们可以获得子任务的结果。

ForkJoinPool 可以使用三种方法用来执行 ForkJoinTask

invoke 方法:

invoke 方法用来执行一个带返回值的任务(通常继承自RecursiveTask),并且该方法是阻塞的,直到任务执行完毕,该方法才会停止阻塞并返回任务的执行结果。

submit 方法:

除了从 ExecutorService 继承的 submit 方法外,ForkJoinPool 还定义了用来执行 ForkJoinTasksubmit 方法 —— 一般该 submit 方法用来执行带返回值的ForkJoinTask(通常继承自RecursiveTask)。该方法是非阻塞的,调用之后将任务提交给 ForkJoinPool 去执行便立即返回,返回的便是已经提交到 ForkJoinPool 去执行的 task —— 由类图可知 ForkJoinTask 实现了 Future 接口,所以可以直接通过 task 来和已经提交的任务进行交互。

execute 方法:

除了从 Executor 获得的 execute 方法外,ForkJoinPool 也定义了用来执行ForkJoinTaskexecute 方法 —— 一般该 execute 方法用来执行不带返回值的ForkJoinTask(通常继承自RecursiveAction) ,该方法同样是非阻塞的。

现在让我们来实践下 ForkJoinPool 的功能:计算 π 的值。计算 π 的值有一个通过多项式方法,即:π = 4 * (1 - 1/3 + 1/5 - 1/7 + 1/9 - ……),而且多项式的项数越多,计算出的 π 的值越精确。

首先我们定义用来估算 π 的 PiEstimateTask

class PiEstimateTask extends RecursiveTask {

    private final long begin;
    private final long end;
    private final long threshold; // 分割任务的临界值

    public PiEstimateTask(long begin, long end, long threshold) {
        this.begin = begin;
        this.end = end;
        this.threshold = threshold;
    }

    @Override
    protected Double compute() {  // 实现 compute 方法
        if (end - begin <= threshold) {  // 临界值之下,不再分割,直接计算

            int sign; // 符号,多项式中偶数位取 1,奇数位取 -1(位置从 0 开始)
            double result = 0.0;
            
            for (long i = begin; i < end; i++) {
                sign = (i & 1) == 0 ? 1 : -1;
                result += sign / (i * 2.0 + 1);
            }

            return result * 4;
        }

        // 分割任务
        long middle = (begin + end) / 2;
        PiEstimateTask leftTask = new PiEstimateTask(begin, middle, threshold);
        PiEstimateTask rightTask = new PiEstimateTask(middle, end, threshold);

        leftTask.fork();  // 异步执行 leftTask
        rightTask.fork(); // 异步执行 rightTask

        double leftResult = leftTask.join();   // 阻塞,直到 leftTask 执行完毕返回结果
        double rightResult = rightTask.join(); // 阻塞,直到 rightTask 执行完毕返回结果

        return leftResult + rightResult; // 合并结果
    }

}

然后我们使用 ForkJoinPoolinvoke 执行 PiEstimateTask

public class ForkJoinPoolTest {

    public static void main(String[] args) throws Exception {
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
    
        // 计算 10 亿项,分割任务的临界值为 1 千万
        PiEstimateTask task = new PiEstimateTask(0, 1_000_000_000, 10_000_000);
    
        double pi = forkJoinPool.invoke(task); // 阻塞,直到任务执行完毕返回结果
    
        System.out.println("π 的值:" + pi);
        
        forkJoinPool.shutdown(); // 向线程池发送关闭的指令
    }
}

运行结果:

我们也可以使用 submit 方法异步的执行任务(此处 submit 方法返回的 future 指向的对象即提交任务时的 task):

public static void main(String[] args) throws Exception {
    ForkJoinPool forkJoinPool = new ForkJoinPool(4);

    PiEstimateTask task = new PiEstimateTask(0, 1_000_000_000, 10_000_000);
    Future future = forkJoinPool.submit(task); // 不阻塞
    
    double pi = future.get();
    System.out.println("π 的值:" + pi);
    System.out.println("future 指向的对象是 task 吗:" + (future == task));
    
    forkJoinPool.shutdown(); // 向线程池发送关闭的指令
}

运行结果:

值得注意的是,选取一个合适的分割任务的临界值,对 ForkJoinPool 执行任务的效率有着至关重要的影响。临界值选取过大,任务分割的不够细,则不能充分利用 CPU;临界值选取过小,则任务分割过多,可能产生过多的子任务,导致过多的线程间的切换和加重 GC 的负担从而影响了效率。所以,需要根据实际的应用场景选择一个合适的分割任务的临界值。

ForkJoinPool 相比于 ThreadPoolExecutor,还有一个非常重要的特点(优点)在于,ForkJoinPool具有 Work-Stealing (工作窃取)的能力。所谓 Work-Stealing,在 ForkJoinPool 中的实现为:线程池中每个线程都有一个互不影响的任务队列(双端队列),线程每次都从自己的任务队列的队头中取出一个任务来运行;如果某个线程对应的队列已空并且处于空闲状态,而其他线程的队列中还有任务需要处理但是该线程处于工作状态,那么空闲的线程可以从其他线程的队列的队尾取一个任务来帮忙运行 —— 感觉就像是空闲的线程去偷人家的任务来运行一样,所以叫 “工作窃取”。

Work-Stealing 的适用场景是不同的任务的耗时相差比较大,即某些任务需要运行较长时间,而某些任务会很快的运行完成,这种情况下用 Work-Stealing 很合适;但是如果任务的耗时很平均,则此时 Work-Stealing 并不适合,因为窃取任务时不同线程需要抢占锁,这可能会造成额外的时间消耗,而且每个线程维护双端队列也会造成更大的内存消耗。所以 ForkJoinPool 并不是 ThreadPoolExecutor 的替代品,而是作为对 ThreadPoolExecutor 的补充。

总结:
ForkJoinPoolThreadPoolExecutor 都是 ExecutorService(线程池),但ForkJoinPool 的独特点在于:

ThreadPoolExecutor 只能执行 RunnableCallable 任务,而 ForkJoinPool 不仅可以执行 RunnableCallable 任务,还可以执行 Fork/Join 型任务 —— ForkJoinTask —— 从而满足并行地实现分治算法的需要;

ThreadPoolExecutor 中任务的执行顺序是按照其在共享队列中的顺序来执行的,所以后面的任务需要等待前面任务执行完毕后才能执行,而 ForkJoinPool 每个线程有自己的任务队列,并在此基础上实现了 Work-Stealing 的功能,使得在某些情况下 ForkJoinPool 能更大程度的提高并发效率。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/66494.html

相关文章

  • Fork/Join框架简介

    摘要:第二步执行任务并合并结果。使用两个类来完成以上两件事情我们要使用框架,必须首先创建一个任务。用于有返回结果的任务。如果任务顺利执行完成了,则设置任务状态为,如果出现异常,则纪录异常,并将任务状态设置为。 1. 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的...

    W_BinaryTree 评论0 收藏0
  • Java7任务并行执行神器:Fork&Join框架

    摘要:对于任务的分割,要求各个子任务之间相互独立,能够并行独立地执行任务,互相之间不影响。是叉子分叉的意思,即将大任务分解成并行的小任务,是连接结合的意思,即将所有并行的小任务的执行结果汇总起来。使用方法会阻塞并等待子任务执行完并得到其结果。 Fork/Join是什么? Fork/Join框架是Java7提供的并行执行任务框架,思想是将大任务分解成小任务,然后小任务又可以继续分解,然后每个小...

    Luosunce 评论0 收藏0
  • Java 8 的 JVM 有快?Fork-Join 性能基准测试

    摘要:这减轻了手动重复执行相同基准测试的痛苦,并简化了获取结果的流程。处理项目的代码并从标有注释的方法处生成基准测试程序。用和运行该基准测试得到以下结果。同时,和的基线测试结果也有略微的不同。 Java 8 已经发布一段时间了,许多开发者已经开始使用 Java 8。本文也将讨论最新发布在 JDK 中的并发功能更新。事实上,JDK 中已经有多处java.util.concurrent 改动,但...

    Euphoria 评论0 收藏0
  • java 8 实战》读书笔记 -第六章 用流收集数据

    摘要:分区函数返回一个布尔值,这意味着得到的分组的键类型是,于是它最多可以分为两组是一组,是一组。当遍历到流中第个元素时,这个函数执行时会有两个参数保存归约结果的累加器已收集了流中的前个项目,还有第个元素本身。 一、收集器简介 把列表中的交易按货币分组: Map transactionsByCurrencies = transactions.stream().collect(groupi...

    Airy 评论0 收藏0
  • Java 8 新特性之并行流与串行流

    摘要:概述简介并行流就是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流中将并行进行了优化,我们可以很容易的对数据进行并行操作,可以声明性地通过与在并行流与顺序流之间进行切换。 1. 概述 1.1 简介 并行流就是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流 Java 8 中将并行进行了优化,我们可以很容易的对数据进行并行操作,Stream API 可以声明性...

    2json 评论0 收藏0

发表评论

0条评论

IamDLY

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<