资讯专栏INFORMATION COLUMN

Fork/Join 框架详解(基于 JDK 8)

Karuru / 3413人阅读

摘要:框架使用的是工作窃取算法。由于此时它们访问同一个队列,为了减小竞争,通常会使用双端队列。方法返回对象,如果任务被取消了则返回,如果任务没有完成或者没有抛出异常则返回。

概述

Fork 就是把一个大任务切分为若干个子任务并行地执行,Join 就是合并这些子任务的执行结果,最后得到这个大任务的结果。Fork/Join 框架使用的是工作窃取算法。

工作窃取算法

工作窃取算法是指某个线程从其他队列里窃取任务来执行。对于一个比较大的任务,可以把它分割为若干个互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个多带带的线程来执行队列里的任务,线程和队列一一对应。但是,有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务需要处理,于是它就去其他线程的队列里窃取一个任务来执行。由于此时它们访问同一个队列,为了减小竞争,通常会使用双端队列。被窃取任务的线程永远从双端队列的头部获取任务,窃取任务的线程永远从双端队列的尾部获取任务。

工作窃取算法的优缺点

优点:充分利用线程进行并行计算,减少了线程间的竞争。
缺点:双端队列只存在一个任务时会导致竞争,会消耗更多的系统资源,因为需要创建多个线程和多个双端队列。

Fork/Join 框架的异常处理

ForkJoinTask 在执行的时候可能抛出异常,但没有办法在主线程中直接捕获异常,所以 ForkJoinTask 提供了 isCompletedAbnormally() 方法检查任务是否已经抛出异常或已经被取消。getException() 方法返回 Throwable 对象,如果任务被取消了则返回 CancellationException,如果任务没有完成或者没有抛出异常则返回 null

Fork/Join 框架的实现原理 fork() 方法的实现原理

当调用 ForkJoinTask 的 fork() 方法时,程序会调用 ForkJoinPool.WorkQueuepush() 方法异步地执行这个任务,然后立即返回结果。代码如下:

public final ForkJoinTask fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}

push() 方法把当前任务存放在一个 ForkJoinTask 数组队列里,然后再调用 ForkJoinPoolsignalWork() 方法唤醒或创建一个工作线程来执行任务。代码如下:

final void push(ForkJoinTask task) {
    ForkJoinTask[] a; ForkJoinPool p;
    int b = base, s = top, n;
    if ((a = array) != null) {    // ignore if queue removed
        int m = a.length - 1;     // fenced write for task visibility
        U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
        U.putOrderedInt(this, QTOP, s + 1);
        if ((n = s - b) <= 1) {
            if ((p = pool) != null)
                p.signalWork(p.workQueues, this);
        }
        else if (n >= m)
            growArray();
    }
}
join() 方法的实现原理

当调用 ForkJoinTask 的 join() 方法时,程序会调用 doJoin() 方法,通过 doJoin() 方法来判断返回什么结果

public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    return getRawResult();
}
private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    return (s = status) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        wt.pool.awaitJoin(w, this, 0L) :
        externalAwaitDone();
}
private void reportException(int s) {
    if (s == CANCELLED)
        throw new CancellationException();
    if (s == EXCEPTIONAL)
        rethrow(getThrowableException());
}
public abstract V getRawResult();

实例代码:

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class CountTask extends RecursiveTask {
    private static final int THRESHOLD = 2;
    private int start;
    private int end;

    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        boolean canCompute = (end - start) <= THRESHOLD;
        if (canCompute) { // 如果任务足够小,就计算任务
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else { // 如果任务大于阈值,分裂成两个子任务执行
            int middle = (start + end) / 2;
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);

            // 执行子任务
            leftTask.fork();
            rightTask.fork();

            // 等待子任务执行完,并得到其结果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            // 合并子任务
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        CountTask countTask = new CountTask(1, 100);
        peekNextLocalTask();
        Future result = forkJoinPool.submit(countTask);
        try {
            if (countTask.isCompletedAbnormally()) {
                System.out.println(countTask.getException());
            }
            System.out.println(result.get());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

参考资料
Java 并发编程的艺术

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/75169.html

相关文章

  • Java核心技术教程整理,长期更新

    以下是Java技术栈微信公众号发布的关于 Java 的技术干货,从以下几个方面汇总。 Java 基础篇 Java 集合篇 Java 多线程篇 Java JVM篇 Java 进阶篇 Java 新特性篇 Java 工具篇 Java 书籍篇 Java基础篇 8张图带你轻松温习 Java 知识 Java父类强制转换子类原则 一张图搞清楚 Java 异常机制 通用唯一标识码UUID的介绍及使用 字符串...

    Anchorer 评论0 收藏0
  • Java多线程进阶(一)—— J.U.C并发包概述

    摘要:整个包,按照功能可以大致划分如下锁框架原子类框架同步器框架集合框架执行器框架本系列将按上述顺序分析,分析所基于的源码为。后,根据一系列常见的多线程设计模式,设计了并发包,其中包下提供了一系列基础的锁工具,用以对等进行补充增强。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首发于一世流云专栏:https...

    anonymoussf 评论0 收藏0
  • Java 8 的 JVM 有多快?Fork-Join 性能基准测试

    摘要:这减轻了手动重复执行相同基准测试的痛苦,并简化了获取结果的流程。处理项目的代码并从标有注释的方法处生成基准测试程序。用和运行该基准测试得到以下结果。同时,和的基线测试结果也有略微的不同。 Java 8 已经发布一段时间了,许多开发者已经开始使用 Java 8。本文也将讨论最新发布在 JDK 中的并发功能更新。事实上,JDK 中已经有多处java.util.concurrent 改动,但...

    Euphoria 评论0 收藏0
  • Java多线程进阶(四三)—— J.U.C之executors框架Fork/Join框架(1) 原

    摘要:同时,它会通过的方法将自己注册到线程池中。线程池中的每个工作线程都有一个自己的任务队列,工作线程优先处理自身队列中的任务或顺序,由线程池构造时的参数决定,自身队列为空时,以的顺序随机窃取其它队列中的任务。 showImg(https://segmentfault.com/img/bVbizJb?w=1802&h=762); 本文首发于一世流云的专栏:https://segmentfau...

    cooxer 评论0 收藏0
  • Java的Fork/Join任务

    摘要:方法返回对象,如果任务被取消了则返回。如果任务没有完成或者没有抛出异常则返回。 一. Fork/Join 1 . 简单介绍 a . Fork/Join为JKD1.7引入,适用于对大量数据进行拆分成多个小任务进行计算的框架,最后把所有小任务的结果汇总合并得到最终的结果 b . 相关类 public abstract class RecursiveTask extends Fork...

    venmos 评论0 收藏0

发表评论

0条评论

Karuru

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<