资讯专栏INFORMATION COLUMN

ForkJoinPool的commonPool相关参数配置

TNFE / 2402人阅读

摘要:主要用于实现分而治之的算法,特别是分治之后递归调用的函数,例如等。最适合的是计算密集型的任务,如果存在,线程间同步,等会造成线程长时间阻塞的情况时,最好配合使用。

ForkJoinPool

ForkJoinPool 主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数,例如 quick sort 等。
ForkJoinPool 最适合的是计算密集型的任务,如果存在 I/O,线程间同步,sleep() 等会造成线程长时间阻塞的情况时,最好配合使用 ManagedBlocker。

commonPool
static {
        // initialize field offsets for CAS etc
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class k = ForkJoinPool.class;
            CTL = U.objectFieldOffset
                (k.getDeclaredField("ctl"));
            RUNSTATE = U.objectFieldOffset
                (k.getDeclaredField("runState"));
            STEALCOUNTER = U.objectFieldOffset
                (k.getDeclaredField("stealCounter"));
            Class tk = Thread.class;
            PARKBLOCKER = U.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
            Class wk = WorkQueue.class;
            QTOP = U.objectFieldOffset
                (wk.getDeclaredField("top"));
            QLOCK = U.objectFieldOffset
                (wk.getDeclaredField("qlock"));
            QSCANSTATE = U.objectFieldOffset
                (wk.getDeclaredField("scanState"));
            QPARKER = U.objectFieldOffset
                (wk.getDeclaredField("parker"));
            QCURRENTSTEAL = U.objectFieldOffset
                (wk.getDeclaredField("currentSteal"));
            QCURRENTJOIN = U.objectFieldOffset
                (wk.getDeclaredField("currentJoin"));
            Class ak = ForkJoinTask[].class;
            ABASE = U.arrayBaseOffset(ak);
            int scale = U.arrayIndexScale(ak);
            if ((scale & (scale - 1)) != 0)
                throw new Error("data type scale not a power of two");
            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (Exception e) {
            throw new Error(e);
        }

        commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
        defaultForkJoinWorkerThreadFactory =
            new DefaultForkJoinWorkerThreadFactory();
        modifyThreadPermission = new RuntimePermission("modifyThread");

        common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction() {
                public ForkJoinPool run() { return makeCommonPool(); }});
        int par = common.config & SMASK; // report 1 even if threads disabled
        commonParallelism = par > 0 ? par : 1;
    }
makeCommonPool
/**
     * Creates and returns the common pool, respecting user settings
     * specified via system properties.
     */
    private static ForkJoinPool makeCommonPool() {
        int parallelism = -1;
        ForkJoinWorkerThreadFactory factory = null;
        UncaughtExceptionHandler handler = null;
        try {  // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            String fp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.threadFactory");
            String hp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.exceptionHandler");
            if (pp != null)
                parallelism = Integer.parseInt(pp);
            if (fp != null)
                factory = ((ForkJoinWorkerThreadFactory)ClassLoader.
                           getSystemClassLoader().loadClass(fp).newInstance());
            if (hp != null)
                handler = ((UncaughtExceptionHandler)ClassLoader.
                           getSystemClassLoader().loadClass(hp).newInstance());
        } catch (Exception ignore) {
        }
        if (factory == null) {
            if (System.getSecurityManager() == null)
                factory = defaultForkJoinWorkerThreadFactory;
            else // use security-managed default
                factory = new InnocuousForkJoinWorkerThreadFactory();
        }
        if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;
        return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                                "ForkJoinPool.commonPool-worker-");
    }
配置参数

通过代码指定,必须得在commonPool初始化之前(parallel的stream被调用之前,一般可在系统启动后设置)注入进去,否则无法生效。
通过启动参数指定无此限制,较为安全

parallelism(即配置线程池个数)
可以通过java.util.concurrent.ForkJoinPool.common.parallelism进行配置,最大值不能超过MAX_CAP,即32767.

static final int MAX_CAP      = 0x7fff;   //32767

如果没有指定,则默认为Runtime.getRuntime().availableProcessors() - 1.

代码指定(必须得在commonPool初始化之前注入进去,否则无法生效)

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");

或者参数指定

-Djava.util.concurrent.ForkJoinPool.common.parallelism=8

threadFactory
默认为defaultForkJoinWorkerThreadFactory,没有securityManager的话。

/**
     * Default ForkJoinWorkerThreadFactory implementation; creates a
     * new ForkJoinWorkerThread.
     */
    static final class DefaultForkJoinWorkerThreadFactory
        implements ForkJoinWorkerThreadFactory {
        public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            return new ForkJoinWorkerThread(pool);
        }
    }

代码指定(必须得在commonPool初始化之前注入进去,否则无法生效)

System.setProperty("java.util.concurrent.ForkJoinPool.common.threadFactory",YourForkJoinWorkerThreadFactory.class.getName());

参数指定

-Djava.util.concurrent.ForkJoinPool.common.threadFactory=com.xxx.xxx.YourForkJoinWorkerThreadFactory

exceptionHandler
如果没有设置,默认为null

/**
     * Callback from ForkJoinWorkerThread constructor to establish and
     * record its WorkQueue.
     *
     * @param wt the worker thread
     * @return the worker"s queue
     */
    final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
        UncaughtExceptionHandler handler;
        wt.setDaemon(true);                           // configure thread
        if ((handler = ueh) != null)
            wt.setUncaughtExceptionHandler(handler);
        WorkQueue w = new WorkQueue(this, wt);
        int i = 0;                                    // assign a pool index
        int mode = config & MODE_MASK;
        int rs = lockRunState();
        try {
            WorkQueue[] ws; int n;                    // skip if no array
            if ((ws = workQueues) != null && (n = ws.length) > 0) {
                int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
                int m = n - 1;
                i = ((s << 1) | 1) & m;               // odd-numbered indices
                if (ws[i] != null) {                  // collision
                    int probes = 0;                   // step by approx half n
                    int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                    while (ws[i = (i + step) & m] != null) {
                        if (++probes >= n) {
                            workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                            m = n - 1;
                            probes = 0;
                        }
                    }
                }
                w.hint = s;                           // use as random seed
                w.config = i | mode;
                w.scanState = i;                      // publication fence
                ws[i] = w;
            }
        } finally {
            unlockRunState(rs, rs & ~RSLOCK);
        }
        wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
        return w;
    }

代码指定(必须得在commonPool初始化之前注入进去,否则无法生效)

System.setProperty("java.util.concurrent.ForkJoinPool.common.exceptionHandler",YourUncaughtExceptionHandler.class.getName());

参数指定

-Djava.util.concurrent.ForkJoinPool.common.exceptionHandler=com.xxx.xxx.YourUncaughtExceptionHandler
WorkQueue
// Mode bits for ForkJoinPool.config and WorkQueue.config
    static final int MODE_MASK    = 0xffff << 16;  // top half of int
    static final int LIFO_QUEUE   = 0;
    static final int FIFO_QUEUE   = 1 << 16;
    static final int SHARED_QUEUE = 1 << 31;       // must be negative

控制是FIFO还是LIFO

        /**
         * Takes next task, if one exists, in order specified by mode.
         */
        final ForkJoinTask nextLocalTask() {
            return (config & FIFO_QUEUE) == 0 ? pop() : poll();
        }

ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。

queue capacity

        /**
         * Capacity of work-stealing queue array upon initialization.
         * Must be a power of two; at least 4, but should be larger to
         * reduce or eliminate cacheline sharing among queues.
         * Currently, it is much larger, as a partial workaround for
         * the fact that JVMs often place arrays in locations that
         * share GC bookkeeping (especially cardmarks) such that
         * per-write accesses encounter serious memory contention.
         */
        static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

        /**
         * Maximum size for queue arrays. Must be a power of two less
         * than or equal to 1 << (31 - width of array entry) to ensure
         * lack of wraparound of index calculations, but defined to a
         * value a bit less than this to help users trap runaway
         * programs before saturating systems.
         */
        static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

超出报异常

        /**
         * Initializes or doubles the capacity of array. Call either
         * by owner or with lock held -- it is OK for base, but not
         * top, to move while resizings are in progress.
         */
        final ForkJoinTask[] growArray() {
            ForkJoinTask[] oldA = array;
            int size = oldA != null ? oldA.length << 1 : INITIAL_QUEUE_CAPACITY;
            if (size > MAXIMUM_QUEUE_CAPACITY)
                throw new RejectedExecutionException("Queue capacity exceeded");
            int oldMask, t, b;
            ForkJoinTask[] a = array = new ForkJoinTask[size];
            if (oldA != null && (oldMask = oldA.length - 1) >= 0 &&
                (t = top) - (b = base) > 0) {
                int mask = size - 1;
                do { // emulate poll from old array, push to new array
                    ForkJoinTask x;
                    int oldj = ((b & oldMask) << ASHIFT) + ABASE;
                    int j    = ((b &    mask) << ASHIFT) + ABASE;
                    x = (ForkJoinTask)U.getObjectVolatile(oldA, oldj);
                    if (x != null &&
                        U.compareAndSwapObject(oldA, oldj, x, null))
                        U.putObjectVolatile(a, j, x);
                } while (++b != t);
            }
            return a;
        }
doc

Java 并发编程笔记:如何使用 ForkJoinPool 以及原理

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/66704.html

相关文章

  • Java 8 Stream并行流

    摘要:实际上,在并行流上使用新的方法。此外,我们了解到所有并行流操作共享相同的范围。因此,您可能希望避免实施慢速阻塞流操作,因为这可能会减慢严重依赖并行流的应用程序的其他部分。 流可以并行执行,以增加大量输入元素的运行时性能。并行流ForkJoinPool通过静态ForkJoinPool.commonPool()方法使用公共可用的流。底层线程池的大小最多使用五个线程 - 具体取决于可用物理C...

    yzd 评论0 收藏0
  • CompletableFuture执行线程

    默认使用的线程池 不传executor时默认使用ForkJoinPool.commonPool() IntStream.range(0, 15).parallel().forEach(i -> { System.out.println(Thread.currentThread()); }); 输出 Thread[ForkJoinPool.commonPoo...

    mo0n1andin 评论0 收藏0
  • Java 8 并发教程:原子变量和 ConcurrentMa

    摘要:并发教程原子变量和原文译者飞龙协议欢迎阅读我的多线程编程系列教程的第三部分。如果你能够在多线程中同时且安全地执行某个操作,而不需要关键字或上一章中的锁,那么这个操作就是原子的。当多线程的更新比读取更频繁时,这个类通常比原子数值类性能更好。 Java 8 并发教程:原子变量和 ConcurrentMap 原文:Java 8 Concurrency Tutorial: Synchroni...

    bitkylin 评论0 收藏0
  • Java 8 并发: 原子变量和 ConcurrentMap

    摘要:在有些情况下,原子操作可以在不使用关键字和锁的情况下解决多线程安全问题。但其内部的结果不是一个单一的值这个类的内部维护了一组变量来减少多线程的争用。当来自多线程的更新比读取更频繁时这个类往往优于其他的原子类。 原文地址: Java 8 Concurrency Tutorial: Atomic Variables and ConcurrentMap AtomicInteger java...

    yy13818512006 评论0 收藏0
  • Web Spider -- 做一个简单爬虫 (愿给您启示)

    摘要:一个简单的爬虫代码已托管这里有一个简单的例子根据提供的种子爬取数据指定对应的抓取规则自己定义抓取的链接简单的控制台打印结果建筑工地上的青年如何自我成长知乎国内专做进口行业的公司多不不包括货代公司知乎如何有效地进行后天 Web Spider 一个简单的爬虫 showImg(https://segmentfault.com/img/bVbckso?w=939&h=813); 代码已托管 这...

    bingo 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<