摘要:正常情况下,一个流在执行一次终端操作之后便结束了。本文通过复制流内数据的方式,曲折的实现了同一个流上执行多次操作。只是思路,其性能并不一定高效,尤其是数据都在内存中处理时复制的开销很大。但如果流涉及大量,也许性能会有提高。
正常情况下,一个流在执行一次终端操作之后便结束了。本文通过复制流内数据的方式,曲折的实现了同一个流上执行多次操作。
Demo只是思路,其性能并不一定高效,尤其是数据都在内存中处理时复制的开销很大。但如果流涉及大量I/O,也许性能会有提高。
public class StreamForker{ private final Stream stream; private final Map
accept方法将原始流中所有的数据添加到各个BlockingQueue内,此处实现了复制
class ForkingStreamConsumerimplements Consumer , Results { static final Object END_OF_STREAM = new Object(); private final List > queues; private final Map > actions; public ForkingStreamConsumer(List > queues, Map > actions) { this.queues = queues; this.actions = actions; } @Override public void accept(T t) { queues.forEach(q -> q.add(t)); } @SuppressWarnings("unchecked") void finish() { accept((T) END_OF_STREAM); } @SuppressWarnings("unchecked") @Override public R get(Object key) { try { return ((Future ) actions.get(key)).get(); } catch (Exception e) { throw new RuntimeException(e); } } }
此处重写了tryAdvance接口,只是简单的从BlockingQueue中取出数据,执行action。业务逻辑中复制流是为了做什么事情,action就是这件事情。ForkingStreamConsumer.END_OF_STREAM是Queue中数据结束的标示
class BlockingQueueSpliteratorimplements Spliterator { private final BlockingQueue q; BlockingQueueSpliterator(BlockingQueue q) { this.q = q; } @Override public boolean tryAdvance(Consumer super T> action) { T t; while (true) { try { t = q.take(); break; } catch (InterruptedException e) { } } if (t != ForkingStreamConsumer.END_OF_STREAM) { action.accept(t); return true; } return false; } @Override public Spliterator trySplit() { return null; } @Override public long estimateSize() { return 0; } @Override public int characteristics() { return 0; } }
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/77289.html
摘要:输出流只有在所有的输入流都完成以后才能完成,任何一条输入流上的错误都将立即推送到输出流上。如果没有转入输入流,输出流将会立即发出结束通知。返回值以数组形式获取到的每一个输入流的值,或者来自映射函数的值。返回值仅从最新的内部流上取值的流。 接着上一节的操作符继续,让我们直奔主题。 组合类操作符 组合类的操作符可以将不同流数据按一定的规则进行合并,从而获得所需要的完整数据。 combine...
摘要:使用的操作符这条从左到右的横线代表经过操作符转换后的输出流。返回值通过判定函数检测的值组成的流。返回值持续发出输入流上的值,直到通知流上发出值为止。 上期介绍过了rxjs中的三大件,Observable,subscription,subject,但是在开发过程我们最常接触到的东西非操作符莫属。比如上期代码中曾出现过的from就是一个操作符。rxjs中的操作符大致上可以分为几类,创建类,...
摘要:但不同的是,在的遍历调用过程中,如果一个事件还没有触发完毕获取到返回值,就触发了下一个事件,则将忽略返回的值。这样,我们就可以避免异步的返回值因为返回较慢,反而覆盖了之后异步的返回值。 Steam in ReactiveX showImg(https://segmentfault.com/img/bVFReX?w=100&h=100); ReactiveX,又称 Reactive Ex...
阅读 2015·2021-09-29 09:35
阅读 1950·2019-08-30 14:15
阅读 2975·2019-08-30 10:56
阅读 958·2019-08-29 16:59
阅读 572·2019-08-29 14:04
阅读 1303·2019-08-29 12:30
阅读 1022·2019-08-28 18:19
阅读 511·2019-08-26 11:51