资讯专栏INFORMATION COLUMN

通俗的方式理解RxJS

jzzlee / 455人阅读

摘要:到底是什么先上代码输出这里可以把想象成一个函数,这意味着你每次调用都会导致传入里的回调函数重新执行一次调用的方式为相当于。接收函数返回值的方式也从改为通过传入回调函数的方式获取。具体看代码运行结果如上的第一个回调函数里的结构是推荐的结构。

通俗的方式理解Rx.js 序言

今早看民工叔的文章的时候, 发现对Rxjs所知甚少, 于是去官方看了下教程, 整理出一些东西, 写成此文。
Rxjs据说会在2017年流行起来, 因为其处理异步逻辑,数据流, 事件非常擅长。 但是其学习曲线相比Promise, EventEmitter陡峭了不少。 而且民工叔也说:"由于RxJS的抽象程度很高,所以,可以用很简短代码表达很复杂的含义,这对开发人员的要求也会比较高,需要有比较强的归纳能力。" 本文就Rx.js的几个核心概念做出阐述。 尽可能以通俗易懂的方式解释这些概念。要是本文有误或不完善的地方,欢迎指出。

Observable到底是什么

先上代码:

let foo = Rx.Observable.create(observer => {
  console.log("Hello");
  observer.next(42);
});

foo.subscribe(x => console.log(x));
foo.subscribe(y => console.log(y));

输出

"Hello"
42
"Hello"
42

这里可以把foo想象成一个函数,这意味着你每次调用foo都会导致传入Rx.Observable.create里的回调函数重新执行一次, 调用的方式为foo.subscribe(callback), 相当于foo()。 接收函数返回值的方式也从var a = foo()改为通过传入回调函数的方式获取。第三行的observer.next表示返回一个值, 你可以调用多次,每次调用observer.next后, 会先将next里的值返回给foo.subcribe里的回调函数, 执行完后再返回。observer.complete, observer.error来控制流程。 具体看代码:

var observable = Rx.Observable.create(observer => {
  try {
    observer.next(1);
    console.log("hello");
    observer.next(2);
    observer.next(3);
    observer.complete();
    observer.next(4);
  } catch (err) {
    observer.error(err); 
  }
});

let = subcription = observable.subscribe(value => {
  console.log(value)
})

运行结果:

1
hello
2
3

如上的第一个回调函数里的结构是推荐的结构。 当observable的执行出现异常的时候,通过observer.error将错误返回, 然而observable.subscribe的回调函数无法接收到.因为observer.complete已经调用, 因此observer.next(4)的返回是无效的. Observable不是可以返回多个值的Promise 虽然获得Promise的值的方式也是通过then函数这种类似的方式, 但是new Promise(callback)里的callback回调永远只会执行一次!因为Promise的状态是不可逆的

可以使用其他方式创建Observable, 看代码:

var clicks = Rx.Observable.fromEvent(document, "click");
clicks.subscribe(x => console.log(x));

当用户对document产生一个click行为的时候, 就会打印事件对象到控制台上。

Observer是什么

先看代码:

let foo = Rx.Observable.create(observer => {
  console.log("Hello");
  observer.next(42);
});

let observer = x => console.log(x);
foo.subscribe(observer);

代码中的第二个变量就是observer. 没错, observer就是当Observable"返回"值的时候接受那个值的函数!第一行中的observer其实就是通过foo.subscribe传入的callback. 只不过稍加封装了。 怎么封装的? 看代码:

let foo = Rx.Observable.create(observer => {
  try {
    console.log("Hello");
    observer.next(42);
    observer.complete();
    observer.next(10);
  } catch(e) { observer.error(e) }
  
});

let observer = {
  next(value) { console.log(value) },
  complete() { console.log("completed"),
  error(err) { console.error(err) }
}
foo.subscribe(observer);

你看到observer被定义成了一个对象, 其实这才是完整的observer. 传入一个callback到observable.subcribe相当于传入了{ next: callback }

Subcription里的陷阱

Subscription是什么, 先上代码:

var observable = Rx.Observable.interval(1000);
var subscription = observable.subscribe(x => console.log(x));

setTimeout(() => {
  subscription.unsubscribe();
}, 3100)

运行结果:

0
1
2

Rx.Observable.interval可以返回一个能够发射(返回)0, 1, 2, 3..., n数字的Observable, 返回的时间间隔这里是1000ms。 第二行中的变量就是subscription。 subscription有一个unsubscribe方法, 这个方法可以让subscription订阅的observable发射的数据被observer忽略掉.通俗点说就是取消订阅。

unsubscribe存在一个陷阱。 先看代码:

var foo = Rx.Observable.create((observer) => {
  var i = 0
  setInterval(() => {
    observer.next(i++)
    console.log("hello")
  }, 1000)
})

const subcription = foo.subscribe((i) => console.log(i))
subcription.unsubscribe()

运行结果:

hello
hello
hello
......
hello

unsubscribe只会让observer忽略掉observable发射的数据,但是setInterval依然会继续执行。 这看起来似乎是一个愚蠢的设计。 所以不建议这样写。

Subject

Subject是一种能够发射数据给多个observer的Observable, 这让Subject看起来就好像是EventEmitter。 先上代码:

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log("observerA: " + v)
});
subject.subscribe({
  next: (v) => console.log("observerB: " + v)
});

subject.next(1);
subject.next(2);

运行结果:

observerA: 1
observerB: 1
observerA: 2
observerB: 2

与Observable不同的是, Subject发射数据给多个observer。 其次, 定义subject的时候并没有传入callback, 这是因为subject自带next, complete, error等方法。从而可以发射数据给observer。 这和EventEmitter很类似。observer并不知道他subscribe的是Obervable还是Subject。 对observer来说是透明的。 而且Subject还有各种派生, 比如说:

BehaviorSubject 能够保留最近的数据,使得当有subscribe的时候,立马发射出去。看代码:

var subject = new Rx.BehaviorSubject(0); // 0 is the initial value

subject.subscribe({
  next: (v) => console.log("observerA: " + v)
});

subject.next(1);
subject.next(2);

subject.subscribe({
  next: (v) => console.log("observerB: " + v)
});

subject.next(3);

运行结果:

observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

ReplaySubject 能够保留最近的一些数据, 使得当有subscribe的时候,将这些数据发射出去。看代码:

var subject = new Rx.ReplaySubject(3); 

subject.subscribe({
  next: (v) => console.log("observerA: " + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log("observerB: " + v)
});

subject.next(5);

输出结果:

observerA: 1
observerA: 2
observerA: 3
observerA: 4
observerB: 2
observerB: 3
observerB: 4
observerA: 5
observerB: 5

第一行的声明表示ReplaySubject最大能够记录的数据的数量是3。

AsyncSubject 只会发射结束前的一个数据。 看代码:

var subject = new Rx.AsyncSubject();

subject.subscribe({
  next: (v) => console.log("observerA: " + v)
});

subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);

subject.subscribe({
  next: (v) => console.log("observerB: " + v)
});

subject.next(5);
subject.complete();

输出结果:

observerA: 5
observerB: 5

既然subject有next, error, complete三种方法, 那subject就可以作为observer! 看代码:

var subject = new Rx.Subject();

subject.subscribe({
  next: (v) => console.log("observerA: " + v)
});
subject.subscribe({
  next: (v) => console.log("observerB: " + v)
});

var observable = Rx.Observable.from([1, 2, 3]);

observable.subscribe(subject);

输出结果:

observerA: 1
observerB: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

也就是说, observable.subscribe可以传入一个subject来订阅其消息。 这就好像是Rxjs中的一颗语法糖, Rxjs有专门的实现。

Multicasted Observables 是一种借助Subject来将数据发射给多个observer的Observable。 看代码:

var source = Rx.Observable.from([1, 2, 3]);
var subject = new Rx.Subject();
var multicasted = source.multicast(subject);

multicasted.subscribe({
  next: (v) => console.log("observerA: " + v)
});
multicasted.subscribe({
  next: (v) => console.log("observerB: " + v)
});

multicasted.connect();

Rx.Observable.from能够逐一发射数组中的元素, 在multicasted.connect()调用之前的任何subscribe都不会导致source发射数据。multicasted.connect()相当于之前的observable.subscribe(subject)。因此不能将multicasted.connect()写在subscribe的前面。因为这会导致在执行multicasted.connect()的时候source发射数据, 但是subject又没保存数据, 导致两个subscribe无法接收到任何数据。

最好是第一个subscribe的时候能够得到当前已有的数据, 最后一个unsubscribe的时候就停止Observable的执行, 相当于Observable发射的数据都被忽略。

refCount就是能够返回这样的Observable的方法

var source = Rx.Observable.interval(500);
var subject = new Rx.Subject();
var refCounted = source.multicast(subject).refCount();
var subscription1, subscription2, subscriptionConnect;

console.log("observerA subscribed");
subscription1 = refCounted.subscribe({
  next: (v) => console.log("observerA: " + v)
});

setTimeout(() => {
  console.log("observerB subscribed");
  subscription2 = refCounted.subscribe({
    next: (v) => console.log("observerB: " + v)
  });
}, 600);

setTimeout(() => {
  console.log("observerA unsubscribed");
  subscription1.unsubscribe();
}, 1200);

setTimeout(() => {
  console.log("observerB unsubscribed");
  subscription2.unsubscribe();
}, 2000);

输出结果:

observerA subscribed
observerA: 0
observerB subscribed
observerA: 1
observerB: 1
observerA unsubscribed
observerB: 2
observerB unsubscribed
What"s Operators?

Observable上有很多方法, 比如说map, filter, merge等等。 他们基于调用它们的observable,返回一个全新的observable。 而且他们都是纯方法。 operators分为两种, instance operators 和 static operators。 instance operators是存在于observable实例上的方法, 也就是实例方法; static operators是存在于Observable这个类型上的方法, 也就是静态方法。Rxjs拥有很多强大的operators。

自己实现一个operators:

function multiplyByTen(input) {
  var output = Rx.Observable.create(function subscribe(observer) {
    input.subscribe({
      next: (v) => observer.next(10 * v),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
  });
  return output;
}

var input = Rx.Observable.from([1, 2, 3, 4]);
var output = multiplyByTen(input);
output.subscribe(x => console.log(x));

输出结果:

10
20
30
40
Rx.js实践
import React from "react";
import ReactDOM from "react-dom";
import Rx from "rx";

class Main extends React.Component {
  constructor (props) {
    super(props);
    this.state = {count: 0};
  }

  // Click events are now observables! No more proactive approach.
  componentDidMount () {
    const plusBtn = document.getElementById("plus");
    const minusBtn = document.getElementById("minus");

    const plus$ = Rx.Observable.fromEvent(plusBtn, "click").map(e => 1);
    const minus$ = Rx.Observable.fromEvent(minusBtn, "click").map(e => -1);

    Rx.Observable.merge(plus$, minus$).scan((acc, n) => acc + n)
      .subscribe(value => this.setState({count: value}));
  }

  render () {
    return (
        
count: {this.state.count}
); } } ReactDOM.render(
, document.getElementById("app"));

merge用于合并两个observable产生一个新的observable。 scan类似于Array中的reduce。 这个例子实现了点击plus的时候+1, 点击minus的时候-1。

Rx.js适用的场景

多个复杂的异步或事件组合在一起。

处理多个数据序列

假如没有被复杂的异步,事件, 数据序列困扰, 如果promise已经足够的话, 就没必要适用Rx.js。

Summary

Observable, Observer, Subscription, Subscrib, Subject概念。

RxJS适用于解决复杂的异步,事件问题。

文章参考

让我们一起来学习 RxJS ---by 饿了么前端

21-use-rxjs-for-orchestrating-asynchronous-and-event-based-computations

RxJS文档

RxJS 入门指引和初步应用 ---by 民工叔

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/81707.html

相关文章

  • 2018前端值得关注技术

    摘要:年前端有哪些领域,技术值得关注,哪些技术会兴起,哪些技术会没落。自从谷歌提出后,就持续的获得了业界的关注,热度可见一斑。就在今年,谷歌也宣布将获得与安卓原生应用同等的待遇与权限。但是无论都值得关注。 1.前言 2017悄然过去,2018已经来到。人在进步,技术在发展。2018年前端有哪些领域,技术值得关注,哪些技术会兴起,哪些技术会没落。下面就我个人的判断进行一个预测判断,希望能对大家...

    xiao7cn 评论0 收藏0
  • 2018前端值得关注技术

    摘要:年前端有哪些领域,技术值得关注,哪些技术会兴起,哪些技术会没落。自从谷歌提出后,就持续的获得了业界的关注,热度可见一斑。就在今年,谷歌也宣布将获得与安卓原生应用同等的待遇与权限。但是无论都值得关注。 1.前言 2017悄然过去,2018已经来到。人在进步,技术在发展。2018年前端有哪些领域,技术值得关注,哪些技术会兴起,哪些技术会没落。下面就我个人的判断进行一个预测判断,希望能对大家...

    用户84 评论0 收藏0
  • 【响应式编程思维艺术】 (1)Rxjs专题学习计划

    摘要:由于技术栈的学习,笔者需要在原来函数式编程知识的基础上,学习的使用。笔者在社区发现了一个非常高质量的响应式编程系列教程共篇,从基础概念到实际应用讲解的非常详细,有大量直观的大理石图来辅助理解流的处理,对培养响应式编程的思维方式有很大帮助。 showImg(https://segmentfault.com/img/bVus8n); [TOC] 一. 响应式编程 响应式编程,也称为流式编程...

    lscho 评论0 收藏0
  • 【CuteJavaScript】Angular6入门项目(3.编写服务和引入RxJS

    摘要:发布通过回调方法向发布事件。观察者一个回调函数的集合,它知道如何去监听由提供的值。 本文目录 一、项目起步 二、编写路由组件 三、编写页面组件 1.编写单一组件 2.模拟数据 3.编写主从组件 四、编写服务 1.为什么需要服务 2.编写服务 五、引入RxJS 1.关于RxJS 2.引入RxJS 3.改造数据获取方式 六、改造组件 1.添...

    RebeccaZhong 评论0 收藏0
  • RxJS Observable - 一个奇特函数

    摘要:形式上比普通函数直接返回值啰嗦一些。这里和的重要区别在于,模式下,可以决定什么时候返回值,以及返回几个值即调用回调函数的次数。 前言 RxJS 的 Observable 有点难理解,其实 RxJS 相关的概念都有点难理解。毕竟 RxJS 引入了响应式编程这种新的模式,会不习惯是正常的。不过总得去理解嘛,而认识新的事物时,如果能够参照一个合适的已知事物比对着,会比较容易理解吧。对于 Ob...

    omgdog 评论0 收藏0

发表评论

0条评论

jzzlee

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<