摘要:运行机制分为源,中间操作,终止操作。反过来说,目前还无法专为某个并行流指定这个值。我们在本节中已经指出,并行流不总是比顺序流快。特别是和等依赖于元素顺序的操作,它们在并行流上执行的代价非常大。
1 StreamStream是一组用来处理数组,集合的API。
1.1 特性不是数据结构,没有内部存储。
不支持索引访问。
延迟计算
支持并行
很容易生成数据或集合
支持过滤,查找,转换,汇总,聚合等操作。
Stream分为源source,中间操作,终止操作。
流的源可以是一个数组,集合,生成器方法,I/O通道等等。
一个流可以有零个或多个中间操作,每一个中间操作都会返回一个新的流,供下一个操作使用,一个流只会有一个终止操作。
Stream只有遇到终止操作,它的源才会开始执行遍历操作。
1.3 Stream的创建通过数组,Stream.of()
通过集合
通过Stream.generate方法来创建
通过Stram.iterate方法
其他API
import java.util.Arrays;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.Stream;
public class CreateStream {
//通过数组,Stream.of()
static void gen1(){
String[] str = {"a","b","c"};
Stream str1 = Stream.of(str);
}
//通过集合
static void gen2(){
List strings = Arrays.asList("a", "b", "c");
Stream stream = strings.stream();
}
//通过Stream.generate方法来创建
static void gen3(){
//这是一个无限流,通过这种方法创建在操作的时候最好加上limit进行限制
Stream generate = Stream.generate(() -> 1);
generate.limit(10).forEach(x -> System.out.println(x));
}
//通过Stram.iterate方法
static void gen4(){
Stream iterate = Stream.iterate(1, x -> x +1);
iterate.forEach(x -> System.out.println(x));
}
//其他API
static void gen5(){
String str = "abc";
IntStream chars = str.chars();
chars.forEach(x -> System.out.println(x));
}
}
2 Stream常用的API
2.1 中间操作
该操作会接受一个谓词(一个返回boolean的函数)作为参数,并返回一个包括所有符合谓词的元素的流。说白了就是给一个条件,filter会根据这个条件截取流中得数据。
public static void testFilter(){ Listintegers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); //截取所有能被2整除得数据 List collect = integers.stream().filter(i -> i % 2 == 0).collect(Collectors.toList()); System.out.println("collect = " + collect); }
结果:
collect = [2, 4, 6, 8, 10]
该操作会返回一个元素各异(根据流所生成元素的hashCode和equals方法实现)的流。
public static void main(String[] args) { Listnumbers = Arrays.asList(1, 2, 1, 3, 3, 2, 4); List collect = numbers.stream().distinct().collect(Collectors.toList()); System.out.println("collect = " + collect); }
结果:
collect = [1, 2, 3, 4]
对流中得数据进行排序,可以以自然序或着用Comparator 接口定义的排序规则来排序一个流。Comparator 能使用lambada表达式来初始化,还能够逆序一个已经排序的流。
public static void main(String[] args) { Listintegers = Arrays.asList(5, 8, 2, 6, 41, 11); //排序默认为顺序 顺序 = [2, 5, 6, 8, 11, 41] List sorted = integers.stream().sorted().collect(Collectors.toList()); System.out.println("顺序 = " + sorted); //逆序 逆序 = [41, 11, 8, 6, 5, 2] List reverseOrder = integers.stream().sorted(Comparator.reverseOrder()).collect(Collectors.toList()); System.out.println("逆序 = " + reverseOrder); //也可以接收一个lambda List ages = integers.stream().sorted(Comparator.comparing(User::getAge)).collect(Collectors.toList()); }
该方法会返回一个不超过给定长度的流。
public static void testLimit(){ Listintegers = Arrays.asList(1, 2, 1, 3, 3, 2, 4); //截取流中得前三个元素 collect = [1, 2, 1] List collect = integers.stream().limit(3).collect(Collectors.toList()); System.out.println("collect = " + collect); }
该方法会返回一个扔掉了前面n个元素的流。如果流中元素不足n个,则返回一个空流。
public static void testSkip(){ Listintegers = Arrays.asList(1, 2, 1, 3, 3, 2, 4); //丢掉流中得前三个元素 collect = [3, 3, 2, 4] List collect = integers.stream().skip(3).collect(Collectors.toList()); System.out.println("collect = " + collect); }
该方法会接受一个函数作为参数,这个函数会被应用到每个元素上,并将其映射成一个新的元素。就是根据指定函数获取流中得每个元素得数据并重新组合成一个新的元素。
public static void main(String[] args) { //自己建好得一个获取对象list得方法 ListdishList = Dish.getDishList(); //获取每一道菜得名称 并放到一个list中 List collect = dishList.stream().map(Dish::getName).collect(Collectors.toList()); //collect = [pork, beef, chicken, french fries, rice, season fruit, pizza, prawns, salmon] System.out.println("collect = " + collect); }
该方法key可以让你把一个流中的每个值都换成另一个流,然后把所有的流都链接起来成为一个流。
给 定 单 词 列 表["Hello","World"] ,你想要返回列表 ["H","e","l", "o","W","r","d"],你可能会认为这很容易,通过map你可以把每个单词映射成一张字符表,然后调用 distinct 来过滤重复的字符,但是这个方法的问题在于,传递给 map 方法的Lambda为每个单词返回了一个 String[] ( String列表)。因此, map 返回的流实际上是Stream
正确写法应该是通过flatMap对其扁平化并作出对应处理。
public static void main(String[] args) { String[] words = {"Hello", "World"}; Listcollect = Stream.of(words). //数组转换流 map(w -> w.split("")). //去掉“”并获取到两个String[] flatMap(Arrays::stream). //方法调用将两个String[]扁平化为一个stream distinct(). //去重 collect(Collectors.toList()); //collect = [H, e, l, o, W, r, d] System.out.println("collect = " + collect); } }
peek 的设计初衷就是在流的每个元素恢复运行之前,插入执行一个动作。
public static void main(String[] args) { Listnumbers = Arrays.asList(2, 3, 4, 5); List result = numbers.stream() .peek(x -> System.out.println("from stream: " + x)) .map(x -> x + 17) .peek(x -> System.out.println("after map: " + x)) .filter(x -> x % 2 == 0) .peek(x -> System.out.println("after filter: " + x)) .limit(3) .peek(x -> System.out.println("after limit: " + x)) .collect(Collectors.toList()); }
结果:
from stream: 2 after map: 19 from stream: 3 after map: 20 after filter: 20 after limit: 20 from stream: 4 after map: 21 from stream: 5 after map: 22 after filter: 22 after limit: 22
从上面得代码已经可以看出来,collect是将最终stream中得数据收集起来,最终生成一个list,set,或者map。
public static void main(String[] args) { ListdishList = Dish.getDishList(); //list List collect = dishList.stream().limit(2).collect(Collectors.toList()); //set Set collect1 = dishList.stream().limit(2).collect(Collectors.toSet()); //map Map collect2 = dishList.stream().limit(2).collect(Collectors.toMap(Dish::getName, Dish::getType)); }
这里面生成map得toMap方法有三个重载,传入得参数都不同,这里使用得是传入两个Function类型得参数。当然,Collectors的功能还不止这些,下面的收集器中会有其他的详解。
2.2 终止操作循环 forEach
计算 min、max、count、average
匹配 anyMatch、allMatch、noneMatch、findFirst、findAny
汇聚 reduce
收集器 collect
2.3 查找和匹配另一个常见的数据处理套路是看看数据集中的某些元素是否匹配一个给定的属性。Stream API通过allMatch,anyMatch,noneMatch,findFirst和findAny方法提供了这样的工具。
查找和匹配都是终端操作。
anyMatch方法可以回答“流中是否有一个元素能匹配到给定的谓词”。会返回一个boolean值。
public class AnyMatch { public static void main(String[] args) { Listdish = Dish.getDish(); boolean b = dish.stream().anyMatch(Dish::isVegetarian); System.out.println(b); } }
allMatch方法和anyMatch类似,校验流中是否都能匹配到给定的谓词。
class AllMatch{ public static void main(String[] args) { Listdish = Dish.getDish(); //是否所有菜的热量都小于1000 boolean b = dish.stream().allMatch(d -> d.getCalories() < 1000); System.out.println(b); } }
noneMatch方法可以确保流中没有任何元素与给定的谓词匹配。
class NoneMatch{ public static void main(String[] args) { Listdish = Dish.getDish(); //没有任何菜的热量大于等于1000 boolean b = dish.stream().allMatch(d -> d.getCalories() >= 1000); System.out.println(b); } }
anyMatch,noneMatch,allMatch这三个操作都用到了所谓的短路。
findAny方法将返回当前流中的任意元素。
class FindAny{ public static void main(String[] args) { Listdish = Dish.getDish(); Optional any = dish.stream().filter(Dish::isVegetarian).findAny(); System.out.println("any = " + any); } }
findFirst方法能找到你想要的第一个元素。
class FindFirst{ public static void main(String[] args) { List2.4 归约 reducedish = Dish.getDish(); Optional any = dish.stream().filter(Dish::isVegetarian).findFirst(); System.out.println("any = " + any); } }
此类查询需要将流中所有元素反复结合起来,得到一个值,比如一个 Integer 。这样的查询可以被归类为归约操作(将流归约成一个值)。用函数式编程语言的术语来说,这称为折叠(fold),因为你可以将这个操 作看成把一张长长的纸(你的流)反复折叠成一个小方块,而这就是折叠操作的结果。
public static void main(String[] args) { Listintegers = Arrays.asList(1, 2, 3, 6, 8); //求list中的和,以0为基数 Integer reduce = integers.stream().reduce(0, (a, b) -> a + b); //Integer的静态方法 int sum = integers.stream().reduce(0, Integer::sum); System.out.println("reduce = " + reduce); }
public static void main(String[] args) { List2.5 收集器 Collectorsintegers = Arrays.asList(1, 2, 3, 6, 8); Optional min = integers.stream().reduce(Integer::min); System.out.println("min = " + min); Optional max = integers.stream().reduce(Integer::max); System.out.println("max = " + max); }
public static void main(String[] args) { Listdish = Dish.getDish(); //创建一个Comparator来进行比较 比较菜的卡路里 Comparator dishComparator = Comparator.comparingInt(Dish::getCalories); //maxBy选出最大值 Optional collect = dish.stream().collect(Collectors.maxBy(dishComparator)); System.out.println("collect = " + collect); //选出最小值 Optional collect1 = dish.stream().collect(Collectors.minBy(dishComparator)); System.out.println("collect1 = " + collect1); }
Collectors.summingInt 。它可接受一个把对象映射为求和所需 int 的函数,并返回一个收集器。
public static void main(String[] args) { Listdish = Dish.getDish(); //计算总和 int collect = dish.stream().collect(Collectors.summingInt(Dish::getCalories)); System.out.println("collect = " + collect); }
public static void main(String[] args) { Listdish = Dish.getDish(); //计算平均数 Double collect = dish.stream().collect(Collectors.averagingInt(Dish::getCalories)); System.out.println("collect = " + collect); }
public static void main(String[] args) { Listdish = Dish.getDish(); String collect = dish.stream().map(Dish::getName).collect(Collectors.joining()); System.out.println("collect = " + collect); }
joining 工厂方法有一个重载版本可以接受元素之间的分界符,这样你就可以得到一个逗号分隔的菜肴名称列表。
String collect = dish.stream().map(Dish::getName).collect(Collectors.joining(", "));
long howManyDishes = dish.stream().collect(Collectors.counting());2.6 分组
public static void main(String[] args) { Listdish = Dish.getDish(); //groupingBy接受一个function作为参数 Map > collect = dish.stream().collect(Collectors.groupingBy(Dish::getType)); System.out.println("collect = " + collect); }
如果想用以分类的条件可能比简单的属性访问器要复杂。例如,你可能想把热量不到400卡路里的菜划分为“低热量”(diet),热量400到700卡路里的菜划为“普通”(normal),高于700卡路里的划为“高热量”(fat)。由于 Dish 类的作者没有把这个操作写成一个方法,你无法使用方法引用,但你可以把这个逻辑写成Lambda表达式。
public static void main(String[] args) { ListdishList = Dish.getDish(); Map > collect = dishList.stream().collect(Collectors.groupingBy(dish->{ if (dish.getCalories() <= 400) { return "DIET"; } else if (dish.getCalories() <= 700) { return "NORMAL"; } else { return "FAT"; } })); System.out.println("collect = " + collect); }
要实现多级分组,我们可以使用一个由双参数版本的 Collectors.groupingBy 工厂方法创建的收集器,它除了普通的分类函数之外,还可以接受 collector 类型的第二个参数。那么要进行二级分组的话,我们可以把一个内层groupingBy 传递给外层 groupingBy ,并定义一个为流中项目分类的二级标准。
public static void main(String[] args) { Listdish = Dish.getDish(); Map >> collect = dish.stream().collect(Collectors.groupingBy(Dish::getType, Collectors.groupingBy(d -> { if (d.getCalories() <= 400) { return "DIET"; } else if (d.getCalories() <= 700) { return "NORMAL"; } else { return "FAT"; } }))); System.out.println("collect = " + collect); }
在上一面,我们看到可以把第二个 groupingBy 收集器传递给外层收集器来实现多级分组。但进一步说,传递给第一个 groupingBy 的第二个收集器可以是任何类型,而不一定是另一个 groupingBy 。
例如,要数一数菜单中每类菜有多少个,可以传递 counting 收集器作为groupingBy 收集器的第二个参数。
MaptypesCount = dish.stream().collect(groupingBy(Dish::getType, counting()));
普通的单参数 groupingBy(f) (其中 f 是分类函数)实际上是 groupingBy(f,toList()) 的简便写法。
把收集器的结果转换为另一种类型:
Collectors.collectingAndThen工厂方法
3 并行流并行流就是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流。这样一来,你就可以自动把给定操作的工作负荷分配给多核处理器的所有内核,让它们都忙起来。
3.1 将顺序流转为并行流你可以把流转换成并行流,从而让前面的函数归约过程(也就是求和)并行运行——对顺序流调用 parallel 方法:
public static long parallelSum(long n) { return Stream.iterate(1L, i -> i + 1) .limit(n) .parallel() .reduce(0L, Long::sum); }
Stream 在内部分成了几块。因此可以对不同的块独立并行进行归纳操作,最后,同一个归纳操作会将各个子流的部分归纳结果合并起来,得到整个原始流的归纳结果。
类似地,你只需要对并行流调用 sequential 方法就可以把它变成顺序流。
配置并行流使用的线程池
看看流的 parallel 方法,你可能会想,并行流用的线程是从哪儿来的?有多少个?怎么自定义这个过程呢?
并行流内部使用了默认的 ForkJoinPool (7.2节会进一步讲到分支/合并框架),它默认的线 程 数 量 就是 你 的 处 理器 数 量 , 这个 值 是 由 Runtime.getRuntime().available-Processors() 得到的。 但 是 你 可 以 通 过 系 统 属 性 java.util.concurrent.ForkJoinPool.common.parallelism 来改变线程池大小,如下所示: System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12"); 这是一个全局设置,因此它将影响代码中所有的并行流。反过来说,目前还无法专为某个并行流指定这个值。一般而言,让 ForkJoinPool 的大小等于处理器数量是个不错的默认值,除非你有很好的理由,否则我们强烈建议你不要修改它。
3.2 高效使用并行流我们声称并行求和方法应该比顺序和迭代方法性能好。然而在软件工程上,靠猜绝对不是什么好办法!特别是在优化性能时,你应该始终遵循三个黄金规则:测量,测量,再测量。
如果有疑问,测量。把顺序流转成并行流轻而易举,但却不一定是好事。我们在本节中已经指出,并行流不总是比顺序流快。此外,并行流有时候会和你的直觉不一致,所以在考虑选择顺序流还是并行流时,第一个也是最重要的建议就是用适当的基准来检查其性能。
留意装箱。自动装箱和拆箱操作会大大降低性能。Java 8中有原始类型流( IntStream 、LongStream 、 DoubleStream )来避免这种操作,但凡有可能都应该用这些流。
有些操作本身在并行流上的性能就比顺序流差。特别是 limit 和 findFirst 等依赖于元素顺序的操作,它们在并行流上执行的代价非常大。例如, findAny 会比 findFirst 性能好,因为它不一定要按顺序来执行。你总是可以调用 unordered 方法来把有序流变成无序流。那么,如果你需要流中的n个元素而不是专门要前n个的话,对无序并行流调用limit 可能会比单个有序流(比如数据源是一个 List )更高效。
还要考虑流的操作流水线的总计算成本。设N是要处理的元素的总数,Q是一个元素通过流水线的大致处理成本,则N*Q就是这个对成本的一个粗略的定性估计。Q值较高就意味着使用并行流时性能好的可能性比较大。
对于较小的数据量,选择并行流几乎从来都不是一个好的决定。并行处理少数几个元素的好处还抵不上并行化造成的额外开销。
要考虑流背后的数据结构是否易于分解。例如, ArrayList 的拆分效率比 LinkedList高得多,因为前者用不着遍历就可以平均拆分,而后者则必须遍历。另外,用 range 工厂方法创建的原始类型流也可以快速分解。
流自身的特点,以及流水线中的中间操作修改流的方式,都可能会改变分解过程的性能。例如,一个 SIZED 流可以分成大小相等的两部分,这样每个部分都可以比较高效地并行处理,但筛选操作可能丢弃的元素个数却无法预测,导致流本身的大小未知。
还要考虑终端操作中合并步骤的代价是大是小(例如 Collector 中的 combiner 方法)。如果这一步代价很大,那么组合每个子流产生的部分结果所付出的代价就可能会超出通过并行流得到的性能提升。
3.3 分支/合并框架
分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。它是 ExecutorService 接口的一个实现,它把子任务分配给线程池(称为 ForkJoinPool )中的工作线程。
要把任务提交到这个池,必须创建 RecursiveTask 的一个子类,其中 R 是并行化任务(以 及所有子任务)产生的结果类型,或者如果任务不返回结果,则是 RecursiveAction 类型(当 然它可能会更新其他非局部机构)。
要定义 RecursiveTask, 只需实现它唯一的抽象方法compute :
protected abstract R compute();
这个方法同时定义了将任务拆分成子任务的逻辑,以及无法再拆分或不方便再拆分时,生成单个子任务结果的逻辑。
public class ForkJoinSumCalculator extends java.util.concurrent.RecursiveTask{ private final long[] numbers; private final int start; private final int end; public static final long THRESHOLD = 10_000; public ForkJoinSumCalculator(long[] numbers) { this(numbers, 0, numbers.length); } private ForkJoinSumCalculator(long[] numbers, int start, int end) { this.numbers = numbers; this.start = start; this.end = end; } @Override protected Long compute() { int length = end - start; if (length <= THRESHOLD) { return computeSequentially(); } //创建一个子任务来为数组得前一半求和 ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2); //利 用 另 一 个ForkJoinPool线程异步执行新创建的子任务 leftTask.fork(); //创建一个子任务来为数组得后一半求和 ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end); //同步执行第二个子任务,有可能进一步递归 Long rightResult = rightTask.compute(); //读取第一个任务得结构,未完成就等待 Long leftResult = leftTask.join(); return leftResult + rightResult; } private long computeSequentially() { long sum = 0; for (int i = start; i < end; i++) { sum += numbers[i]; } return sum; } public static long forkJoinSum(long n) { long[] numbers = LongStream.rangeClosed(1, n).toArray(); ForkJoinTask task = new ForkJoinSumCalculator(numbers); return new ForkJoinPool().invoke(task); } public static void main(String[] args) { long l = ForkJoinSumCalculator.forkJoinSum(5); System.out.println("l = " + l); } }
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/7293.html
摘要:使用是为了更高效的性能。注意将数组转集合不能进行和操作。的内部类和都是继承,等方法中是默认而且不作任何操作。重新了这些方法而的内部类没有重新,所以会抛出异常。八玩转工具包是什么顾名思义,和的区别就是可以保存多个相同的对象。 Java中那些让你爱不释手工具库,精炼代码量一、JDK1.8 Stream新特性1、St...
摘要:一面试题及剖析今日面试题今天壹哥带各位复习一块可能会令初学者比较头疼的内容,起码当时让我很有些头疼的内容,那就是流。在这里壹哥会从两部分展开介绍流,即与流。除此之外尽量使用字节流。关闭此输入流并释放与流相关联的任何系统资源。 一. 面试题及剖析 1. 今日面试题 今天 壹哥 带各位复习一块可...
摘要:如果不指定字符集,则使用系统默认字符编码,系统的默认字符编码一般是。所以更准确的说,是将一个字节输入流按照给定的字符编码来解码,从而得到一个字符输入流。当然,缺点就是不能选择使用的字符编码。 相对于Python和 C来说,Java的I/O操作API比较复杂,因此本文打算做个简单的介绍。 1. I/O分类 总的来说Java的I/O按照处理数据的粒度和方向来划分,一共可以分为4类: 基...
摘要:第三个问题查找所有来自于剑桥的交易员,并按姓名排序。第六个问题打印生活在剑桥的交易员的所有交易额。第八个问题找到交易额最小的交易。 付诸实战 在本节中,我们会将迄今学到的关于流的知识付诸实践。我们来看一个不同的领域:执行交易的交易员。你的经理让你为八个查询找到答案。 找出2011年发生的所有交易,并按交易额排序(从低到高)。 交易员都在哪些不同的城市工作过? 查找所有来自于剑桥的交易...
摘要:新特性总览标签本文主要介绍的新特性,包括表达式方法引用流默认方法组合式异步编程新的时间,等等各个方面。还有对应的和类型的函数连接字符串广义的归约汇总起始值,映射方法,二元结合二元结合。使用并行流时要注意避免共享可变状态。 Java8新特性总览 标签: java [TOC] 本文主要介绍 Java 8 的新特性,包括 Lambda 表达式、方法引用、流(Stream API)、默认方...
阅读 1428·2021-11-22 15:24
阅读 2519·2021-10-11 11:06
阅读 2323·2021-10-09 09:45
阅读 2525·2021-09-09 09:33
阅读 634·2019-08-30 15:53
阅读 1438·2019-08-30 12:48
阅读 656·2019-08-29 13:47
阅读 499·2019-08-26 18:27