资讯专栏INFORMATION COLUMN

RxJS基础教程

defcon / 2891人阅读

摘要:是一个基于可观测数据流在异步编程应用中的库。正如官网所说,是基于观察者模式,迭代器模式和函数式编程。它具有时间与事件响应的概念。通知不再发送任何值。和通知可能只会在执行期间发生一次,并且只会执行其中的一个。

RxJS是一个基于可观测数据流在异步编程应用中的库。

</>复制代码

  1. ReactiveX is a combination of the best ideas from
    the Observer pattern, the Iterator pattern, and functional programming

正如官网所说,RxJS是基于观察者模式,迭代器模式和函数式编程。因此,首先要对这几个模式有所理解

观察者模式

</>复制代码

  1. window.addEventListener("click", function(){
  2. console.log("click!");
  3. })

JS的事件监听就是天生的观察者模式。给window的click事件(被观察者)绑定了一个listener(观察者),当事件发生,回调函数就会被触发

迭代器模式

迭代器模式,提供一种方法顺序访问一个聚合对象中的各种元素,而又不暴露该对象的内部表示。

ES6里的Iterator即可实现:

</>复制代码

  1. let arr = ["a", "b", "c"];
  2. let iter = arr[Symbol.iterator]();
  3. iter.next() // { value: "a", done: false }
  4. iter.next() // { value: "b", done: false }
  5. iter.next() // { value: "c", done: false }
  6. iter.next() // { value: undefined, done: true }

反复调用迭代对象的next方法,即可顺序访问

函数式编程

提到函数式编程,就要提到声明式编程和命令式编程
函数式编程是声明式编程的体现

问题:将数组[1, 2, 3]的每个元素乘以2,然后计算总和。

命令式编程

</>复制代码

  1. const arr = [1, 2, 3];
  2. let total = 0;
  3. for(let i = 0; i < arr.length; i++) {
  4. total += arr[i] * 2;
  5. }

声明式编程

</>复制代码

  1. const arr = [1, 2, 3];
  2. let total = arr.map(x => x * 2).reduce((total, value) => total + value)

声明式的特点是专注于描述结果本身,不关注到底怎么到达结果。而命令式就是真正实现结果的步骤

声明式编程把原始数据经过一系列转换(map, reduce),最后得到想要的数据

现在前端流行的MVC框架(Vue,React,Angular),也都是提倡:编写UI结构时使用声明式编程,在编写业务逻辑时使用命令式编程

RxJS

RxJS里有两个重要的概念需要我们理解:
Observable (可观察对象)
Observer (观察者)

</>复制代码

  1. var btn = document.getElementById("btn");
  2. var handler = function() {
  3. console.log("click");
  4. }
  5. btn.addEventListener("click", handler)

上面这个例子里:
btn这个DOM元素的click事件就是一个Observable
handler这个函数就是一个Observer,当btn的click事件被触发,就会调用该函数

改用RxJS编写;

</>复制代码

  1. Rx.Observable.fromEvent(btn, "click")
  2. .subscribe(() => console.log("click"));

fromEvent把一个event转成了一个Observable,然后它就可以被订阅subscribe

流stream

Observable其实就是数据流stream
是在时间流逝的过程中产生的一系列事件。它具有时间与事件响应的概念。

我们可以把一切输入都当做数据流来处理,比如说:

用户操作

网络响应

定时器

Worker

产生新流

当产生了一个流后,我们可以通过操作符(Operator)对这个流进行一系列加工操作,然后产生一个新的流

</>复制代码

  1. Rx.Observable.fromEvent(window, "click")
  2. .map(e => 1)
  3. .scan((total, now) => total + now)
  4. .subscribe(value => {
  5. console.log(value)
  6. })

map把流转换成了一个每次产生1的新流,然后scan类似reduce,也会产生一个新流,最后这个流被订阅。最终实现了:每次点击累加1的效果

可以用一个效果图来表示该过程:

合并流

也可以对若干个数据流进行组合:

例子:我们要实现下面这个效果:

</>复制代码

  1. Rx.Observable.fromEvent(document.querySelector("input[name=plus]"), "click")
  2. .mapTo(1)
  3. .merge(
  4. Rx.Observable.fromEvent(document.querySelector("input[name=minus]"), "click")
  5. .mapTo(-1)
  6. )
  7. .scan((total, now) => total + now)
  8. .subscribe(value => {
  9. document.querySelector("#counter").innerText = value;
  10. })

merge可以把两个数据流整个在一起,效果可以参考如下:

刚才那个例子的数据流如下:

以RxJS的写法,就是把按下加1当成一个数据流,把按下减1当成一个数据流,再通过merge把两个数据流合并,最后通过scan操作符,把新流上的数据累加,这就是我们想要的计数器效果

扁平化流

有时候,我们的Observable送出的是一个新的Observable:

</>复制代码

  1. var click = Rx.Observable.fromEvent(document.body, "click");
  2. var source = click.map(e => Rx.Observable.of(1, 2, 3));
  3. source.subscribe(value => {
  4. console.log(value)
  5. });

这里,console打印出来的是对象,而不是我们想要的1,2,3,这是因为map返回的Rx.Observable.of(1, 2, 3)本身也是个Observable

用图表示如下:

</>复制代码

  1. click : ------c------------c--------
  2. map(e => Rx.Observable.of(1,2,3))
  3. source : ------o------------o--------
  4. (123)| (123)|

因此,我们订阅到的value值就是一个Observable对象,而不是普通数据1,2,3

我想要的其实不是Observable本身,而是属于这个Observable里面的那些东西,现在这个情形就是Observable里面又有Observable,有两层,可是我想要让它变成一层就好,该怎么办呢?

这就需要把Observable扁平化

</>复制代码

  1. const arr = [1, [2, 3], 4];
  2. // 扁平化后:
  3. const flatArr = [1, 2, 3, 4];

concatAll这个操作符就可以把Observable扁平化

</>复制代码

  1. var click = Rx.Observable.fromEvent(document.body, "click");
  2. var source = click.map(e => Rx.Observable.of(1, 2, 3));
  3. var example = source.concatAll();
  4. example.subscribe(value => {
  5. console.log(value)
  6. })

</>复制代码

  1. click : ------c------------c--------
  2. map(e => Rx.Observable.of(1,2,3))
  3. source : ------o------------o--------
  4. (123)| (123)|
  5. concatAll()
  6. example: ------(123)--------(123)------------

flatMap操作符也可以实现同样的作用,就是写法有些不同:

</>复制代码

  1. var click = Rx.Observable.fromEvent(document.body, "click");
  2. var source = click.flatMap(e => Rx.Observable.of(1, 2, 3));
  3. source.subscribe(value => {
  4. console.log(value)
  5. })

</>复制代码

  1. click : ------c------------c--------
  2. flatMap(e => Rx.Observable.of(1,2,3))
  3. source: ------(123)--------(123)------------
简单拖拽实例

学完前面几个操作符,我们就可以写一个简单的实例了

拖拽的原理是:

监听拖拽元素的mousedown

监听body的mousemove

监听body的mouseup

</>复制代码

</>复制代码

  1. const mouseDown = Rx.Observable.fromEvent(dragDOM, "mousedown");
  2. const mouseUp = Rx.Observable.fromEvent(body, "mouseup");
  3. const mouseMove = Rx.Observable.fromEvent(body, "mousemove");

首先给出3个Observable,分别代表3种事件,我们希望mousedown的时候监听mousemove,然后mouseup时停止监听,于是RxJS可以这么写:

</>复制代码

  1. const source = mouseDown
  2. .map(event => mouseMove.takeUntil(mouseUp))

takeUntil操作符可以在某个条件符合时,发送complete事件

</>复制代码

  1. source: -------e--------------e-----
  2. --m-m-m-m| -m--m-m--m-m|

从图上可以看出,我们还需要把source扁平化,才能获取所需数据。

完整代码:

</>复制代码

  1. const dragDOM = document.getElementById("drag");
  2. const body = document.body;
  3. const mouseDown = Rx.Observable.fromEvent(dragDOM, "mousedown");
  4. const mouseUp = Rx.Observable.fromEvent(body, "mouseup");
  5. const mouseMove = Rx.Observable.fromEvent(body, "mousemove");
  6. mouseDown
  7. .flatMap(event => mouseMove.takeUntil(mouseUp))
  8. .map(event => ({ x: event.clientX, y: event.clientY }))
  9. .subscribe(pos => {
  10. dragDOM.style.left = pos.x + "px";
  11. dragDOM.style.top = pos.y + "px";
  12. })
Observable Observer

前面的例子,我们都在讨论fromEvent转换的Observable,其实还有很多种方法产生一个Observable,其中create也是一种常见的方法,可以用来创建自定义的Observable

</>复制代码

  1. var observable = Rx.Observable.create(function (observer) {
  2. observer.next(1);
  3. observer.next(2);
  4. observer.next(3);
  5. setTimeout(() => {
  6. observer.next(4);
  7. observer.complete();
  8. }, 1000);
  9. });
  10. console.log("just before subscribe");
  11. observable.subscribe({
  12. next: x => console.log("got value " + x),
  13. error: err => console.error("something wrong occurred: " + err),
  14. complete: () => console.log("done"),
  15. });
  16. console.log("just after subscribe");

控制台执行的结果:

</>复制代码

  1. just before subscribe
  2. got value 1
  3. got value 2
  4. got value 3
  5. just after subscribe
  6. got value 4
  7. done

Observable 执行可以传递三种类型的值:

"Next" 通知: 发送一个值,比如数字、字符串、对象,等等。
"Error" 通知: 发送一个 JavaScript 错误 或 异常。
"Complete" 通知: 不再发送任何值。
"Next" 通知是最重要,也是最常见的类型:它们表示传递给观察者的实际数据。"Error" 和 "Complete" 通知可能只会在 Observable 执行期间发生一次,并且只会执行其中的一个。

</>复制代码

  1. var observable = Rx.Observable.create(function subscribe(observer) {
  2. try {
  3. observer.next(1);
  4. observer.next(2);
  5. observer.next(3);
  6. observer.complete();
  7. } catch (err) {
  8. observer.error(err); // 如果捕获到异常会发送一个错误
  9. }
  10. });

Observer观察者只是一组回调函数的集合,每个回调函数对应一种 Observable 发送的通知类型:next、error 和 complete 。

</>复制代码

  1. var observer = {
  2. next: x => console.log("Observer got a next value: " + x),
  3. error: err => console.error("Observer got an error: " + err),
  4. complete: () => console.log("Observer got a complete notification"),
  5. };

Observer和Observable是通过subscribe方法建立联系的

</>复制代码

  1. observable.subscribe(observer);
unsubscribe

observer订阅了Observable之后,还可以取消订阅

</>复制代码

  1. var observable = Rx.Observable.from([10, 20, 30]);
  2. var subscription = observable.subscribe(x => console.log(x));
  3. // 稍后:
  4. subscription.unsubscribe();

unsubscribe陷阱:

</>复制代码

  1. let stream$ = new Rx.Observable.create((observer) => {
  2. let i = 0;
  3. let id = setInterval(() => {
  4. console.log("setInterval");
  5. observer.next(i++);
  6. },1000)
  7. })
  8. let subscription = stream$.subscribe((value) => {
  9. console.log("Value", value)
  10. });
  11. setTimeout(() => {
  12. subscription.unsubscribe();
  13. }, 3000)

3秒后虽然取消了订阅,但是开启的setInterval定时器并不会自动清理,我们需要自己返回一个清理函数

</>复制代码

  1. let stream$ = new Rx.Observable.create((observer) => {
  2. let i = 0;
  3. let id = setInterval(() => {
  4. observer.next(i++);
  5. },1000)
  6. // 返回了一个清理函数
  7. return function(){
  8. clearInterval( id );
  9. }
  10. })
  11. let subscription = stream$.subscribe((value) => {
  12. console.log("Value", value)
  13. });
  14. setTimeout(() => {
  15. subscription.unsubscribe() // 在这我们调用了清理函数
  16. }, 3000)
Ajax异步操作

</>复制代码

</>复制代码

  1. function sendRequest(search) {
  2. return Rx.Observable.ajax.getJSON(`http://deepred5.com/cors.php?search=${search}`)
  3. .map(response => response)
  4. }
  5. Rx.Observable.fromEvent(document.querySelector("input"), "keyup")
  6. .map(e => e.target.value)
  7. .flatMap(search => sendRequest(search))
  8. .subscribe(value => {
  9. console.log(value)
  10. })

用户每次在input框每次进行输入,均会触发ajax请求,并且每个ajax返回的值都会被打印一遍

现在需要实现这样一个功能:
希望用户在300ms以内停止输入,才发送请求(防抖),并且console打印出来的值只要最近的一个ajax返回的

</>复制代码

  1. Rx.Observable.fromEvent(document.querySelector("input"), "keyup")
  2. .debounceTime(300)
  3. .map(e => e.target.value)
  4. .switchMap(search => sendRequest(search))
  5. .subscribe(value => {
  6. console.log(value)
  7. })

debounceTime表示经过n毫秒后,没有流入新值,那么才将值转入下一个环节
switchMap能取消上一个已无用的请求,只保留最后的请求结果流,这样就确保处理展示的是最后的搜索的结果

可以看到,RxJS对异步的处理是非常优秀的,对异步的结果能进行各种复杂的处理和筛选。

React + Redux 的异步解決方案:redux-observable

Redux的action都是同步的,所以默认情况下也只能处理同步数据流。

为了生成异步action,处理异步数据流,有许多不同的解決方案,例如 redux-thunk、redux-promise、redux-saga 等等。

redux-thunk举例:

调用一个异步API,首先要先定义三个同步action构造函数,分别表示

请求开始

请求成功

请求失败

然后再定义一个异步action构造函数,该函数不再是返回普通的对象,而是返回一个函数,在这个函数里,进行ajax异步操作,然后根据返回的成功和失败,分别调用前面定义的同步action

actions.js

</>复制代码

  1. export const FETCH_STARTED = "WEATHER/FETCH_STARTED";
  2. export const FETCH_SUCCESS = "WEATHER/FETCH_SUCCESS";
  3. export const FETCH_FAILURE = "WEATHER/FETCH_FAILURE";
  4. // 普通action构造函数,返回普通对象
  5. export const fetchWeatherStarted = () => ({
  6. type: FETCH_STARTED
  7. });
  8. export const fetchWeatherSuccess = (result) => ({
  9. type: FETCH_SUCCESS,
  10. result
  11. })
  12. export const fetchWeatherFailure = (error) => ({
  13. type: FETCH_FAILURE,
  14. error
  15. })
  16. // 异步action构造函数,返回一个函数
  17. export const fetchWeather = (cityCode) => {
  18. return (dispatch) => {
  19. const apiUrl = `/data/cityinfo/${cityCode}.html`;
  20. dispatch(fetchWeatherStarted())
  21. return fetch(apiUrl).then((response) => {
  22. if (response.status !== 200) {
  23. throw new Error("Fail to get response with status " + response.status);
  24. }
  25. response.json().then((responseJson) => {
  26. dispatch(fetchWeatherSuccess(responseJson.weatherinfo));
  27. }).catch((error) => {
  28. dispatch(fetchWeatherFailure(error));
  29. });
  30. }).catch((error) => {
  31. dispatch(fetchWeatherFailure(error));
  32. })
  33. };
  34. }

现在如果想要异步请求,只要:

</>复制代码

  1. // fetchWeather是个异步action构造函数
  2. dispatch(fetchWeather("23333"));

我们再来看看redux-observable:

调用一个异步API,不再需要定义一个异步action构造函数,所有的action构造函数都只是返回普通的对象

那么ajax请求在哪里发送?

答案是在Epic进行异步操作

</>复制代码

  1. Epic是redux-observable的核心原语。
    它是一个函数,接收 actions 流作为参数并且返回 actions 流。 Actions 入, actions 出.

</>复制代码

  1. export const FETCH_STARTED = "WEATHER/FETCH_STARTED";
  2. export const FETCH_SUCCESS = "WEATHER/FETCH_SUCCESS";
  3. export const FETCH_FAILURE = "WEATHER/FETCH_FAILURE";
  4. export const fetchWeather = cityCode => ({ type: FETCH_STARTED, cityCode });
  5. export const fetchWeatherSuccess = result => (
  6. { type: FETCH_SUCCESS, result };
  7. );
  8. export const fetchWeatherFailure = (error) => (
  9. {
  10. type: FETCH_FAILURE,
  11. error
  12. }
  13. )
  14. export const fetchWeatherEpic = action$ =>
  15. action$.ofType(FETCH_STARTED)
  16. .mergeMap(action =>
  17. ajax.getJSON(`/data/cityinfo/${action.cityCode}.html`)
  18. .map(response => fetchWeatherSuccess(response.weatherinfo))
  19. // 这个处理异常的action必须使用Observable.of方法转为一个observable
  20. .catch(error => Observable.of(fetchWeatherFailure(error)))
  21. );

现在如果想要异步请求,只要:

</>复制代码

  1. // fetchWeather只是个普通的action构造函数
  2. dispatch(fetchWeather("23333"));

相较于thunk中间件,使用redux-observable来处理异步action,有以下优点:

不需要修改action构造函数,返回的仍然是普通对象

epics中间件会将action封装成Observable对象,可以使用RxJs的相应api来控制异步流程,它就像一个拥有许多高级功能的Promise,现在我们在Redux中也可以得到它的好处。

总结

原生JS传统解决异步的方式:callback、Generator、Promise、async/await

RxJS解决的是数据流的问题,它可以让批量数据处理起来更方便

可以想象的一些使用场景:

多个服务端实时消息流,通过RxJS进行高阶处理,最后到 view 层就是很清晰的一个Observable,但是view层本身处理用户事件依然可以沿用原有的范式。

爬虫抓取,每次对一个网站的前5页做平行请求,每个请求如果失败就重试,重试3次之后再放弃。

可以看出,这种需要对流进行复杂操作的场景更加适合RxJS

公司内部目前的大部分系统,前端就可能不太适合用RxJS,因为大部分是后台CRUD系统,整体性、实时性的要求都不高,并且也没有特别复杂的数据流操作

我们推荐在适合RxJS的地方用RxJS,但是不强求RxJS for everything。RxJS给了我们另一种思考和解决问题的方式,但这不一定是必要的

参考

构建流式应用—RxJS详解

希望是最淺顯易懂的RxJS教學

RxJS入门指引和初步应用

30天精通RxJS系列

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/93518.html

相关文章

  • 【响应式编程的思维艺术】 (1)Rxjs专题学习计划

    摘要:由于技术栈的学习,笔者需要在原来函数式编程知识的基础上,学习的使用。笔者在社区发现了一个非常高质量的响应式编程系列教程共篇,从基础概念到实际应用讲解的非常详细,有大量直观的大理石图来辅助理解流的处理,对培养响应式编程的思维方式有很大帮助。 showImg(https://segmentfault.com/img/bVus8n); [TOC] 一. 响应式编程 响应式编程,也称为流式编程...

    lscho 评论0 收藏0
  • 学习实践 - 收藏集 - 掘金

    摘要:官网地址聊天机器人插件开发实例教程一创建插件在系统技巧使你的更加专业前端掘金一个帮你提升技巧的收藏集。我会简单基于的简洁视频播放器组件前端掘金使用和实现购物车场景前端掘金本文是上篇文章的序章,一直想有机会再次实践下。 2道面试题:输入URL按回车&HTTP2 - 掘金通过几轮面试,我发现真正那种问答的技术面,写一堆项目真不如去刷技术文章作用大,因此刷了一段时间的博客和掘金,整理下曾经被...

    mikyou 评论0 收藏0
  • 2018前端值得关注的技术

    摘要:年前端有哪些领域,技术值得关注,哪些技术会兴起,哪些技术会没落。自从谷歌提出后,就持续的获得了业界的关注,热度可见一斑。就在今年,谷歌也宣布将获得与安卓原生应用同等的待遇与权限。但是无论都值得关注。 1.前言 2017悄然过去,2018已经来到。人在进步,技术在发展。2018年前端有哪些领域,技术值得关注,哪些技术会兴起,哪些技术会没落。下面就我个人的判断进行一个预测判断,希望能对大家...

    xiao7cn 评论0 收藏0
  • 2018前端值得关注的技术

    摘要:年前端有哪些领域,技术值得关注,哪些技术会兴起,哪些技术会没落。自从谷歌提出后,就持续的获得了业界的关注,热度可见一斑。就在今年,谷歌也宣布将获得与安卓原生应用同等的待遇与权限。但是无论都值得关注。 1.前言 2017悄然过去,2018已经来到。人在进步,技术在发展。2018年前端有哪些领域,技术值得关注,哪些技术会兴起,哪些技术会没落。下面就我个人的判断进行一个预测判断,希望能对大家...

    用户84 评论0 收藏0
  • 原理解释 - 收藏集 - 掘金

    摘要:巧前端基础进阶全方位解读前端掘金我们在学习的过程中,由于对一些概念理解得不是很清楚,但是又想要通过一些方式把它记下来,于是就很容易草率的给这些概念定下一些方便自己记忆的有偏差的结论。 计算机程序的思维逻辑 (83) - 并发总结 - 掘金从65节到82节,我们用了18篇文章讨论并发,本节进行简要总结。 多线程开发有两个核心问题,一个是竞争,另一个是协作。竞争会出现线程安全问题,所以,本...

    AlphaGooo 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<