摘要:返回的对象同时是类型的,拥有方法。由于调用后,开始执行,因此,会返回一个供调用者来终止执行。是的一个衍生类,具有最新的值的概念。举一个形象的例子,表示一个人的生日,而则表示一个人的岁数。
什么是Subject? 在RxJS中,Subject是一类特殊的Observable,它可以向多个Observer多路推送数值。普通的Observable并不具备多路推送的能力(每一个Observer都有自己独立的执行环境),而Subject可以共享一个执行环境。
Subject是一种可以多路推送的可观察对象。与EventEmitter类似,Subject维护着自己的Observer。
每一个Subject都是一个Observable(可观察对象) 对于一个Subject,你可以订阅(subscribe)它,Observer会和往常一样接收到数据。从Observer的视角看,它并不能区分自己的执行环境是普通Observable的单路推送还是基于Subject的多路推送。
Subject的内部实现中,并不会在被订阅(subscribe)后创建新的执行环境。它仅仅会把新的Observer注册在由它本身维护的Observer列表中,这和其他语言、库中的addListener机制类似。
每一个Subject也可以作为Observer(观察者) Subject同样也是一个由next(v),error(e),和 complete()这些方法组成的对象。调用next(theValue)方法后,Subject会向所有已经在其上注册的Observer多路推送theValue。
下面的例子中,我们在Subject上注册了两个Observer,并且多路推送了一些数值:
var subject = new Rx.Subject(); subject.subscribe({ next: (v) => console.log("observerA: " + v) }); subject.subscribe({ next: (v) => console.log("observerB: " + v) }); subject.next(1); subject.next(2);
控制台输出结果如下:
observerA: 1 observerB: 1 observerA: 2 observerB: 2
既然Subject是一个Observer,你可以把它作为subscribe(订阅)普通Observable时的参数,如下面例子所示:
var subject = new Rx.Subject(); subject.subscribe({ next: (v) => console.log("observerA: " + v) }); subject.subscribe({ next: (v) => console.log("observerB: " + v) }); var observable = Rx.Observable.from([1, 2, 3]); observable.subscribe(subject); // 你可以传递Subject来订阅observable
执行后结果如下:
observerA: 1 observerB: 1 observerA: 2 observerB: 2 observerA: 3 observerB: 3
通过上面的实现:我们发现可以通过Subject将普通的Observable单路推送转换为多路推送。这说明了Subject的作用——作为单路Observable转变为多路Observable的桥梁。
还有几种特殊的Subject 类型,分别是BehaviorSubject,ReplaySubject,和 AsyncSubject。
多路推送的Observable在以后的语境中,每当提到“多路推送的Observable”,我们特指通过Subject构建的Observable执行环境。否则“普通的Observable”只是一个不会共享执行环境并且被订阅后才生效的一系列值。
通过使用Subject可以创建拥有相同执行环境的多路的Observable。
下面展示了多路的运作方式:Subject从普通的Observable订阅了数据,然后其他Observer又订阅了这个Subject,示例如下:
var source = Rx.Observable.from([1, 2, 3]); var subject = new Rx.Subject(); var multicasted = source.multicast(subject); // 通过`subject.subscribe({...})`订阅Subject的Observer: multicasted.subscribe({ next: (v) => console.log("observerA: " + v) }); multicasted.subscribe({ next: (v) => console.log("observerB: " + v) }); // 让Subject从数据源订阅开始生效: multicasted.connect();
multicast方法返回一个类似于Observable的可观察对象,但是在其被订阅后,它会表现Subject的特性。 multicast 返回的对象同时是ConnectableObservable类型的,拥有connect() 方法。
connect()方法非常的重要,它决定Observable何时开始执行。由于调用connect()后,Observable开始执行,因此,connect()会返回一个Subscription供调用者来终止执行。
引用计数通过手动调用connect()返回的Subscription控制执行十分繁杂。通常,我们希望在有第一个Observer订阅Subject后自动connnect,当所有Observer都取消订阅后终止这个Subject。
我们来分析一下下面例子中subscription的过程:
第一个Observer 订阅了多路推送的 Observable
多路Observable被连接
向第一个Observer发送 值为0的next通知
第二个Observer订阅了多路推送的 Observable
向第一个Observer发送 值为1的next通知
向第二个Observer发送 值为1的next通知
第一个Observer取消了对多路推送的Observable的订阅
向第二个Observer发送 值为2的next通知
第二个Observer取消了对多路推送的Observable的订阅
取消对多路推送的Observable的连接
通过显式地调用connect(),代码如下:
var source = Rx.Observable.interval(500); var subject = new Rx.Subject(); var multicasted = source.multicast(subject); var subscription1, subscription2, subscriptionConnect; subscription1 = multicasted.subscribe({ next: (v) => console.log("observerA: " + v) }); subscriptionConnect = multicasted.connect(); setTimeout(() => { subscription2 = multicasted.subscribe({ next: (v) => console.log("observerB: " + v) }); }, 600); setTimeout(() => { subscription1.unsubscribe(); }, 1200); setTimeout(() => { subscription2.unsubscribe(); subscriptionConnect.unsubscribe(); }, 2000);
如果你不想显式地调用connect()方法,可以在ConnectableObservable类型的Observable上调用refCount()方法。方法会进行引用计数:记录Observable被订阅的行为。当订阅数从 0 到 1时refCount() 会调用connect() 方法。到订阅数从1 到 0,他会终止整个执行过程。
refCount 使得多路推送的Observable在被订阅后自动执行,在所有观察者取消订阅后,停止执行。
下面是示例:
var source = Rx.Observable.interval(500); var subject = new Rx.Subject(); var refCounted = source.multicast(subject).refCount(); var subscription1, subscription2, subscriptionConnect; console.log("observerA subscribed"); subscription1 = refCounted.subscribe({ next: (v) => console.log("observerA: " + v) }); setTimeout(() => { console.log("observerB subscribed"); subscription2 = refCounted.subscribe({ next: (v) => console.log("observerB: " + v) }); }, 600); setTimeout(() => { console.log("observerA unsubscribed"); subscription1.unsubscribe(); }, 1200); setTimeout(() => { console.log("observerB unsubscribed"); subscription2.unsubscribe(); }, 2000);
执行输出结果如下:
observerA subscribed observerA: 0 observerB subscribed observerA: 1 observerB: 1 observerA unsubscribed observerB: 2 observerB unsubscribed
只有ConnectableObservables拥有refCount()方法,调用后会返回一个Observable而不是新的ConnectableObservable。
BehaviorSubjectBehaviorSubject是Subject的一个衍生类,具有“最新的值”的概念。它总是保存最近向数据消费者发送的值,当一个Observer订阅后,它会即刻从BehaviorSubject收到“最新的值”。
BehaviorSubjects非常适于表示“随时间推移的值”。举一个形象的例子,Subject表示一个人的生日,而Behavior则表示一个人的岁数。(生日只是一天,一个人的岁数会保持到下一次生日之前。)
下面例子中,展示了如何用 0初始化BehaviorSubject,当Observer订阅它时,0是第一个被推送的值。紧接着,在第二个Observer订阅BehaviorSubject之前,它推送了2,虽然订阅在推送2之后,但是第二个Observer仍然能接受到2:
var subject = new Rx.BehaviorSubject(0 /* 初始值 */); subject.subscribe({ next: (v) => console.log("observerA: " + v) }); subject.next(1); subject.next(2); subject.subscribe({ next: (v) => console.log("observerB: " + v) }); subject.next(3);
输出结果如下:
observerA: 0 observerA: 1 observerA: 2 observerB: 2 observerA: 3 observerB: 3ReplaySubject
ReplaySubject 如同于BehaviorSubject是 Subject 的子类。通过 ReplaySubject可以向新的订阅者推送旧数值,就像一个录像机ReplaySubject可以记录Observable的一部分状态(过去时间内推送的值)。
.一个ReplaySubject可以记录Observable执行过程中推送的多个值,并向新的订阅者回放它们。
你可以指定回放值的数量:
var subject = new Rx.ReplaySubject(3 /* 回放数量 */); subject.subscribe({ next: (v) => console.log("observerA: " + v) }); subject.next(1); subject.next(2); subject.next(3); subject.next(4); subject.subscribe({ next: (v) => console.log("observerB: " + v) }); subject.next(5);
输出如下:
observerA: 1 observerA: 2 observerA: 3 observerA: 4 observerB: 2 observerB: 3 observerB: 4 observerA: 5 observerB: 5
除了回放数量,你也可以以毫秒为单位去指定“窗口时间”,决定ReplaySubject记录多久以前Observable推送的数值。下面的例子中,我们把回放数量设置为100,把窗口时间设置为500毫秒:
var subject = new Rx.ReplaySubject(100, 500 /* windowTime */); subject.subscribe({ next: (v) => console.log("observerA: " + v) }); var i = 1; setInterval(() => subject.next(i++), 200); setTimeout(() => { subject.subscribe({ next: (v) => console.log("observerB: " + v) }); }, 1000);
第二个Observer接受到3(600ms), 4(800ms) 和 5(1000ms),这些值均在订阅之前的500毫秒内推送(窗口长度 1000ms - 600ms = 400ms < 500ms):
observerA: 1 observerA: 2 observerA: 3 observerA: 4 observerA: 5 observerB: 3 observerB: 4 observerB: 5 observerA: 6 observerB: 6 ...AsyncSubject
AsyncSubject是Subject的另外一个衍生类,Observable仅会在执行完成后,推送执行环境中的最后一个值。
var subject = new Rx.AsyncSubject(); subject.subscribe({ next: (v) => console.log("observerA: " + v) }); subject.next(1); subject.next(2); subject.next(3); subject.next(4); subject.subscribe({ next: (v) => console.log("observerB: " + v) }); subject.next(5); subject.complete();
输出结果如下:
observerA: 5 observerB: 5
AsyncSubject 与 last() 操作符相似,等待完成通知后推送执行过程的最后一个值。
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/79377.html
摘要:项目简介本次使用了和开发了一个地址输入框,主要实现的功能有限制输入符合条件的字符并每隔两位可以自动添加用于分割的冒号。项目屏蔽了的事件处理,同时使用来手动控制光标。继承于和因此同时具有和两者的方法。后面的和都是需要利用最新的来进行判断的。 项目简介 本次使用了RxJS和react开发了一个mac地址输入框,主要实现的功能有限制输入符合条件的字符1-9,a-f,并每隔两位可以自动添加用于...
摘要:是的缩写,起源于,是一个基于可观测数据流结合观察者模式和迭代器模式的一种异步编程的应用库。是基于观察者模式和迭代器模式以函数式编程思维来实现的。学习之前我们需要先了解观察者模式和迭代器模式,还要对流的概念有所认识。 RxJS 是 Reactive Extensions for JavaScript 的缩写,起源于 Reactive Extensions,是一个基于可观测数据流 Stre...
摘要:到底是什么先上代码输出这里可以把想象成一个函数,这意味着你每次调用都会导致传入里的回调函数重新执行一次调用的方式为相当于。接收函数返回值的方式也从改为通过传入回调函数的方式获取。具体看代码运行结果如上的第一个回调函数里的结构是推荐的结构。 通俗的方式理解Rx.js 序言 今早看民工叔的文章的时候, 发现对Rxjs所知甚少, 于是去官方看了下教程, 整理出一些东西, 写成此文。Rxjs据...
摘要:技术积累经过社区的努力学习资料还是很多的,官方中文文档就已经很不错,不过我们先从天精通初步感受一下然后配合一些中文文档来补充知识点,最后再根据官方文档来校验整个知识体系。资料学习操作符的时候可以对照弹珠图的交互弹珠图的中文版中文文档 前言 最近准备毕设,技术选型的时候因为功能的一些需求准备将RxJs融入到项目中,考虑RxJs的时候因为之前的技术栈还犹豫了一下,查了一些资料以及粗略浏览了...
阅读 3202·2021-11-25 09:43
阅读 3209·2021-11-23 09:51
阅读 3522·2019-08-30 13:08
阅读 1572·2019-08-29 12:48
阅读 3597·2019-08-29 12:26
阅读 399·2019-08-28 18:16
阅读 2565·2019-08-26 13:45
阅读 2433·2019-08-26 12:15