资讯专栏INFORMATION COLUMN

《java 8 实战》读书笔记 -第六章 用流收集数据

Airy / 1653人阅读

摘要:分区函数返回一个布尔值,这意味着得到的分组的键类型是,于是它最多可以分为两组是一组,是一组。当遍历到流中第个元素时,这个函数执行时会有两个参数保存归约结果的累加器已收集了流中的前个项目,还有第个元素本身。

一、收集器简介

把列表中的交易按货币分组:

Map> transactionsByCurrencies = 
 transactions.stream().collect(groupingBy(Transaction::getCurrency));

从Collectors
类提供的工厂方法(例如groupingBy)创建的收集器。它们主要提供了三大功能:

将流元素归约和汇总为一个值

元素分组

元素分区

二、归约和汇总

数一数菜单里有多少种菜:

long howManyDishes = menu.stream().collect(Collectors.counting()); 

这还可以写得更为直接:

long howManyDishes = menu.stream().count();
1.查找流中的最大值和最小值

可以使用两个收集器,Collectors.maxBy和Collectors.minBy,来计算流中的最大或最小值。这两个收集器接收一个Comparator参数来比较流中的元素.
找出菜单中热量最高的菜:

Comparator dishCaloriesComparator = 
 Comparator.comparingInt(Dish::getCalories); 
Optional mostCalorieDish = 
 menu.stream() 
 .collect(maxBy(dishCaloriesComparator));
2.汇总

Collectors.summingInt
它可接受一个把对象映射为求和所需int的函数,并返回一个收集器;该收集器在传递给普通的collect方法后即执行我们需要的汇总操作。
eg:

int totalCalories = menu.stream().collect(summingInt(Dish::getCalories));

另外,Collectors.summingLong和Collectors.summingDouble方法的作用完全一样,可以用于求和字段为long或double的情况。还有Collectors.averagingInt,连同对应的averagingLong和averagingDouble可以计算数值的平均数。

summarizing操作
通过一次summarizing操作你可以就数出菜单中元素的个数,并得到菜肴热量总和、平均值、最大值和最小值

IntSummaryStatistics menuStatistics = 
menu.stream().collect(summarizingInt(Dish::getCalories)); 

这个收集器会把所有这些信息收集到一个叫作IntSummaryStatistics的类里,它提供了方便的取值(getter)方法来访问结果。打印menuStatisticobject会得到以下输出:

IntSummaryStatistics{count=9, sum=4300, min=120, 
average=477.777778, max=800}

同样,相应的summarizingLong和summarizingDouble工厂方法有相关的LongSummaryStatistics和DoubleSummaryStatistics类型。

3.连接字符串

joining工厂方法返回的收集器会把对流中每一个对象应用toString方法得到的所有字符串连接成一个字符串。

String shortMenu = menu.stream().map(Dish::getName).collect(joining());

joining工厂方法有一个重载版本可以接受元素之间的分界符

String shortMenu = menu.stream().map(Dish::getName).collect(joining(", "));
4.广义的归约汇总

可以用reducing方法创建的收集器来计算你菜单的总热量,如下所示:

int totalCalories = menu.stream().collect(reducing( 
 0, Dish::getCalories, (i, j) -> i + j));

第一个参数是归约操作的起始值。

第二个参数将菜肴转换成一个表示其所含热量的int。

第三个参数是一个BinaryOperator,将两个项目累积成一个同类型的值。这里它就是对两个int求和。

单参数形式的reducing来找到热量最高的菜,如下所示:

Optional mostCalorieDish = 
 menu.stream().collect(reducing( 
 (d1, d2) -> d1.getCalories() > d2.getCalories() ? d1 : d2));
相比stream的reduce方法collect方法特别适合表达可变容器上的归约,更关键的是它适合并行操作

计算菜单里所有菜肴的卡路里总和,以不同的方法执行同样的操作:

第一种:

int totalCalories = menu.stream().collect(reducing(0, 
 Dish::getCalories,
 Integer::sum));

第二种:

int totalCalories = 
  menu.stream().map(Dish::getCalories).reduce(Integer::sum).get();//reduce返回的是Optional

第三种:

int totalCalories = menu.stream().mapToInt(Dish::getCalories).sum();

最后一种最佳。

三、分组

假设你要把菜单中的菜按照类型进行分类,有肉的放一组,有鱼的放一组,其他的都放另一组。用Collectors.groupingBy工厂方法返回的收集器就可以轻松地完成这项任务,如下所示:

Map> dishesByType = 
 menu.stream().collect(groupingBy(Dish::getType)); 

其结果是下面的Map:

{FISH=[prawns, salmon], OTHER=[french fries, rice, season fruit, pizza], 
MEAT=[pork, beef, chicken]}
给groupingBy方法传递了一个Function(以方法引用的形式),它提取了流中每一道Dish的Dish.Type。我们把这个Function叫作分类函数

如果Dish中没有定义类型获取方法,可以使用lambda表达式:

public enum CaloricLevel { DIET, NORMAL, FAT } 

Map> dishesByCaloricLevel = menu.stream().collect( 
 groupingBy(dish -> { 
 if (dish.getCalories() <= 400) return CaloricLevel.DIET; 
 else if (dish.getCalories() <= 700) return 
 CaloricLevel.NORMAL; 
 else return CaloricLevel.FAT; 
 } ));
1.多级分组

使用一个由双参数版本的Collectors.groupingBy工厂方法创建的收集器,它除了普通的分类函数之外,还可以接受collector类型的第二个参数:

Map>> dishesByTypeCaloricLevel = 
menu.stream().collect( 
 groupingBy(Dish::getType, 
 groupingBy(dish -> { 
 if (dish.getCalories() <= 400) return CaloricLevel.DIET; 
 else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL; 
 else return CaloricLevel.FAT; 
 } ) 
 ) 
);
这种多级分组操作可以扩展至任意层级,n级分组就会得到一个代表n级树形结构的n级Map
2.按子组收集数据

传递给第一个groupingBy的第二个收集器可以是任何类型,而不一定是另一groupingBy

Map typesCount = menu.stream().collect( 
 groupingBy(Dish::getType, counting())); 

其结果是下面的Map:

{MEAT=3, FISH=2, OTHER=4}
普通的单参数groupingBy(f)(其中f是分类函数)实际上是groupingBy(f, toList())的简便写法。

把收集器的结果转换为另一种类型
查找每个子组中热量最高的Dish

Map mostCaloricByType = 
menu.stream() 
.collect(groupingBy(Dish::getType,
collectingAndThen( 
maxBy(comparingInt(Dish::getCalories)), //maxBy工厂方法生成的收集器的类型是Optional
Optional::get)));

包装的Optional没什么用,把收集器返回的结果转换为另一种类型,你可以使用Collectors.collectingAndThen工厂方法;返回的收集器groupingBy收集器只有在应用分组条件后,第一次在流中找到某个键对应的元素时才会把键加入分组Map中,所以Optional::get这个操作放在这里是安全的,因为reducing收集器永远都不会返回Optional.empty()

与groupingBy联合使用的其他收集器的例子

Map totalCaloriesByType = 
menu.stream().collect(groupingBy(Dish::getType, 
summingInt(Dish::getCalories)));

对于每种类型的Dish,菜单中都有哪些CaloricLevel。我们可以把groupingBy和mapping收集器结合起来,如下所示:

Map> caloricLevelsByType = 
menu.stream().collect( 
groupingBy(Dish::getType, mapping( 
dish -> { 
if (dish.getCalories() <= 400) return CaloricLevel.DIET; 
else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL; 
else return CaloricLevel.FAT;
}, 
toSet() )));//生成的CaloricLevel流传递给一个toSet收集器,
//它和toList类似,不过是把流中的元素累积到一个Set而不是List中,以便仅保留各不相同的值。

但通过使用toCollection,你就可以有更多的控制。例如,你可以给它传递一个构造函数引用来要求HashSet:

Map> caloricLevelsByType = 
menu.stream().collect( 
groupingBy(Dish::getType, mapping( 
dish -> { if (dish.getCalories() <= 400) return CaloricLevel.DIET; 
else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL; 
else return CaloricLevel.FAT; }, 
toCollection(HashSet::new) )));

四、分区 1.分区的优势

分区是分组的特殊情况:由一个谓词(返回一个布尔值的函数)作为分类函数,它称分区函数。分区函数返回一个布尔值,这意味着得到的分组Map的键类型是Boolean,于是它最多可以分为两组——true是一组,false是一组。例如,如果你是素食者或是请了一位素食的朋友来共进晚餐,可能会想要把菜单按照素食和非素食分开:

Map> partitionedMenu = 
 menu.stream().collect(partitioningBy(Dish::isVegetarian));

计算素食和非素食的数量:

 menu.stream().collect(partitioningBy(Dish::isVegetarian,
 counting()));
2.将数字按质数和非质数分区
public boolean isPrime(int candidate) { 
 int candidateRoot = (int) Math.sqrt((double) candidate); 
 return IntStream.rangeClosed(2, candidateRoot) 
 .noneMatch(i -> candidate % i == 0); 
}

public Map> partitionPrimes(int n) { 
 return IntStream.rangeClosed(2, n).boxed() 
 .collect( 
 partitioningBy(candidate -> isPrime(candidate))); 
}

Collectors类的静态工厂方法:

五、收集器接口
public interface Collector { 
 Supplier supplier(); 
 BiConsumer accumulator(); 
 Function finisher(); 
 BinaryOperator combiner(); 
 Set characteristics(); 
}

T是流中要收集的项目的泛型。

A是累加器的类型,累加器是在收集过程中用于累积部分结果的对象。

R是收集操作得到的对象(通常但并不一定是集合)的类型。

例如,你可以实现一个ToListCollector类,将Stream中的所有元素收集List里,它的签名如下:

public class ToListCollector implements Collector, List>
1.理解 Collector 接口声明的方法 (1)建立新的结果容器:supplier方法

在调用时它会创建一个空的累加器实例,供数据收集过程使用

public Supplier> supplier() { 
 return () -> new ArrayList(); 
}

或者使用构造函数引用;

public Supplier> supplier() { 
 return ArrayList::new; 
}
(2)将元素添加到结果容器:accumulator方法

accumulator方法会返回执行归约操作的函数。当遍历到流中第n个元素时,这个函数执行时会有两个参数:保存归约结果的累加器(已收集了流中的前 n1 个项目),还有第n个元素本身。该函数将返回void,因为累加器是原位更新,即函数的执行改变了它的内部状态以体现遍历的元素的效果。对于ToListCollector,这个函数仅仅会把当前项目添加至已经遍历过的项目的列表:

public BiConsumer, T> accumulator() { 
 return (list, item) -> list.add(item); 
} 

你也可以使用方法引用,这会更为简洁:

public BiConsumer, T> accumulator() { 
 return List::add; 
}
(3)对结果容器应用最终转换:finisher方法

在遍历完流后,finisher方法必须返回在累积过程的最后要调用的一个函数,以便将累加器对象转换为整个集合操作的最终结果。

public Function, List> finisher() { 
 return Function.identity(); //累加器对象恰好符合预期的最终结果,
//因此无需进行转换。所以finisher方法只需返回identity函数
}
(4) 合并两个结果容器:combiner方法

combiner方法会返回一个供归约操作使用的函数,它定义了对流的各个子部分进行并行处理时,各个子部分归约所得的累加器要如何合并。

public BinaryOperator> combiner() { 
 return (list1, list2) -> { 
 list1.addAll(list2); 
 return list1; } 
}

有了这第四个方法,就可以对流进行并行归约了,会用到Java 7中引入的Fork/Join框架和Spliterator抽象

Fork/Join是什么?
Fork/Join框架是Java7提供的并行执行任务框架,思想是将大任务分解成小任务,然后小任务又可以继续分解,然后每个小任务分别计算出结果再合并起来,最后将汇总的结果作为大任务结果。其思想和MapReduce的思想非常类似。对于任务的分割,要求各个子任务之间相互独立,能够并行独立地执行任务,互相之间不影响。

Fork/Join的运行流程图如下:

我们可以通过Fork/Join单词字面上的意思去理解这个框架。Fork是叉子分叉的意思,即将大任务分解成并行的小任务,Join是连接结合的意思,即将所有并行的小任务的执行结果汇总起来。

工作窃取算法
ForkJoin采用了工作窃取(work-stealing)算法,若一个工作线程的任务队列为空没有任务执行时,便从其他工作线程中获取任务主动执行。为了实现工作窃取,在工作线程中维护了双端队列,窃取任务线程从队尾获取任务,被窃取任务线程从队头获取任务。这种机制充分利用线程进行并行计算,减少了线程竞争。但是当队列中只存在一个任务了时,两个线程去取反而会造成资源浪费。

工作窃取的运行流程图如下:

Fork/Join核心类
1.ForkJoinPool
ForkJoinPool是ForkJoin框架中的任务调度器,和ThreadPoolExecutor一样实现了自己的线程池,提供了三种调度子任务的方法:
execute:异步执行指定任务,无返回结果;
invoke、invokeAll:同步执行指定任务,等待完成才返回结果;
submit:异步执行指定任务,并立即返回一个Future对象;
2.ForkJoinTask
Fork/Join框架中的实际的执行任务类,有以下两种实现,一般继承这两种实现类即可。
RecursiveAction:用于无结果返回的子任务;
RecursiveTask:用于有结果返回的子任务;
Fork/Join框架实战
下面实现一个Fork/Join小例子,从1+2+...10亿,每个任务只能处理1000个数相加,超过1000个的自动分解成小任务并行处理;并展示了通过不使用Fork/Join和使用时的时间损耗对比。

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinTask extends RecursiveTask {
   private static final long MAX = 1000000000L;
   private static final long THRESHOLD = 1000L;
   private long start;
   private long end;

   public ForkJoinTask(long start, long end) {
       this.start = start;
       this.end = end;
   }

   public static void main(String[] args) {
       test();
       System.out.println("--------------------");
       testForkJoin();
   }

   private static void test() {
       System.out.println("test");
       long start = System.currentTimeMillis();
       Long sum = 0L;
       for (long i = 0L; i <= MAX; i++) {
           sum += i;
       }
       System.out.println(sum);
       System.out.println(System.currentTimeMillis() - start + "ms");
   }

   private static void testForkJoin() {
       System.out.println("testForkJoin");
       long start = System.currentTimeMillis();
       ForkJoinPool forkJoinPool = new ForkJoinPool();
       Long sum = forkJoinPool.invoke(new ForkJoinTask(1, MAX));
       System.out.println(sum);
       System.out.println(System.currentTimeMillis() - start + "ms");
   }

   @Override
   protected Long compute() {
       long sum = 0;
       if (end - start <= THRESHOLD) {
           for (long i = start; i <= end; i++) {
               sum += i;
           }
           return sum;
       } else {
           long mid = (start + end) / 2;

           ForkJoinTask task1 = new ForkJoinTask(start, mid);
           task1.fork();

           ForkJoinTask task2 = new ForkJoinTask(mid + 1, end);
           task2.fork();

           return task1.join() + task2.join();
       }
   }

}

这里需要计算结果,所以任务继承的是RecursiveTask类。ForkJoinTask需要实现compute方法,在这个方法里首先需要判断任务是否小于等于阈值1000,如果是就直接执行任务。否则分割成两个子任务,每个子任务在调用fork方法时,又会进入compute方法,看看当前子任务是否需要继续分割成孙任务,如果不需要继续分割,则执行当前子任务并返回结果。使用join方法会阻塞并等待子任务执行完并得到其结果。

程序输出:

test
500000000500000000
4992ms
--------------------
testForkJoin
500000000500000000
508ms

需要特别注意的是:

ForkJoinPool 使用submit 或 invoke 提交的区别:invoke是同步执行,调用之后需要等待任务完成,才能执行后面的代码;submit是异步执行,只有在Future调用get的时候会阻塞。
这里继承的是RecursiveTask,还可以继承RecursiveAction。前者适用于有返回值的场景,而后者适合于没有返回值的场景
这一点是最容易忽略的地方,其实这里执行子任务调用fork方法并不是最佳的选择,最佳的选择是invokeAll方法。

leftTask.fork();  
rightTask.fork();

替换为

invokeAll(leftTask, rightTask);

具体说一下原理:对于Fork/Join模式,假如Pool里面线程数量是固定的,那么调用子任务的fork方法相当于A先分工给B,然后A当监工不干活,B去完成A交代的任务。所以上面的模式相当于浪费了一个线程。那么如果使用invokeAll相当于A分工给B后,A和B都去完成工作。这样可以更好的利用线程池,缩短执行的时间。

(5) characteristics方法

返回一个不可变的Characteristics集合,它定义了收集器的行为——尤其是关于流是否可以并行归约,以及可以使用哪些优化的提示。
Characteristics是一个包含三个项目的枚举。

UNORDERED——归约结果不受流中项目的遍历和累积顺序的影响。

CONCURRENT——accumulator函数可以从多个线程同时调用,且该收集器可以并行归约流。如果收集器没有标为UNORDERED,那它仅在用于无序数据源时才可以并行归约。

IDENTITY_FINISH——这表明完成器方法返回的函数是一个恒等函数,可以跳过。这种情况下,累加器对象将会直接用作归约过程的最终结果。这也意味着,将累加器A不加检查地转换为结果R是安全的。

@Override 
 public Set characteristics() { 
 return Collections.unmodifiableSet(EnumSet.of( 
 IDENTITY_FINISH, CONCURRENT));
 }
2.进行自定义收集而不去实现Collector

Stream有一个重载的collect方法可以接受另外三个函数——supplier、accumulator和combiner,其语义和Collector接口的相应方法返回的函数完全相同。

List dishes = menuStream.collect( 
 ArrayList::new,
 List::add,
 List::addAll);//它永远都是一个IDENTITY_FINISH和CONCURRENT但并非UNORDERED的收集器。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/74250.html

相关文章

  • Java8实战》-第四章读书笔记(引入流Stream)

    摘要:内部迭代与使用迭代器显式迭代的集合不同,流的迭代操作是在背后进行的。流只能遍历一次请注意,和迭代器类似,流只能遍历一次。 流(Stream) 流是什么 流是Java API的新成员,它允许你以声明性方式处理数据集合(通过查询语句来表达,而不是临时编写一个实现)。就现在来说,你可以把它们看成遍历数据集的高级迭代器。此外,流还可以透明地并行处理,你无需写任何多线程代码了!我会在后面的笔记中...

    _ivan 评论0 收藏0
  • java 8 实战读书笔记 -第十四章 函数式编程的技巧

    摘要:但是,最好使用差异化的类型定义,函数签名如下其实二者说的是同一件事。后者的返回值和初始函数的返回值相同,即。破坏式更新和函数式更新的比较三的延迟计算的设计者们在将引入时采取了比较特殊的方式。四匹配模式语言中暂时并未提供这一特性,略。 一、无处不在的函数 一等函数:能够像普通变量一样使用的函数称为一等函数(first-class function)通过::操作符,你可以创建一个方法引用,...

    nemo 评论0 收藏0
  • java 8 实战读书笔记 -第四章 引入流

    摘要:第四章引入流一什么是流流是的新成员,它允许你以声明性方式处理数据集合通过查询语句来表达,而不是临时编写一个实现。 第四章 引入流 一、什么是流 流是Java API的新成员,它允许你以声明性方式处理数据集合(通过查询语句来表达,而不是临时编写一个实现)。就现在来说,你可以把它们看成遍历数据集的高级迭代器。此外,流还可以透明地并行处理,你无需写任何多线程代码。 下面两段代码都是用来返回低...

    jeyhan 评论0 收藏0
  • Java 8 函数式编程」读书笔记——高级集合类和收集

    摘要:本章是该书的第五章主要讲了方法引用和收集器方法引用形如这样的表达式可以简写为这种简写的语法被称为方法引用方法引用无需考虑参数因为一个方法引用可以在不同的情况下解析为不同的表达式这依赖于的推断方法引用的类型方法引用可以分为四类引用静态方法 本章是该书的第五章, 主要讲了方法引用和收集器 方法引用 形如: artist -> artist.getName() (String arg) ->...

    imingyu 评论0 收藏0
  • 引入流

    摘要:流的使用一般包括三件事一个数据源来执行一个查询一个中间操作链,形成一条流水线一个终端操作,执行流水线并生成结果以上便是流的一些基础知识,下一章会更加深入理解流。实战第四章引入流读书笔记欢迎加入咖啡馆的春天。 流是什么 几乎每个 Java 应用都会制造和处理集合。流 允许我们以声明性方式处理集合(通过类似于 SQL 查询语句来表达,而不是临时写一个实现)。此外 流 还可以透明地并行处理,...

    phodal 评论0 收藏0
  • Java8新特性总览

    摘要:新特性总览标签本文主要介绍的新特性,包括表达式方法引用流默认方法组合式异步编程新的时间,等等各个方面。还有对应的和类型的函数连接字符串广义的归约汇总起始值,映射方法,二元结合二元结合。使用并行流时要注意避免共享可变状态。 Java8新特性总览 标签: java [TOC] 本文主要介绍 Java 8 的新特性,包括 Lambda 表达式、方法引用、流(Stream API)、默认方...

    mayaohua 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<