摘要:技术积累经过社区的努力学习资料还是很多的,官方中文文档就已经很不错,不过我们先从天精通初步感受一下然后配合一些中文文档来补充知识点,最后再根据官方文档来校验整个知识体系。资料学习操作符的时候可以对照弹珠图的交互弹珠图的中文版中文文档
前言
最近准备毕设,技术选型的时候因为功能的一些需求准备将RxJs融入到项目中,考虑RxJs的时候因为之前的技术栈还犹豫了一下,查了一些资料以及粗略浏览了一些文档。感觉对于毕设项目RxJs的加入是有帮助的,因此打算系统的学习然后摘抄知识点以及实践一些demo做技术积累。
RxJS技术积累RxJs经过社区的努力学习资料还是很多的,官方中文文档就已经很不错,不过我们先从30 天精通 RxJS初步感受一下RxJS.然后配合一些中文文档来补充知识点,最后再根据官方文档来校验整个知识体系。
RxJS 基本介绍RxJS是一套由Observable sequences来组合异步行为和事件基础程序的Library
RxJS 是Functional Programming 跟Reactive Programming 的结合
把每个运算包成一个个不同的function,并用这些function 组合出我们要的结果,这就是最简单的Functional Programming
Functional Programming 强调没有Side Effect,也就是function 要保持纯粹,只做运算并返回一个值,没有其他额外的行为。
Side Effect
Side Effect是指一个function做了跟本身运算返回值没有关系的事,比如说修改某个全域变数,或是修改传入参数的值,甚至是执行console.log都算是Side Effect。
前端常见的Side Effect:
发送http request
在画面输出值或是log
获得用户的input
Query DOM
Reactive Programming简单来说就是当变数或资源发生变动时,由变数或资源自动告诉我发生变动了
Observable Observer Pattern(观察者模式)Observer Pattern 其实很常遇到,许多API 的设计上都用了Observer Pattern,最简单的例子就是DOM 物件的事件监听:
function clickHandler(event) { console.log("user click!"); } document.body.addEventListener("click", clickHandler)
观察者模式:我们可以对某件事注册监听,并在事件发生时,自动执行我们注册的监听者(listener)。
Es5版本:
function Producer() { // 这个 if 只是避免使用者不小心把 Producer 当做函数调用 if(!(this instanceof Producer)) { throw new Error("请用 new Producer()!"); } this.listeners = []; } // 加入监听的方法 Producer.prototype.addListener = function(listener) { if(typeof listener === "function") { this.listeners.push(listener) } else { throw new Error("listener 必须是 function") } } // 移除监听的方法 Producer.prototype.removeListener = function(listener) { this.listeners.splice(this.listeners.indexOf(listener), 1) } // 发送通知的方法 Producer.prototype.notify = function(message) { this.listeners.forEach(listener => { listener(message); }) }
es6 版本
class Producer { constructor() { this.listeners = []; } addListener(listener) { if(typeof listener === "function") { this.listeners.push(listener) } else { throw new Error("listener 必须是 function") } } removeListener(listener) { this.listeners.splice(this.listeners.indexOf(listener), 1) } notify(message) { this.listeners.forEach(listener => { listener(message); }) } }
调用例子:
var egghead = new Producer(); function listener1(message) { console.log(message + "from listener1"); } function listener2(message) { console.log(message + "from listener2"); } egghead.addListener(listener1);egghead.addListener(listener2); egghead.notify("A new course!!")
输出:
a new course!! from listener1
a new course!! from listener2
JavaScript 到了ES6 才有原生的Iterator在ECMAScript中Iterator最早其实是要采用类似Python的Iterator规范,就是Iterator在没有元素之后,执行next会直接抛出错误;但后来经过一段时间讨论后,决定采更functional的做法,改成在取得最后一个元素之后执行next永远都回传{ done: true, value: undefined }
var arr = [1, 2, 3]; var iterator = arr[Symbol.iterator](); iterator.next(); // { value: 1, done: false } iterator.next(); // { value: 2, done: false } iterator.next(); // { value: 3, done: false } iterator.next(); // { value: undefined, done: true }
简单实现:
es5: function IteratorFromArray(arr) { if(!(this instanceof IteratorFromArray)) { throw new Error("请用 new IteratorFromArray()!"); } this._array = arr; this._cursor = 0; } IteratorFromArray.prototype.next = function() { return this._cursor < this._array.length ? { value: this._array[this._cursor++], done: false } : { done: true }; } es6: class IteratorFromArray { constructor(arr) { this._array = arr; this._cursor = 0; } next() { return this._cursor < this._array.length ? { value: this._array[this._cursor++], done: false } : { done: true }; } }
优势
Iterator的特性可以拿来做延迟运算(Lazy evaluation),让我们能用它来处理大数组。
第二因为iterator 本身是序列,所以可以第调用方法像map, filter... 等!
延迟运算(Lazy evaluation)
function* getNumbers(words) { for (let word of words) { if (/^[0-9]+$/.test(word)) { yield parseInt(word, 10); } } } const iterator = getNumbers("30 天精通 RxJS (04)"); iterator.next(); // { value: 3, done: false } iterator.next(); // { value: 0, done: false } iterator.next(); // { value: 0, done: false } iterator.next(); // { value: 4, done: false } iterator.next(); // { value: undefined, done: true }
把一个字串丢进getNumbersh函数时,并没有马上运算出字串中的所有数字,必须等到我们执行next()时,才会真的做运算,这就是所谓的延迟运算(evaluation strategy)Observable简介
Observer跟Iterator有个共通的特性,就是他们都是渐进式 (progressive)的取得资料,差别只在于Observer是生产者(Producer)推送资料(push ),而Iterator是消费者(Consumer)请求资料(pull)!
Observable其实就是这两个Pattern思想的结合,Observable具备生产者推送资料的特性,同时能像序列,拥有序列处理资料的方法 (map, filter...)!
RxJS说白了就是一个核心三个重点。
一个核心是Observable 再加上相关的Operators(map, filter...),这个部份是最重要的,其他三个重点本质上也是围绕着这个核心在转,所以我们会花将近20 天的篇数讲这个部份的观念及使用案例。
另外三个重点分别是
Observer
Subject
Schedulers
Observable 实践Observable 同时可以处理同步与异步的行为!
同步操作 var observable = Rx.Observable .create(function(observer) { observer.next("Jerry"); observer.next("Anna"); }) // 订阅 observable observable.subscribe(function(value) { console.log(value); }) > Jerry > Anna 异步操作: var observable = Rx.Observable .create(function(observer) { observer.next("Jerry"); // RxJS 4.x 以前的版本用 onNext observer.next("Anna"); setTimeout(() => { observer.next("RxJS 30 days!"); }, 30) }) console.log("start"); observable.subscribe(function(value) { console.log(value); }); console.log("end"); > start Jerry Anna end RxJS 30 days!观察者Observer
Observable 可以被订阅(subscribe),或说可以被观察,而订阅Observable的又称为观察者(Observer)。
观察者是一个具有三个方法(method)的对象,每当Observable 发生事件时,便会呼叫观察者相对应的方法。
next:每当Observable 发送出新的值,next 方法就会被呼叫。
complete:在Observable 没有其他的资料可以取得时,complete 方法就会被呼叫,在complete 被呼叫之后,next 方法就不会再起作用。
error:每当Observable 内发生错误时,error 方法就会被呼叫。
var observable = Rx.Observable .create(function(observer) { observer.next("Jerry"); observer.next("Anna"); observer.complete(); observer.next("not work"); }) // 定义一个观察者 var observer = { next: function(value) { console.log(value); }, error: function(error) { console.log(error) }, complete: function() { console.log("complete") } } // 订阅 observable observable.subscribe(observer) > Jerry Anna complete // complete执行后,next就会自动失效,所以没有印出not work。 捕获错误实例: var observable = Rx.Observable .create(function(observer) { try { observer.next("Jerry"); observer.next("Anna"); throw "some exception"; } catch(e) { observer.error(e) } }); var observer = { next: function(value) { console.log(value); }, error: function(error) { console.log("Error: ", error) }, complete: function() { console.log("complete") } } observable observable.subscribe(observer) > Jerry Anna Error: some exception
观察者可以是不完整的,他可以只具有一个next 方法
订阅一个Observable 就像是执行一个function
Operator操作符Operators 就是一个个被附加到Observable 型别的函数,例如像是map, filter, contactAll... 等等每个operator都会回传一个新的observable,而我们可以透过create的方法建立各种operator
Observable 有许多创建实例的方法,称为creation operator。下面我们列出RxJS 常用的creation operator:
create of from fromEvent fromPromise never empty throw interval timer
当我们想要同步的传递几个值时,就可以用of这个operator来简洁的表达!
var source = Rx.Observable.of("Jerry", "Anna"); source.subscribe({ next: function(value) { console.log(value) }, complete: function() { console.log("complete!"); }, error: function(error) { console.log(error) } }); // Jerry // Anna // complete!
用from来接收任何可枚举的参数(Set, WeakSet, Iterator 等都可)
var arr = ["Jerry", "Anna", 2016, 2017, "30 days"] var source = Rx.Observable.from(arr); source.subscribe({ next: function(value) { console.log(value) }, complete: function() { console.log("complete!"); }, error: function(error) { console.log(error) } }); // Jerry // Anna // 2016 // 2017 // 30 days // complete! var source = Rx.Observable.from("123"); source.subscribe({ next: function(value) { console.log(value) }, complete: function() { console.log("complete!"); }, error: function(error) { console.log(error) } }); // 1 // 2 // 3 // complete! var source = Rx.Observable .from(new Promise((resolve, reject) => { setTimeout(() => { resolve("Hello RxJS!"); },3000) })) source.subscribe({ next: function(value) { console.log(value) }, complete: function() { console.log("complete!"); }, error: function(error) { console.log(error) } }); // Hello RxJS! // complete!
可以用Event建立Observable,通过fromEvent的方法
var source = Rx.Observable.fromEvent(document.body, "click"); source.subscribe({ next: function(value) { console.log(value) }, complete: function() { console.log("complete!"); }, error: function(error) { console.log(error) } });
fromEvent的第一个参数要传入DOM ,第二个参数传入要监听的事件名称。上面的代码会针对body 的click 事件做监听,每当点击body 就会印出event。
获取 DOM 的常用方法:
document.getElementById()
document.querySelector()
document.getElementsByTagName()
document.getElementsByClassName()
Event来建立Observable实例还有另一个方法fromEventPattern,这个方法是给类事件使用
所谓的类事件就是指其行为跟事件相像,同时具有注册监听及移除监听两种行为,就像DOM Event有addEventListener及removeEventListener一样
class Producer { constructor() { this.listeners = []; } addListener(listener) { if(typeof listener === "function") { this.listeners.push(listener) } else { throw new Error("listener 必須是 function") } } removeListener(listener) { this.listeners.splice(this.listeners.indexOf(listener), 1) } notify(message) { this.listeners.forEach(listener => { listener(message); }) } } var egghead = new Producer(); var source = Rx.Observable .fromEventPattern( (handler) => egghead.addListener(handler), (handler) => egghead.removeListener(handler) ); source.subscribe({ next: function(value) { console.log(value) }, complete: function() { console.log("complete!"); }, error: function(error) { console.log(error) } }) egghead.notify("Hello! Can you hear me?");
字数受限,可以去博客看完整版
Subject简介Subject 可以拿去订阅Observable(source) 代表他是一个Observer,同时Subject 又可以被Observer(observerA, observerB) 订阅,代表他是一个Observable。
Subject 同时是Observable 又是ObserverSubject应用Subject 会对内部的observers 清单进行组播(multicast)
Subject 在内部管理一份observer 的清单,并在接收到值时遍历这份清单并送出值
var subject = new Rx.Subject(); var observerA = { next: value => console.log("A next: " + value), error: error => console.log("A error: " + error), complete: () => console.log("A complete!") } var observerB = { next: value => console.log("B next: " + value), error: error => console.log("B error: " + error), complete: () => console.log("B complete!") } subject.subscribe(observerA); subject.subscribe(observerB); subject.next(1); // "A next: 1" // "B next: 1" subject.next(2); // "A next: 2" // "B next: 2"
这里我们可以直接用subject 的next 方法传送值,所有订阅的observer 就会接收到,又因为Subject 本身是Observable,所以这样的使用方式很适合用在某些无法直接使用Observable 的前端框架中,例如在React 想对DOM 的事件做监听
class MyButton extends React.Component { constructor(props) { super(props); this.state = { count: 0 }; this.subject = new Rx.Subject(); this.subject .mapTo(1) .scan((origin, next) => origin + next) .subscribe(x => { this.setState({ count: x }) }) } render() { return } }
BehaviorSubject
BehaviorSubject 是用来呈现当前的值,而不是单纯的发送事件。BehaviorSubject 会记住最新一次发送的元素,并把该元素当作目前的值,在使用上BehaviorSubject 建构式需要传入一个参数来代表起始的状态
// BehaviorSubject 在建立时就需要给定一个状态,并在之后任何一次订阅,就会先送出最新的状态。其实这种行为就是一种状态的表达而非单存的事件,就像是年龄跟生日一样,年龄是一种状态而生日就是事件;所以当我们想要用一个stream 来表达年龄时,就应该用BehaviorSubject 。 var subject = new Rx.BehaviorSubject(0); // 0 var observerA = { next: value => console.log("A next: " + value), error: error => console.log("A error: " + error), complete: () => console.log("A complete!") } var observerB = { next: value => console.log("B next: " + value), error: error => console.log("B error: " + error), complete: () => console.log("B complete!") } subject.subscribe(observerA); // "A next: 0" subject.next(1); // "A next: 1" subject.next(2); // "A next: 2" subject.next(3); // "A next: 3" setTimeout(() => { subject.subscribe(observerB); // "B next: 3" },3000)
ReplaySubject
在新订阅时重新发送最后的几个元素,这时我们就可以用ReplaySubject
var subject = new Rx.ReplaySubject(2); // 重复发送最后俩个元素 var observerA = { next: value => console.log("A next: " + value), error: error => console.log("A error: " + error), complete: () => console.log("A complete!") } var observerB = { next: value => console.log("B next: " + value), error: error => console.log("B error: " + error), complete: () => console.log("B complete!") } subject.subscribe(observerA); subject.next(1); // "A next: 1" subject.next(2); // "A next: 2" subject.next(3); // "A next: 3" setTimeout(() => { subject.subscribe(observerB); // "B next: 2" // "B next: 3" },3000)
AsyncSubject
在subject结束后送出最后一个值
var subject = new Rx.AsyncSubject(); var observerA = { next: value => console.log("A next: " + value), error: error => console.log("A error: " + error), complete: () => console.log("A complete!") } var observerB = { next: value => console.log("B next: " + value), error: error => console.log("B error: " + error), complete: () => console.log("B complete!") } subject.subscribe(observerA); subject.next(1); subject.next(2); subject.next(3); subject.complete(); // "A next: 3" // "A complete!" setTimeout(() => { subject.subscribe(observerB); // "B next: 3" // "B complete!" },3000)Observable and Subject
multicast
multicast 可以用来挂载subject 并回传一个可连结(connectable)的observable
var source = Rx.Observable.interval(1000) .take(3) .multicast(new Rx.Subject()); var observerA = { next: value => console.log("A next: " + value), error: error => console.log("A error: " + error), complete: () => console.log("A complete!") } var observerB = { next: value => console.log("B next: " + value), error: error => console.log("B error: " + error), complete: () => console.log("B complete!") } source.subscribe(observerA); // subject.subscribe(observerA) source.connect(); // source.subscribe(subject) setTimeout(() => { source.subscribe(observerB); // subject.subscribe(observerB) }, 1000);
必须真的等到执行connect()后才会真的用subject订阅source,并开始送出元素,如果没有执行connect()observable是不会真正执行的。
var source = Rx.Observable.interval(1000) .do(x => console.log("send: " + x)) .multicast(new Rx.Subject()); // 無限的 observable var observerA = { next: value => console.log("A next: " + value), error: error => console.log("A error: " + error), complete: () => console.log("A complete!") } var observerB = { next: value => console.log("B next: " + value), error: error => console.log("B error: " + error), complete: () => console.log("B complete!") } var subscriptionA = source.subscribe(observerA); var realSubscription = source.connect(); var subscriptionB; setTimeout(() => { subscriptionB = source.subscribe(observerB); }, 1000); setTimeout(() => { subscriptionA.unsubscribe(); subscriptionB.unsubscribe(); // 虽然A,B退订,但是时间流还是继续执行 }, 5000); setTimeout(() => { realSubscription.unsubscribe(); // 这里才会真正的退订 }, 7000);
refCount
建立一个只要有订阅就会自动connect 的observable
var source = Rx.Observable.interval(1000) .do(x => console.log("send: " + x)) .multicast(new Rx.Subject()) .refCount(); var observerA = { next: value => console.log("A next: " + value), error: error => console.log("A error: " + error), complete: () => console.log("A complete!") } var observerB = { next: value => console.log("B next: " + value), error: error => console.log("B error: " + error), complete: () => console.log("B complete!") } var subscriptionA = source.subscribe(observerA); // 当source 一被observerA 订阅时(订阅数从0 变成1),就会立即执行并发送元素 var subscriptionB; setTimeout(() => { subscriptionB = source.subscribe(observerB); }, 1000); setTimeout(() => { subscriptionA.unsubscribe(); // 订阅减一 subscriptionB.unsubscribe(); // 订阅为0,停止发送 }, 5000);
publish
等价于 multicast(new Rx.Subject())
var source = Rx.Observable.interval(1000) .publish() .refCount(); // var source = Rx.Observable.interval(1000) // .multicast(new Rx.Subject()) // .refCount(); var source = Rx.Observable.interval(1000) .publishReplay(1) .refCount(); // var source = Rx.Observable.interval(1000) // .multicast(new Rx.ReplaySubject(1)) // .refCount(); var source = Rx.Observable.interval(1000) .publishBehavior(0) .refCount(); // var source = Rx.Observable.interval(1000) // .multicast(new Rx.BehaviorSubject(0)) // .refCount(); var source = Rx.Observable.interval(1000) .publishLast() .refCount(); // var source = Rx.Observable.interval(1000) // .multicast(new Rx.AsyncSubject(1)) // .refCount();
share
等价于 publish + refCount
var source = Rx.Observable.interval(1000) .share(); // var source = Rx.Observable.interval(1000) // .publish() // .refCount(); // var source = Rx.Observable.interval(1000) // .multicast(new Rx.Subject()) // .refCount();Scheduler Scheduler简介
Scheduler 控制一个observable 的订阅什么时候开始,以及发送元素什么时候送达,主要由以下三个元素所组成
Scheduler 是一个对象结构。它知道如何根据优先级或其他标准来储存并执行任务。 Scheduler 是一个执行环境。它意味着任务何时何地被执行,比如像是立即执行、在回调(callback)中执行、setTimeout 中执行、animation frame 中执行 Scheduler是一个虚拟时钟。它透过now()这个方法提供了时间的概念,我们可以让任务在特定的时间点被执行。
// Scheduler 会影响Observable 开始执行及元素送达的时机 var observable = Rx.Observable.create(function (observer) { observer.next(1); observer.next(2); observer.next(3); observer.complete(); }); console.log("before subscribe"); observable.observeOn(Rx.Scheduler.async) // 设为 async .subscribe({ next: (value) => { console.log(value); }, error: (err) => { console.log("Error: " + err); }, complete: () => { console.log("complete"); } }); console.log("after subscribe");项目中的RxJs
在项目中RxJs可以通过库的形式引用,也可以引用结合了框架的组合。
通过之前的学习,对RxJs有了一定的了解。对我而言RxJS最好的应用场景就是复杂的UI交互。
而且在学习RxJS的资料中,很多典型的Demo都是:
拖拽交互
Auto Complete
等等
利用RxJS能把我们以前需要写很多判断,很多逻辑的UI交互都简化了,通过它自带的一套Stream的用法,可以利用更少的代码完成以前的复杂的工作量,提供了开发效率。
RxJS同时能应用在组件状态管理中,可以参考Reactive 视角审视 React 组件
在React中,内部通过setState管理状态。状态的变更可以依赖RxJS流,在需要的Response中setState即可。
其他方案可以自行根据项目需求加入,需要就引入,不需要就不要加,不要为RxJS而RxJS.
还要注意的是RxJS的操作符非常强大,但是数量极多,因此一开始开发入门的时候先设计好逻辑再去查文档。
官方的example有很多例子可以参考应用。
认识一下 redux-observableredux-observable,则是通过创建epics中间件,为每一个dispatch添加相应的附加效果。相较于thunk中间件,使用redux-observable来处理异步action,有以下两个优点:
不需要修改reducer,我们的reducer可以依然保持简单的纯函数形态。
epics中间件会将action封装成Observable对象,可以使用RxJs的相应api来控制异步流程。
比起redux-thunk,redux-observable能够强有力的支持更为复杂的异步逻辑。用更少的代码来实现需求。
总结通过几天的学习,对RxJS有了一定的了解,之后就是将其应用到实际项目中。
资料学习操作符的时候可以对照弹珠图
Rx Observables 的交互弹珠图
Learn RxJS 的中文版redux-observable中文文档
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/107246.html
摘要:随着前端应用的复杂度越来越高,如何管理应用的数据已经是一个不可回避的问题。应用的数据不是只有状态的,还有事件异步常量等等。出于以上两点原因,最终决定基于来设计一套管理应用的状态的解决方案。 随着前端应用的复杂度越来越高,如何管理应用的数据已经是一个不可回避的问题。当你面对的是业务场景复杂、需求变动频繁、各种应用数据互相关联依赖的大型前端应用时,你会如何去管理应用的状态数据呢? 我们认为...
摘要:项目简介本次使用了和开发了一个地址输入框,主要实现的功能有限制输入符合条件的字符并每隔两位可以自动添加用于分割的冒号。项目屏蔽了的事件处理,同时使用来手动控制光标。继承于和因此同时具有和两者的方法。后面的和都是需要利用最新的来进行判断的。 项目简介 本次使用了RxJS和react开发了一个mac地址输入框,主要实现的功能有限制输入符合条件的字符1-9,a-f,并每隔两位可以自动添加用于...
摘要:前言微前端理论篇上一篇介绍了微前端的理念,本片将开始介绍项目。先实现公共依赖的引入吧。在上一步我们没有引入的依赖包,是因为的依赖包是作为公共依赖导入的。里面全是我的公共依赖文件在下新建文件夹,新建文件,作为我们整个项目的页面文件。 前言 微前端 —— 理论篇 上一篇介绍了微前端的理念,本片将开始介绍portal项目。 portal项目介绍 portal项目包括两个...
阅读 2507·2023-04-25 22:09
阅读 1022·2021-11-17 17:01
阅读 1543·2021-09-04 16:45
阅读 2619·2021-08-03 14:02
阅读 815·2019-08-29 17:11
阅读 3253·2019-08-29 12:23
阅读 1086·2019-08-29 11:10
阅读 3280·2019-08-26 13:48