资讯专栏INFORMATION COLUMN

[译]RxJS文档03——剖析Observable

netScorpion / 1040人阅读

摘要:通过执行和可以向订阅者推送不同的通知。之后,执行过程可能被处理掉。当调用并得到观察者时,在中传入的函数将会被执行。每次执行都会触发一个多带带针对当前的运行逻辑。通知不发出任何值,表示流的结束。

原文:http://reactivex.io/rxjs/manu...

Rx.Observalbe.create()或者创建操作符,可以 创建(created) Observable流。
Observer则可以 订阅(subscribed) 这个流。
通过 执行(execute) next()error()complete()可以向订阅者推送不同的通知。
之后,执行过程可能被 处理掉(disposed)
这四个方面都被集成在Observable实例当中,但是也有一些方面与其他类型有关,比如ObserverSubscription

Observable的核心关注点是:

创建Observable流

订阅Observable流

执行Observable流

终止Observable流

创建Observable流

Rx.Observable.create可以说是Observable构造函数的别名,他可以接受一个参数:subscribe函数。

以下的例子创建了一个Observable流,每秒钟向Observer发出一个字符串类性值hi

var observable = Rx.Observable.create(function subscribe(observer) {
  var id = setInterval(() => {
    observer.next("hi")
  }, 1000);
});

Observables流可以使用create()创建,但是通常我们会使用所谓的创建操作符,像of(),from(),interval()等等。

在上面的例子中,订阅函数(subscribe function)是描述Observalbe最重要的部分。那么,让我来看看何谓订阅。

订阅Observable流

在例子中,Observalbe的实例observable可以被订阅,像这样:

observable.subscribe(x => console.log(x));

也许你会注意到,observable.subscribe()subscribe函数在Rx.Observable.create(function subscribe(observer){...})中使用了相同的名字,这并不是巧合。
在库中,他们是不同的,但在实际使用中,你可以认为他们在概念上是相等的。

Observable不在多个Observer之间共享subscribe。当调用observable.subscribe()并得到观察者时,在Rx.Observable.create(function subscribe(observer){...})中传入的函数将会被执行。每次执行observable.subscribe()都会触发一个多带带针对当前Observer的运行逻辑。

订阅一个Observable流就像调用一个函数,流中的数据将会被传递给回调函数中。

一个subscribe函数被调用将会开启一个Observable执行流(Observable execution),向观察者们输出流中的值或者事件。

执行Observable流

代码Rx.Observable.create(function subscribe(observer){...})代表了一个“Observable流”,由于惰性计算,只用当有Observer订阅流时,函数才会被执行。
执行过程中随着时间线产生多个数据,方式是同步或异步二选一。

有三个类型的值会在执行流中发出:

"Next" 通知:发出一个值,比如数字,字符串,对象等等。

"Error"通知:发出一个js错误或者异常。

Complete通知:不发出任何值,表示流的结束。

Next通知是最重要也是最常用的类型:他代表了实际推送给Observer的值。ErrorComplete通知只会在执行流中发出一次,要么是Error,要么是Complete

用正表达式的规则可以很好的表达这种所谓的Observable语法和约定:

next*(error|complete)?

在一个Observable执行流中,会发出0到无限个Next通知。而一旦Error或者Complete通知被发出,执行流将不会再推送任何消息。

下面的例子展示了一个推送了3个NextComplete的流:

var observable = Rx.Observable.create(function subscribe(observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
});

Observables会严格遵守Observable约定,所以下面的代码将不会推送值4:

var observable = Rx.Observable.create(function subscribe(observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
  observer.next(4); // Is not delivered because it would violate the contract
});

在订阅函数中使用try/catch捕获可能抛出的异常,也是一个很不错的做法:

var observable = Rx.Observable.create(function subscribe(observer) {
  try {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
  } catch (err) {
    observer.error(err); // delivers an error if it caught one
  }
});
终止Observable流

Observable流的执行时间线可能是无限长的,但通常我们只用到有限的时间段和观察者处理业务,因此,我们需要一种中断流执行的API。
由于一个执行过程对于每个Observer是独有的,一旦Observer接收到值,那么也必然需要一种中断执行的方式,从而可以节省计算性能和内存空间。

observable.subscribe()被调用,Observer将被附加到新创建的Observable执行过程中,同时返回了一个对象,Subscription

var subscription = observable.subscribe(x => console.log(x));

Subscription代表了一个持续执行的过程,并且有一套最小化的API允许你中断流的执行过程。可以从这里进一步了解Subscription类型。以下例子展示了使用subscription.unsubscribe()中断持续执行的过程:

var observable = Rx.Observable.from([10, 20, 30]);
var subscription = observable.subscribe(x => console.log(x));
// Later:
subscription.unsubscribe();

当你订阅流就可以获取一个Subscription,代表了持续执行的过程。调用unsubscribe()就可以中断执行过程。

当我们使用create()创建一个Observable流时,每一个Observable都必须定义它如何处理获取到的资源的处理方式。你可以通过在subscribe()函数中返回一个自定义的unsubscribe函数,达到这个目的。

举个例子,以下展示了如何中断一个使用setInterval()执行interval的过程:

var observable = Rx.Observable.create(function subscribe(observer) {
  // Keep track of the interval resource
  var intervalID = setInterval(() => {
    observer.next("hi");
  }, 1000);

  // Provide a way of canceling and disposing the interval resource
  return function unsubscribe() {
    clearInterval(intervalID);
  };
});

就像observable.subscribe()类似Observable.create(function subscribe(){..})一样,我们从subscribe()返回的unsubscribe()也概念性的等同于subscription.unsubscribe()
事实上,如果我们移除与响应式编程相关的概念,剩下的就是直白的js代码了:

function subscribe(observer) {
  var intervalID = setInterval(() => {
    observer.next("hi");
  }, 1000);

  return function unsubscribe() {
    clearInterval(intervalID);
  };
}

var unsubscribe = subscribe({next: (x) => console.log(x)});

// Later:
unsubscribe(); // dispose the resources

我们使用Rx,包括ObservableObserverSubscription,其原因就是为了使用这些安全(就如Observable约定的)和可组合的操作符。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/83628.html

相关文章

  • []RxJS文档01——介绍

    摘要:原文是一个使用可观察量队列解决异步编程和基于事件编程的库。提供了几个管理异步事件的核心概念可观察量,代表了一个由未来获取到的值或事件组成的集合。相当于事件触发器,是向多个广播事件或推送值的唯一方法。 原文:http://reactivex.io/rxjs/manu... RxJS 是一个使用可观察量(observable)队列解决异步编程和基于事件编程的js库。他提供了一个核心的类型O...

    BlackHole1 评论0 收藏0
  • [] RxJS文档05——Subscription

    摘要:原文什么是是一个对象,表示一种可被处置的资源,通常指代一个流的执行过程。在之前版本中的,被称为可被处置的。本质是一个含有方法,用来释放资源或者取消流执行的对象。 原文: http://reactivex.io/rxjs/manu... 什么是Subscription? Subscription是一个对象,表示一种可被处置的资源,通常指代一个Observable流的执行过程。 Subsc...

    walterrwu 评论0 收藏0
  • []RxJS文档07——Operators 操作符

    摘要:原文提供的操作符非常有用,尽管是基础对象。我们称这种现象为操作符订阅链。静态操作符是依赖于类的一组纯函数,通常被用来从头创建流。最常见的静态操作符类型是所谓的创建操作符。贯穿本站的文档,我们会广泛的使用珠宝图去解释操作符是如何生效的。 原文:http://reactivex.io/rxjs/manu... Operators RxJS提供的操作符非常有用,尽管Observable是基础...

    cooxer 评论0 收藏0
  • []RxJS文档04——Observer 观察者

    摘要:原文什么是观察者是流推送数据的用户。观察者们就是一组函数的集合,监听着每一个流推送出的不同类型的通知,包括和。如果没有为某个类型的通知提供,流的执行过程仍然会照常进行,但是响应的通知将会被忽略,因为观察者没有提供相应的来接收。 原文: http://reactivex.io/rxjs/manu... 什么是Observer? 观察者(Observer)是Observable流推送数据的...

    xiaolinbang 评论0 收藏0
  • [] RxJS文档02—— Observable 可观察量

    摘要:原文可观察量是一种能惰性推送的集合,他可以包含多个值。是一种惰性计算方式,会在迭代中同步的返回到无限个可能的话返回值。使用一种处理方法,最终可能会或可能不会返回一个值。无论是同步方式还是异步方式,都可以择其一来传递返回值。 原文:http://reactivex.io/rxjs/manu... Observable 可观察量是一种能惰性推送的集合,他可以包含多个值。下面的表格对比了推送...

    A Loity 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<