资讯专栏INFORMATION COLUMN

RxJs 核心概念之Observable

forrest23 / 612人阅读

摘要:函数调用后同步计算并返回单一值生成器函数遍历器遍历过程中同步计算并返回个到无穷多个值异步执行中返回或者不返回单一值同步或者异步计算并返回个到无穷多个值是函数概念的拓展既不像,也不像是。如果不调用函数,就不会执行如果如果不订阅,也不会执行。

Observable(可观察对象)是基于推送(Push)运行时执行(lazy)的多值集合。下方表格对Observable进行了定位(为解决基于推送的多值问题):

MagicQ 单值 多值
拉取(Pull) 函数 遍历器
推送(Push) Promise Observable

:当observable被订阅后,会立即(同步地)推送123 三个值;1秒之后,继续推送4这个值,最后结束(推送结束通知):

var observable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

为得到observable推送的值,我们需要订阅(subscribe)这个Observable:

var observable = Rx.Observable.create(function (observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  setTimeout(() => {
    observer.next(4);
    observer.complete();
  }, 1000);
});

console.log("just before subscribe");
observable.subscribe({
  next: x => console.log("got value " + x),
  error: err => console.error("something wrong occurred: " + err),
  complete: () => console.log("done"),
});
console.log("just after subscribe");

程序执行后,将在控制台输出如下结果:

just before subscribe
got value 1
got value 2
got value 3
just after subscribe
got value 4
done
拉取(Pull) V.S. 推送(Push)

拉取推送是数据生产者和数据消费者之间通信的两种不同机制。

何为拉取? 在拉取系统中,总是由消费者决定何时从生产者那里获得数据。生产者对数据传递给消费者的时间毫无感知(被动的生产者,主动的消费者)。

JavaScript函数是典型的拉取系统:函数是数据的生产者,对函数进行调用的代码(消费者)从函数调用后的返回值中拉取单值进行消费。

// 函数是数据的生产者
let getLuckyNumber = function() {
    return 7;
};

/* let代码段是数据的消费者,
 * getLuckyNumber对调用时间毫无感知。 
 */
let luckNumber = getLuckyNumber();

ES2015 引入了的 生成器函数 | 遍历器 (function*)同样是基于拉取的系统: 调用 iterator.next()的代码段是消费者,它可以从生成器函数中拉取多个值。

function* getLessThanTen() {
  var i = 0;
  while(i < 11) {
    yield i++;
  }
}

// 生产者
let iterator = getLessThanTen();

// 消费者
iterator.next(); // Object {value: 0, done: false}
iterator.next(); // Object {value: 1, done: false}
MagicQ 生产者 消费者
拉取 被动: 在被请求时产生数据 主动: 决定何时请求数据
推送 主动: 控制数据的产生逻辑 被动: 获得数据后进行响应

何为推送? 在推送系统中生产者决定何时向消费者传递数据,消费者对何时收到数据毫无感知(被动的消费者)。

现代JavaScript中Promise是典型的推送系统。作为数据生产者的Promise通过resolve()向数据消费者——回调函数传递数据:与函数不同,Promise决定向回调函数推送值的时间。

RxJS在 JavaScript 中引入了Observable(可观察对象)这个新的推送系统。Observable是多数据值的生产者,向Observer(被动的消费者)推送数据。

函数 调用后同步计算并返回单一值

生成器函数 | 遍历器 遍历过程中同步计算并返回0个到无穷多个值

Promise 异步执行中返回或者不返回单一值

Observable 同步或者异步计算并返回0个到无穷多个值

Observable 是函数概念的拓展

Observable既不像EventEmitter,也不像是Promise。Observable 中的 Subject 进行多路推送时与 EventEmitter 行为上有些类似,但是实际上Observable与EventEmitter并不相同。

Observable 更像是一个不需要传入参数的函数,它拓展了函数的概念使得它可以返回多个值。

看看下面的例子:

function foo() {
  console.log("Hello");
  return 42;
}

var x = foo.call(); // same as foo()
console.log(x);
var y = foo.call(); // same as foo()
console.log(y);

输出结果如下:

"Hello"
42
"Hello"
42

通过Observable可以实现同样的行为:

var foo = Rx.Observable.create(function (observer) {
  console.log("Hello");
  observer.next(42);
});

foo.subscribe(function (x) {
  console.log(x);
});
foo.subscribe(function (y) {
  console.log(y);
});

输出结果相同:

"Hello"
42
"Hello"
42

不论Observable还是函数都是在运行时进行求值计算的。如果不调用函数,console.log("Hello")就不会执行;如果如果不subscribe(订阅)Observable,console.log("Hello")也不会执行。此外,调用或者订阅都是独立的:两次调用产生两个独立的作用域,两次订阅同样会产生两个独立的作用域。EventEmitter总是在同一个作用域中,发射前也不会在意自己是否已经被订阅;Observable不会被共享而产生副作用,并且总是在被订阅时才执行。

订阅Observable与调用函数类似。

一些人认为Observable总是是异步的,这个观点并不正确,如果在控制台log函数中调用函数:

console.log("before");
console.log(foo.call());
console.log("after");

显然可以看到以下输出:

"before"
"Hello"
42
"after"

Observable的行为完全一样:

console.log("before");
foo.subscribe(function (x) {
  console.log(x);
});
console.log("after");

输出结果为:

"before"
"Hello"
42
"after"

订阅 foo完全是同步的,与函数的调用一样。

Observable可以异步或者同步地产生数据。

那Observable 与函数的不同之处在哪里? Observable可以在一个时间过程中‘返回’多个值,而函数却不能。在函数中你不可以这么做:

function foo() {
  console.log("Hello");
  return 42;
  return 100; // 这个语句永远不会被执行。
}

虽然函数只能有一个返回值,但是在Observable中你完全可以这么做:

var foo = Rx.Observable.create(function (observer) {
  console.log("Hello");
  observer.next(42);
  observer.next(100); // 返回另一个值
  observer.next(200); // 返回另一个值
});

console.log("before");
foo.subscribe(function (x) {
  console.log(x);
});
console.log("after");

输出结果如下:

"before"
"Hello"
42
100
200
"after"

你甚至可以异步地返回值:

var foo = Rx.Observable.create(function (observer) {
  console.log("Hello");
  observer.next(42);
  observer.next(100);
  observer.next(200);
  setTimeout(() => {
    observer.next(300); // happens asynchronously
  }, 1000);
});

console.log("before");
foo.subscribe(function (x) {
  console.log(x);
});
console.log("after");

输出结果:

"before"
"Hello"
42
100
200
"after"
300

结论:

func.call() 意味着“同步地给我一个值”

observable.subscribe() 意味着“不管是同步或者异步,给我一些值”

Observable 剖析

通过使用 Rx.Observable.create 或者是创建操作符创建一个Observable; Observable 被 Observer(观察者) 订阅; 在执行时 向观察者发送next / error / complete 通知;同时执行过程可以被 终止
Observable 类型的实例具备了以上四个方面的特性,与其他类型如:Observer 和 Subscription 紧密相关。

我们重点关注以下四个方面:

创建

订阅

执行

终止

创建

Rx.Observable.createObservable 构造函数的别名,接受一个参数: subscribe函数。

以下例子会创建一个Observable,每一秒钟向其订阅者发射一个"hi" 字符串。

var observable = Rx.Observable.create(function subscribe(observer) {
  var id = setInterval(() => {
    observer.next("hi")
  }, 1000);
});

除了使用create创建Observable,我们通常还使用创建操作符, 如 offrominterval, 等来创建Observable。

上面例子中,subscribe函数是定义Observable最重要的部分。我们接下来了解订阅的含义。

订阅

上面例子中的observable 可以以如下方式 订阅

observable.subscribe(x => console.log(x));

observable.subscribeObservable.create(function subscribe(observer) {...})中的subscribe 同名并非巧合。虽然在Rx中它们不是同一个对象,但是在工程中,我们可以在概念上视两者为等价物。

调用subscribe的观察者并不会共享同一个Observable。观察者调用observable.subscribe 时,Observable.create(function subscribe(observer) {...})中的subscribe会在调用它的观察者作用域中执行。每一次observable.subscribe的调用,都是彼此独立的。

订阅Observable如同调用函数,需要提供相应的回调方法。

订阅机制与处理事件的addEventListener / removeEventListenerAPI完全不同。通过observable.subscribe,观察者并不需要在Observable中进行注册,Observable也不需要维护订阅者的列表。

订阅后便进入了Observable的执行阶段,在执行阶段值和事件将会被传递给观察者供其消费。

执行

只有在被订阅之后Observable才会执行,执行的逻辑在Observable.create(function subscribe(observer) {...})中描述,执行后将会在特定时间段内,同步或者异步地成产多个数据值。

Observable在执行过程中,可以推送三种类型的值:

"Next" 通知: 实际产生的数据,包括数字、字符串、对象等

"Error" 通知:一个JavaScript错误或者异常

"Complete" 通知:一个不带有值的事件

“Next” 通知是最重要和常用的类型:表示事件传递给观察者的数据。错误和完成通知仅会在执行阶段推送其一,并不会同时推送错误和完成通知。

通过所谓的“Observable语法”或者“契约”可以最好地表达这个规则,“Observable语法”借助于正则表达式:

next*(error|complete)?

在Observable的执行过程中,0个或者多个“Next”通知会被推送。在错误或者完成通知被推送后,Observable不会再推送任何其他通知。

下面代码展示了Observable 在执行过程中推送3个“Next” 通知然后结束:

var observable = Rx.Observable.create(function subscribe(observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
});

Observable 严格遵守 Observable 契约,后面值为4的“Next” 通知永远不会被推送:

var observable = Rx.Observable.create(function subscribe(observer) {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
  observer.next(4); // 由于违法契约,4不会被推送
});

使用try/catch块包裹 subscribe 代码是一个很赞的想法,如果捕获了异常,可以推送错误通知:

var observable = Rx.Observable.create(function subscribe(observer) {
  try {
    observer.next(1);
    observer.next(2);
    observer.next(3);
    observer.complete();
  } catch (err) {
    observer.error(err); // 捕获异常后推送错误通知
  }
});
终止

Observable的执行可能是无限的,作为观察者需要主动中断执行:我们需要特定的API去终止执行过程。因为特定的观察者都有特定的执行过程,一旦观察者获得想要的数据后就需要终止执行过程以免带来计算时对内存资源的浪费。

observable.subscribe被调用时,观察者会与其执行作用域绑定,同时返回一个Subscription类型的对象:

var subscription = observable.subscribe(x => console.log(x));

Subscription对象表示执行过程,通过极简的API,你可以终止执行过程。详情请阅读Subscription 相关文档。通过调用subscription.unsubscribe() 你可以终止执行过程:

var observable = Rx.Observable.from([10, 20, 30]);
var subscription = observable.subscribe(x => console.log(x));
// Later:
subscription.unsubscribe();

在Observable被订阅后,代表执行过程的Subscription 对象将被返回。对其调用unsubscribe()就可以终止执行。

每一个Observable都需要在 create()的创建过程中定义终止的逻辑。在function subscribe()中返回自定义的unsubscribe就可以实现。

下面的例子说明了如何在终止后释放setInterval的句柄:

var observable = Rx.Observable.create(function subscribe(observer) {
  // 获得定时函数的句柄
  var intervalID = setInterval(() => {
    observer.next("hi");
  }, 1000);
  
  // 提供终止方法释放定时函数的句柄
  return function unsubscribe() {
    clearInterval(intervalID);
  };
});

类似于observable.subscribeObservable.create(function subscribe() {...})的关系,我们在subscribe中返回的 unsubscribe 也与subscription.unsubscribe在概念上等价。事实上,如果我们除去Rx的包装,纯粹的JavaScript代码简单清晰:

function subscribe(observer) {
  var intervalID = setInterval(() => {
    observer.next("hi");
  }, 1000);
  
  return function unsubscribe() {
    clearInterval(intervalID);
  };
}

var unsubscribe = subscribe({next: (x) => console.log(x)});

// 一段时间后:
unsubscribe(); // 终止

使用Observable、 Observer 和 Subscription这些概念的原因是,我们可以在Observable 契约之下安全、兼容地调用操作符。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/79350.html

相关文章

  • RxJS 核心概念Subject

    摘要:返回的对象同时是类型的,拥有方法。由于调用后,开始执行,因此,会返回一个供调用者来终止执行。是的一个衍生类,具有最新的值的概念。举一个形象的例子,表示一个人的生日,而则表示一个人的岁数。 什么是Subject? 在RxJS中,Subject是一类特殊的Observable,它可以向多个Observer多路推送数值。普通的Observable并不具备多路推送的能力(每一个Observer...

    weij 评论0 收藏0
  • 【CuteJavaScript】Angular6入门项目(3.编写服务和引入RxJS

    摘要:发布通过回调方法向发布事件。观察者一个回调函数的集合,它知道如何去监听由提供的值。 本文目录 一、项目起步 二、编写路由组件 三、编写页面组件 1.编写单一组件 2.模拟数据 3.编写主从组件 四、编写服务 1.为什么需要服务 2.编写服务 五、引入RxJS 1.关于RxJS 2.引入RxJS 3.改造数据获取方式 六、改造组件 1.添...

    RebeccaZhong 评论0 收藏0
  • RxJS 核心概念Observer & Subscription

    摘要:在中,是一个由回调函数组成的对象,键名分别为和,以此接受推送的不同类型的通知,下面的代码段是的一个示例调用逻辑,只需在订阅后将传入在中,是可选的。当然你也可以将和的回调函数分别传入什么是是一个代表可以终止资源的对象,表示一个的执行过程。 Observer(观察者) 什么是Observer? Observer(观察者)是Observable(可观察对象)推送数据的消费者。在RxJS中,O...

    tinysun1234 评论0 收藏0
  • RxJS和react开发mac地址输入框

    摘要:项目简介本次使用了和开发了一个地址输入框,主要实现的功能有限制输入符合条件的字符并每隔两位可以自动添加用于分割的冒号。项目屏蔽了的事件处理,同时使用来手动控制光标。继承于和因此同时具有和两者的方法。后面的和都是需要利用最新的来进行判断的。 项目简介 本次使用了RxJS和react开发了一个mac地址输入框,主要实现的功能有限制输入符合条件的字符1-9,a-f,并每隔两位可以自动添加用于...

    CastlePeaK 评论0 收藏0
  • 从观察者模式到迭代器模式系统讲解 RxJS Observable(一)

    摘要:是的缩写,起源于,是一个基于可观测数据流结合观察者模式和迭代器模式的一种异步编程的应用库。是基于观察者模式和迭代器模式以函数式编程思维来实现的。学习之前我们需要先了解观察者模式和迭代器模式,还要对流的概念有所认识。 RxJS 是 Reactive Extensions for JavaScript 的缩写,起源于 Reactive Extensions,是一个基于可观测数据流 Stre...

    notebin 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<