资讯专栏INFORMATION COLUMN

java并发编程学习6--并行流

mgckid / 622人阅读

摘要:类似的你可以用将并行流变为顺序流。中的使用顺序求和并行求和将流转为并行流配置并行流线程池并行流内部使用了默认的,默认的线程数量就是处理器的数量包括虚拟内核通过得到。

【概念

并行流就是一个把内容分成多个数据块,并用不同的线程分别处理每一个数据块的流。在java7之前,并行处理数据很麻烦,第一,需要明确的把包含数据的数据结构分成若干子部分。第二,给每一个子部分分配一个独立的线程。第三,适当的时候进行同步,避免出现数据竞争带来的问题,最后将每一个子部分的结果合并。在java7中引入了forkjoin框架来完成这些步骤,而java8中的stream接口可以让你不费吹灰之力就对数据执行并行处理,而stream接口幕后正是使用的forkjoin框架。不过,对顺序流调用parallel()并不意味着流本身有任何的变化。它在内部实际上就是设了一个boolean标志,表示你想让parallel()之后的操作都并行执行。类似的你可以用sequential()将并行流变为顺序流。这两个方法可以让我们更细化的控制流。

eg.java8中stream的使用:

//顺序求和
public static long sum(long n){
    return Stream.iterate(1l,i -> i + 1)
            .limit(n)
            .reduce(0l,Long::sum);
}

//并行求和
public static long parallelSum(long n){
    return Stream.iterate(1l,i -> i + 1)
            .limit(n)
            //将流转为并行流
            .parallel()
            .reduce(0l,Long::sum);
}
【配置并行流线程池

并行流内部使用了默认的forkjoinPool,默认的线程数量就是处理器的数量(包括虚拟内核),
通过:Runtime.getRuntime().availableProcessors() 得到。
通过:System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12")来改变线程池大小。

【性能测试

我们不应该理所当然的任认为多线程比顺序执行的效率更高,来看下面的例子:

public class Exercise {

    public static void main(String[] args) {
        long num = 1000_000_0;

        long st = System.currentTimeMillis();
        System.out.println("iterate顺序" + sum(num) + ":" +(System.currentTimeMillis() - st));

        st = System.currentTimeMillis();
        System.out.println("iterate并行" + parallelSum(num) + ":" +(System.currentTimeMillis() - st));

        st = System.currentTimeMillis();
        System.out.println("迭代" + forSum(num) + ":" +(System.currentTimeMillis() - st));

        st = System.currentTimeMillis();
        System.out.println("LongStream并行" + longStreamParallelSum(num) + ":" +(System.currentTimeMillis() - st));

        st = System.currentTimeMillis();
        System.out.println("LongStream顺序" + longStreamSum(num) + ":" +(System.currentTimeMillis() - st));
    }

    //顺序求和
    public static long sum(long n){
        return Stream.iterate(1l,i -> i + 1)
                .limit(n)
                .reduce(0l,Long::sum);
    }

    //并行求和
    public static long parallelSum(long n){
        return Stream.iterate(1l,i -> i + 1)
                .limit(n)
                //将流转为并行流
                .parallel()
                .reduce(0l,Long::sum);
    }

    //迭代求和
    public static long forSum(long n){
        long result = 0;
        for(long i = 0 ;i <= n ; i++){
            result += i;
        }
        return result;
    }

    //longStream并行
    public static long longStreamParallelSum(long n){
        return LongStream.rangeClosed(1,n)
                .parallel()
                .reduce(0l,Long::sum);
    }

    //longStream顺序执行
    public static long longStreamSum(long n){
        return LongStream.rangeClosed(1,n)
                .reduce(0l,Long::sum);
    }
}

并行流执行的时间比顺序流和迭代执行的要长很多,两个原因:

iterate()生成的是装箱对象,必须要拆箱才能求和;

iterate()很难分成多个独立的块并行运行,因为每次应用这个函数都要依赖前一次的应用的结果。数字列表在归纳的过程开始时没有准备好,因而无法有效的把流划分成小块来并行处理。但是我们又标记流为并行执行,这就给顺序执行增加了开销,每一次的求和操作都新开启了一个线程。

【使用更有针对性的的方法

LongStream.rangeClosed():

    1. 直接产生long类型数据,没有开箱操作
    2. 生成数字范围,容易拆分成独立的小块
    

由此可见,选择适当的数据结构往往比并行化算法更重要。并行是有代价的。并行过程需要对流做递归划分,把每个子流的操作分配到不同的线程,然后把这些操作的结果合并成一个值。但是多核之间移动数据的代价比我们想象的要大,所以很重要的一点是保证再内核中并行执行的工作时间比内核之间传输数据的时间要长

【正确的使用并行流

错误使用并行流的首要原因就是使用的算法改变了共享变量的状态,因为修改共享变量意味着同步,而使用同步方法就会使的并行毫无意义。以下是一些建议:

1. 测试,并行还是顺序执行最重要的基准就是不停的测试性能。
2. 留意装箱,自动装箱,拆箱会大大降低性能,java8提供了LongStream,IntStream,DoubleStream来避免这两个操作。
3. 有些操作本身就是顺序执行要率高,例如:limit,findFirst等依赖元素顺序的操作。
4. 当执行单个任务的成本高时使用并行,如果单个操作的成本很低,并行执行反而会因为开启线程,标记状态等操作使得效率下降。
5. 小量数据不适用并行。
6. 考虑流中背后的数据结构是否易于分解。ArrayList的拆分效率比LinkedList高得多,因为前者用不着便利就可以平均拆分。另外,range工厂方法的原始类型数据流也可以快速分解。以下时流数据源的可分解性:
   - ArrayList:极佳
   - LinkedList:差
   - IntStream等:极佳
   - Stream.iterate:差
   - HashSet:好
   - TreeSet:好
7. 中间操作改变流的方法,涉及到排序就不适用并行。
8. 终端操作合并流的代价,涉及到排序就不适用并行。
【正确的使用并行

高并发、任务执行时间短的业务,线程池线程数可以设置为CPU核数+1,减少线程上下文的切换

并发不高、任务执行时间长的业务要区分开看:

假如是业务时间长集中在IO操作上,也就是IO密集型的任务,因为IO操作并不占用CPU,所以不要让所有的CPU闲下来,可以加大线程池中的线程数目,让CPU处理更多的业务

假如是业务时间长集中在计算操作上,也就是计算密集型任务,这个就没办法了,和(1)一样吧,线程池中的线程数设置得少一些,减少线程上下文的切换

并发高、业务执行时间长,解决这种类型任务的关键不在于线程池而在于整体架构的设计,看看这些业务里面某些数据是否能做缓存是第一步,增加服务器是第二步,至于线程池的设置,设置参考(2)。最后,业务执行时间长的问题,也可能需要分析一下,看看能不能使用中间件对任务进行拆分和解耦。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/68095.html

相关文章

  • java并发编程学习14--CompletableFuture(一)

    摘要:并行流与目前,我们对集合进行计算有两种方式并行流而更加的灵活,我们可以配置线程池的大小确保整体的计算不会因为等待而发生阻塞。 【回顾Future接口 Future接口时java5引入的,设计初衷是对将来某个时刻会发生的结果建模。它建模了一种异步计算,返回了一个执行预算结果的引用。比如,你去干洗店洗衣服,店员会告诉你什么时候可以来取衣服,而不是让你一直在干洗店等待。要使用Future只需...

    VioletJack 评论0 收藏0
  • Java 并发方案全面学习总结

    摘要:进程线程与协程它们都是并行机制的解决方案。选择是任意性的,并在对实现做出决定时发生。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。此线程池支持定时以及周期性执行任务的需求。 并发与并行的概念 并发(Concurrency): 问题域中的概念—— 程序需要被设计成能够处理多个同时(或者几乎同时)发生的事件 并行(Parallel...

    mengera88 评论0 收藏0
  • 关于分布式计算的一些概念

    摘要:关于三者的一些概括总结离线分析框架,适合离线的复杂的大数据处理内存计算框架,适合在线离线快速的大数据处理流式计算框架,适合在线的实时的大数据处理我是一个以架构师为年之内目标的小小白。 整理自《架构解密从分布式到微服务》第七章——聊聊分布式计算.做了相应补充和修改。 [TOC] 前言 不管是网络、内存、还是存储的分布式,它们最终目的都是为了实现计算的分布式:数据在各个计算机节点上流动,同...

    Ververica 评论0 收藏0
  • Java 8 函数式编程」读书笔记——数据并行

    摘要:限制编写并行流,存在一些与非并行流不一样的约定。底层框架并行流在底层沿用的框架,递归式的分解问题,然后每段并行执行,最终由合并结果,返回最后的值。 本书第六章的读书笔记,也是我这个系列的最后一篇读书笔记。后面7、8、9章分别讲的测试、调试与重构、设计和架构的原则以及使用Lambda表达式编写并发程序,因为笔记不好整理,就不写了,感兴趣的同学自己买书来看吧。 并行化流操作 关于并行与并发...

    leone 评论0 收藏0
  • Java8学习小记

    摘要:但有一个限制它们不能修改定义的方法的局部变量的内容。如前所述,这种限制存在的原因在于局部变量保存在栈上,并且隐式表示它们仅限于其所在线程。 2014年,Oracle发布了Java8新版本。对于Java来说,这显然是一个具有里程碑意义的版本。尤其是那函数式编程的功能,避开了Java那烦琐的语法所带来的麻烦。 这可以算是一篇Java8的学习笔记。将Java8一些常见的一些特性作了一个概要的...

    CHENGKANG 评论0 收藏0

发表评论

0条评论

mgckid

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<