摘要:实际上,在并行流上使用新的方法。此外,我们了解到所有并行流操作共享相同的范围。因此,您可能希望避免实施慢速阻塞流操作,因为这可能会减慢严重依赖并行流的应用程序的其他部分。
流可以并行执行,以增加大量输入元素的运行时性能。并行流ForkJoinPool通过静态ForkJoinPool.commonPool()方法使用公共可用的流。底层线程池的大小最多使用五个线程 - 具体取决于可用物理CPU核心的数量:
ForkJoinPool commonPool = ForkJoinPool.commonPool(); System.out.println(commonPool.getParallelism()); // 3
在我的机器上,公共池初始化为默认值为3的并行度。通过设置以下JVM参数可以减小或增加此值:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
集合支持创建并行元素流的方法parallelStream()。或者,您可以在给定流上调用中间方法parallel(),以将顺序流转换为并行流。
为了评估并行流的并行执行行为,下一个示例将有关当前线程的信息打印出来:
Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s] ", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s] ", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .forEach(s -> System.out.format("forEach: %s [%s] ", s, Thread.currentThread().getName()));
通过调查调试输出,我们应该更好地理解哪些线程实际用于执行流操作:
filter: b1 [main] filter: a2 [ForkJoinPool.commonPool-worker-1] map: a2 [ForkJoinPool.commonPool-worker-1] filter: c2 [ForkJoinPool.commonPool-worker-3] map: c2 [ForkJoinPool.commonPool-worker-3] filter: c1 [ForkJoinPool.commonPool-worker-2] map: c1 [ForkJoinPool.commonPool-worker-2] forEach: C2 [ForkJoinPool.commonPool-worker-3] forEach: A2 [ForkJoinPool.commonPool-worker-1] map: b1 [main] forEach: B1 [main] filter: a1 [ForkJoinPool.commonPool-worker-3] map: a1 [ForkJoinPool.commonPool-worker-3] forEach: A1 [ForkJoinPool.commonPool-worker-3] forEach: C1 [ForkJoinPool.commonPool-worker-2]
如您所见,并行流利用公共中的所有可用线程ForkJoinPool来执行流操作。输出在连续运行中可能不同,因为实际使用的特定线程的行为是非确定性的。
让我们通过一个额外的流操作来扩展该示例:
Arrays.asList("a1", "a2", "b1", "c2", "c1") .parallelStream() .filter(s -> { System.out.format("filter: %s [%s] ", s, Thread.currentThread().getName()); return true; }) .map(s -> { System.out.format("map: %s [%s] ", s, Thread.currentThread().getName()); return s.toUpperCase(); }) .sorted((s1, s2) -> { System.out.format("sort: %s <> %s [%s] ", s1, s2, Thread.currentThread().getName()); return s1.compareTo(s2); }) .forEach(s -> System.out.format("forEach: %s [%s] ", s, Thread.currentThread().getName()));
结果可能最初看起来很奇怪:
filter: c2 [ForkJoinPool.commonPool-worker-3] filter: c1 [ForkJoinPool.commonPool-worker-2] map: c1 [ForkJoinPool.commonPool-worker-2] filter: a2 [ForkJoinPool.commonPool-worker-1] map: a2 [ForkJoinPool.commonPool-worker-1] filter: b1 [main] map: b1 [main] filter: a1 [ForkJoinPool.commonPool-worker-2] map: a1 [ForkJoinPool.commonPool-worker-2] map: c2 [ForkJoinPool.commonPool-worker-3] sort: A2 <> A1 [main] sort: B1 <> A2 [main] sort: C2 <> B1 [main] sort: C1 <> C2 [main] sort: C1 <> B1 [main] sort: C1 <> C2 [main] forEach: A1 [ForkJoinPool.commonPool-worker-1] forEach: C2 [ForkJoinPool.commonPool-worker-3] forEach: B1 [main] forEach: A2 [ForkJoinPool.commonPool-worker-2] forEach: C1 [ForkJoinPool.commonPool-worker-1]
似乎sort只在主线程上顺序执行。实际上,sort在并行流上使用新的Java 8方法Arrays.parallelSort()。如Javadoc中所述,如果排序将按顺序或并行执行,则此方法决定数组的长度:
如果指定数组的长度小于最小粒度,则使用适当的Arrays.sort方法对其进行排序。
回到reduce一节的例子。我们已经发现组合器函数只是并行调用,而不是顺序流调用。让我们看看实际涉及哪些线程:
Listpersons = Arrays.asList( new Person("Max", 18), new Person("Peter", 23), new Person("Pamela", 23), new Person("David", 12)); persons .parallelStream() .reduce(0, (sum, p) -> { System.out.format("accumulator: sum=%s; person=%s [%s] ", sum, p, Thread.currentThread().getName()); return sum += p.age; }, (sum1, sum2) -> { System.out.format("combiner: sum1=%s; sum2=%s [%s] ", sum1, sum2, Thread.currentThread().getName()); return sum1 + sum2; });
控制台输出显示累加器和组合器函数在所有可用线程上并行执行:
accumulator: sum=0; person=Pamela; [main] accumulator: sum=0; person=Max; [ForkJoinPool.commonPool-worker-3] accumulator: sum=0; person=David; [ForkJoinPool.commonPool-worker-2] accumulator: sum=0; person=Peter; [ForkJoinPool.commonPool-worker-1] combiner: sum1=18; sum2=23; [ForkJoinPool.commonPool-worker-1] combiner: sum1=23; sum2=12; [ForkJoinPool.commonPool-worker-2] combiner: sum1=41; sum2=35; [ForkJoinPool.commonPool-worker-2]
总之,并行流可以为具有大量输入元素的流带来良好的性能提升。但请记住,某些并行流操作reduce,collect需要额外的计算(组合操作),这在顺序执行时是不需要的。
此外,我们了解到所有并行流操作共享相同的JVM范围ForkJoinPool。因此,您可能希望避免实施慢速阻塞流操作,因为这可能会减慢严重依赖并行流的应用程序的其他部分。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/72912.html
摘要:需要注意的是很多流操作本身就会返回一个流,所以多个操作可以直接连接起来,如下图这样,操作可以进行链式调用,并且并行流还可以实现数据流并行处理操作。为集合创建并行流。 上一篇文章,小乐给大家介绍了《Java8新特性之方法引用》,下面接下来小乐将会给大家介绍Java8新特性之Stream,称之为流,本篇文章为上半部分。 1、什么是流? Java Se中对于流的操作有输入输出IO流,而Jav...
摘要:新特性总览标签本文主要介绍的新特性,包括表达式方法引用流默认方法组合式异步编程新的时间,等等各个方面。还有对应的和类型的函数连接字符串广义的归约汇总起始值,映射方法,二元结合二元结合。使用并行流时要注意避免共享可变状态。 Java8新特性总览 标签: java [TOC] 本文主要介绍 Java 8 的新特性,包括 Lambda 表达式、方法引用、流(Stream API)、默认方...
摘要:限制编写并行流,存在一些与非并行流不一样的约定。底层框架并行流在底层沿用的框架,递归式的分解问题,然后每段并行执行,最终由合并结果,返回最后的值。 本书第六章的读书笔记,也是我这个系列的最后一篇读书笔记。后面7、8、9章分别讲的测试、调试与重构、设计和架构的原则以及使用Lambda表达式编写并发程序,因为笔记不好整理,就不写了,感兴趣的同学自己买书来看吧。 并行化流操作 关于并行与并发...
摘要:串行与并行可以分为串行与并行两种,串行流和并行流差别就是单线程和多线程的执行。返回串行流返回并行流和方法返回的都是类型的对象,说明它们在功能的使用上是没差别的。唯一的差别就是单线程和多线程的执行。 Stream是什么 Stream是Java8中新加入的api,更准确的说: Java 8 中的 Stream 是对集合(Collection)对象功能的增强,它专注于对集合对象进行各种非常便...
阅读 1356·2023-04-26 00:35
阅读 2726·2023-04-25 18:32
阅读 3378·2021-11-24 11:14
阅读 781·2021-11-22 15:24
阅读 1431·2021-11-18 10:07
阅读 6559·2021-09-22 10:57
阅读 2786·2021-09-07 09:58
阅读 3574·2019-08-30 15:54