资讯专栏INFORMATION COLUMN

《java 8 实战》读书笔记 -第七章 并行数据处理与性能

刘福 / 3414人阅读

摘要:正确使用并行流错用并行流而产生错误的首要原因,就是使用的算法改变了某些共享状态。高效使用并行流留意装箱有些操作本身在并行流上的性能就比顺序流差还要考虑流的操作流水线的总计算成本。

一、并行流 1.将顺序流转换为并行流

对顺序流调用parallel方法:

public static long parallelSum(long n) { 
 return Stream.iterate(1L, i -> i + 1) 
 .limit(n) 
 .parallel() 
 .reduce(0L, Long::sum); 
}

它在内部实际上就是设了一个boolean标志,表示你想让调用parallel之后进行的所有操作都并行执行。类似地,你只需要对并行流调用sequential方法就可以把它变成顺序流。但最后一次parallel或sequential调用会影响整个流水线。

2.测量流性能

iterate生成的是装箱的对象,必须拆箱成数字才能求和;

我们很难把iterate分成多个独立块来并行执行。

iterate很难分割成能够独立执行的小块,因为每次应用这个函数都要依赖前一次应用的结果,整张数字列表在归纳过程开始时没有准备好,因而无法有效地把流划分为小块来并行处理。把流标记成并行,你其实是给顺序处理增加了开销,它还要把每次求和操作分到一个不同的线程上。

3.正确使用并行流

错用并行流而产生错误的首要原因,就是使用的算法改变了某些共享状态。

public class Accumulator { 
 public long total = 0; 
 public void add(long value) { total += value; } 
}

public static long sideEffectParallelSum(long n) { 
 Accumulator accumulator = new Accumulator(); 
 LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add); 
 return accumulator.total; 
}
上面的示例在本质上就是顺序的,每次访问total都会出现数据竞争.由于多个线程在同时访问累加器,执行total += value,而这一句虽然看似简单,却不是一个原子操作。所得的结果也是不可控的(错误的)。
4.高效使用并行流

留意装箱

有些操作本身在并行流上的性能就比顺序流差

还要考虑流的操作流水线的总计算成本。设N是要处理的元素的总数,Q是一个元素通过流水线的大致处理成本,则N*Q就是这个对成本的一个粗略的定性估计。Q值较高就意味着使用并行流时性能好的可能性比较大

对于较小的数据量,选择并行流几乎从来都不是一个好的决定

要考虑流背后的数据结构是否易于分解

流自身的特点,以及流水线中的中间操作修改流的方式,都可能会改变分解过程的性能。

还要考虑终端操作中合并步骤的代价是大是小

二、分支/合并框架(Fork/Join)

详见第六章相关内容
注意:不应该在RecursiveTask内部使用ForkJoinPool的invoke方法。相反,你应该始终直接调用compute或fork方法,只有顺序代码才应该用invoke来启动并行计算。

三、Spliterator

Spliterator是Java 8中加入的另一个新接口;这个名字代表“可分迭代器”(splitable iterator)。和Iterator一样,Spliterator也用于遍历数据源中的元素,但它是为了并行执行而设计的。
Spliterator接口

public interface Spliterator { 
 boolean tryAdvance(Consumer action); 
 Spliterator trySplit(); 
 long estimateSize(); 
 int characteristics(); 
}

与往常一样,T是Spliterator遍历的元素的类型。tryAdvance方法的行为类似于普通的Iterator,因为它会按顺序一个一个使用Spliterator中的元素,并且如果还有其他元素要遍历就返回true。但trySplit是专为Spliterator接口设计的,因为它可以把一些元素划出去分给第二个Spliterator(由该方法返回),让它们两个并行处理。Spliterator还可通过estimateSize方法估计还剩下多少元素要遍历,因为即使不那么确切,能快速算出来是一个值也有助于让拆分均匀一点.

1.拆分过程

将Stream拆分成多个部分的算法是一个递归过程,如图所示。第一步是对第一个Spliterator调用trySplit,生成第二个Spliterator。第二步对这两个Spliterator调用trysplit,这样总共就有了四个Spliterator。这个框架不断对Spliterator调用trySplit直到它返回null,表明它处理的数据结构不能再分割,如第三步所示。最后,这个递归拆分过程到第四步就终止了,这时所有的Spliterator在调用trySplit时都返回了null。

2.实现你自己的 Spliterator

文中提到了reduce的三参数重载方法

 U reduce(U identity,BiFunction accumulator,BinaryOperator combiner)

它的三个参数:

identity: 一个初始化的值;这个初始化的值其类型是泛型U,与Reduce方法返回的类型一致;注意此时Stream中元素的类型是T,与U可以不一样也可以一样,这样的话操作空间就大了;不管Stream中存储的元素是什么类型,U都可以是任何类型,如U可以是一些基本数据类型的包装类型Integer、Long等;或者是String,又或者是一些集合类型ArrayList等;后面会说到这些用法。

accumulator: 其类型是BiFunction,输入是U与T两个类型的数据,而返回的是U类型;也就是说返回的类型与输入的第一个参数类型是一样的,而输入的第二个参数类型与Stream中元素类型是一样的。

combiner: 其类型是BinaryOperator,支持的是对U类型的对象进行操作;

第三个参数combiner主要是使用在并行计算的场景下;如果Stream是非并行时,第三个参数实际上是不生效的。

代码实现:

class WordCounter {
    private final int counter;
    private final boolean lastSpace;

    public WordCounter(int counter, boolean lastSpace) {
        this.counter = counter;
        this.lastSpace = lastSpace;
    }

    public WordCounter accumulate(Character c) {
        if (Character.isWhitespace(c)) {
            return lastSpace ? this : new WordCounter(counter, true);
        } else {
            return lastSpace ? new WordCounter(counter + 1, false) : this;
        }
    }

    public WordCounter combine(WordCounter wordCounter) {
        return new WordCounter(counter + wordCounter.counter,
            wordCounter.lastSpace);
    }

    public int getCounter() {
        return counter;
    }
}
class WordCounterSpliterator implements Spliterator {
    private final String string;
    private int currentChar = 0;

    public WordCounterSpliterator(String string) {
        this.string = string;
    }

    @Override
    public boolean tryAdvance(Consumer action) {
        action.accept(string.charAt(currentChar++));

        return currentChar < string.length();
    }

    @Override
    public Spliterator trySplit() {
        int currentSize = string.length() - currentChar;

        if (currentSize < 10) {
            return null;
        }

        for (int splitPos = (currentSize / 2) + currentChar;
                splitPos < string.length(); splitPos++) {
            if (Character.isWhitespace(string.charAt(splitPos))) {
                Spliterator spliterator = new WordCounterSpliterator(string.substring(
                            currentChar, splitPos));
                currentChar = splitPos;

                return spliterator;
            }
        }

        return null;
    }

    @Override
    public long estimateSize() {
        return string.length() - currentChar;
    }

    @Override
    public int characteristics() {
        return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
    }
}
final String SENTENCE = 
 " Nel mezzo del cammin di nostra vita " + 
 "mi ritrovai in una selva oscura" + 
 " ché la dritta via era smarrita ";


private int countWords(Stream stream) { 
 WordCounter wordCounter = stream.reduce(new WordCounter(0, true), 
 WordCounter::accumulate, 
 WordCounter::combine); 
 return wordCounter.getCounter(); 
}

Spliterator spliterator = new WordCounterSpliterator(SENTENCE); 
Stream stream = StreamSupport.stream(spliterator, true);

System.out.println("Found " + countWords(stream) + " words");

最后打印显示

Found 19 words

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/74284.html

相关文章

  • Java8实战》-读书笔记第一章(02)

    摘要:实战读书笔记第一章从方法传递到接着上次的,继续来了解一下,如果继续简化代码。去掉并且生成的数字是万,所消耗的时间循序流并行流至于为什么有时候并行流效率比循序流还低,这个以后的文章会解释。 《Java8实战》-读书笔记第一章(02) 从方法传递到Lambda 接着上次的Predicate,继续来了解一下,如果继续简化代码。 把方法作为值来传递虽然很有用,但是要是有很多类似与isHeavy...

    lushan 评论0 收藏0
  • Java 8 函数式编程」读书笔记——数据并行

    摘要:限制编写并行流,存在一些与非并行流不一样的约定。底层框架并行流在底层沿用的框架,递归式的分解问题,然后每段并行执行,最终由合并结果,返回最后的值。 本书第六章的读书笔记,也是我这个系列的最后一篇读书笔记。后面7、8、9章分别讲的测试、调试与重构、设计和架构的原则以及使用Lambda表达式编写并发程序,因为笔记不好整理,就不写了,感兴趣的同学自己买书来看吧。 并行化流操作 关于并行与并发...

    leone 评论0 收藏0
  • java 8 实战读书笔记 -第四章 引入流

    摘要:第四章引入流一什么是流流是的新成员,它允许你以声明性方式处理数据集合通过查询语句来表达,而不是临时编写一个实现。 第四章 引入流 一、什么是流 流是Java API的新成员,它允许你以声明性方式处理数据集合(通过查询语句来表达,而不是临时编写一个实现)。就现在来说,你可以把它们看成遍历数据集的高级迭代器。此外,流还可以透明地并行处理,你无需写任何多线程代码。 下面两段代码都是用来返回低...

    jeyhan 评论0 收藏0
  • java 8 实战读书笔记 -第六章 用流收集数据

    摘要:分区函数返回一个布尔值,这意味着得到的分组的键类型是,于是它最多可以分为两组是一组,是一组。当遍历到流中第个元素时,这个函数执行时会有两个参数保存归约结果的累加器已收集了流中的前个项目,还有第个元素本身。 一、收集器简介 把列表中的交易按货币分组: Map transactionsByCurrencies = transactions.stream().collect(groupi...

    Airy 评论0 收藏0
  • Java8实战》-第四章读书笔记(引入流Stream)

    摘要:内部迭代与使用迭代器显式迭代的集合不同,流的迭代操作是在背后进行的。流只能遍历一次请注意,和迭代器类似,流只能遍历一次。 流(Stream) 流是什么 流是Java API的新成员,它允许你以声明性方式处理数据集合(通过查询语句来表达,而不是临时编写一个实现)。就现在来说,你可以把它们看成遍历数据集的高级迭代器。此外,流还可以透明地并行处理,你无需写任何多线程代码了!我会在后面的笔记中...

    _ivan 评论0 收藏0
  • java 8 实战读书笔记 -第十一章 CompletableFuture:组合式异步编程

    摘要:方法接受一个生产者作为参数,返回一个对象,该对象完成异步执行后会读取调用生产者方法的返回值。该方法接收一个对象构成的数组,返回由第一个执行完毕的对象的返回值构成的。 一、Future 接口 在Future中触发那些潜在耗时的操作把调用线程解放出来,让它能继续执行其他有价值的工作,不再需要呆呆等待耗时的操作完成。打个比方,你可以把它想象成这样的场景:你拿了一袋子衣服到你中意的干洗店去洗。...

    zhangqh 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<