资讯专栏INFORMATION COLUMN

从观察者模式到迭代器模式系统讲解 RxJS Observable(一)

notebin / 2055人阅读

摘要:是的缩写,起源于,是一个基于可观测数据流结合观察者模式和迭代器模式的一种异步编程的应用库。是基于观察者模式和迭代器模式以函数式编程思维来实现的。学习之前我们需要先了解观察者模式和迭代器模式,还要对流的概念有所认识。

RxJS 是 Reactive Extensions for JavaScript 的缩写,起源于 Reactive Extensions,是一个基于可观测数据流 Stream 结合观察者模式和迭代器模式的一种异步编程的应用库。RxJS 是 Reactive Extensions 在 JavaScript 上的实现。

Reactive Extensions(Rx)是对 LINQ 的一种扩展,他的目标是对异步的集合进行操作,也就是说,集合中的元素是异步填充的,比如说从 Web
或者云端获取数据然后对集合进行填充。LINQ(Language Integrated Query)语言集成查询是一组用于 C# 和
Visual Basic 语言的扩展。它允许编写 C# 或者 Visual Basic 代码以操作内存数据的方式,查询数据库。

RxJS 的主要功能是利用响应式编程的模式来实现 JavaScript 的异步式编程(现前端主流框架 Vue React Angular 都是响应式的开发框架)。

RxJS 是基于观察者模式和迭代器模式以函数式编程思维来实现的。学习 RxJS 之前我们需要先了解观察者模式和迭代器模式,还要对 Stream 流的概念有所认识。下面我们将对其逐一进行介绍,准备好了吗?让我们现在就开始吧。

RxJS 前置知识点 观察者模式

观察者模式又叫发布订阅模式(Publish/Subscribe),它是一种一对多的关系,让多个观察者(Obesver)同时监听一个主题(Subject),这个主题也就是被观察者(Observable),被观察者的状态发生变化时就会通知所有的观察者,使得它们能够接收到更新的内容。

观察者模式主题和观察者是分离的,不是主动触发而是被动监听。

举个常见的例子,例如微信公众号关注者和微信公众号之间的信息订阅。当微信用户关注微信公众号 webinfoq就是一个订阅过程,webinfoq负责发布内容和信息,webinfoq有内容推送时,webinfoq的关注者就能收到最新发布的内容。这里,关注公众号的朋友就是观察者的角色,公众号webinfoq就是被观察者的角色。

示例代码:

// 定义一个主题类(被观察者/发布者)
class Subject {
  constructor() {
    this.observers = [];   // 记录订阅者(观察者)的集合
    this.state = 0;        // 发布的初始状态
  }
  getState() {
    return this.state;
  }
  setState(state) {        
    this.state = state;    // 推送新信息
    this.notify();         // 通知订阅者有更新了
  }
  attach(observer) {
    this.observers.push(observer);   // 对观察者进行登记
  }
  notify() {
    // 遍历观察者集合,一一进行通知
    this.observers.forEach(observer = {
      observer.update();   
    })
  }
}
// 定义一个观察者(订阅)类
class Observer {
  constructor(name, subject) {  
    this.name = name;   // name 表示观察者的标识
    this.subject = subject;  // 观察者订阅主题
    this.subject.attach(this);  // 向登记处传入观察者实体
  }
  update() {
    console.log(`${this.name} update, state: ${this.subject.getState()}`);
  }
}

// 创建一个主题
let subject = new Subject();

// 创建三个观察者: observer$1 observer$2 observer$3
let observer$1 = new Observer("observer$1", subject);
let observer$2 = new Observer("observer$2", subject);
let observer$3 = new Observer("observer$3", subject);

// 主题有更新
subject.setState(1);
subject.setState(2);
subject.setState(3);

// 输出结果
// observer$1 update, state: 1
// observer$1 update, state: 1
// observer$1 update, state: 1
// observer$2 update, state: 2
// observer$2 update, state: 2
// observer$2 update, state: 2
// observer$3 update, state: 3
// observer$3 update, state: 3
// observer$3 update, state: 3
迭代器模式

迭代器(Iterator)模式又叫游标(Sursor)模式,迭代器具有 next 方法,可以顺序访问一个聚合对象中的各个元素,而不需要暴露该对象的内部表现。

迭代器模式可以把迭代的过程从从业务逻辑中分离出来,迭代器将使用者和目标对象隔离开来,即使不了解对象的内部构造,也可以通过迭代器提供的方法顺序访问其每个元素。

了解更多可迭代对象:「JS篇」你不知道的 JS 知识点总结(一)

使用 ES5 创建一个迭代器

//创建一个迭代类,传入目标对象
function Iterator(container) {
  this.list = container.list;
  this.index = 0;

  //定义私有的next方法,执行迭代
  this.next = function() {
    if(this.hasNext()) { //判断是否迭代完毕
      return {
        value: this.list[this.index++],
        done: false
      }
    }
    return {value: null, done: true}
  }
  this.hasNext = function() {
    if(this.index >= this.list.length) {
      return false;
    }
    return true;
  }
}

//定义目标对象
function Container(list) {
  this.list = list;
  this.getIterator = function() {
    return new Iterator(this); //用户返回一个迭代器
  }
}

//调用
var container = new Container([1, 2, 3, 4, 5]);
var iterator = container.getIterator();
iterator.next();  // {value: 1, done: false}
iterator.next();  // {value: 2, done: false}
iterator.next();  // {value: 3, done: false}
iterator.next();  // {value: 4, done: false}
iterator.next();  // {value: 5, done: false}
iterator.next();  // {value: null, done: true}

使用 ES6 构造一个迭代器

class Iterator {
  constructor(container) {
    this.list = container.list;
    this.index = 0;
  }
  next() {
    if(this.hasNext()) {
      return {
        value: this.list[this.index++],
        done: false
      }
    }
    return {value: null, done: true}
  }
  hasNext() {
    if(this.index >= this.list.length) {
      return false;
    }
    return true;
  }
}

class Container {
  constructor(list) {
    this.list = list;
  }
  getIterator() {
    return new Iterator(this);
  }
}

let container = new Container([1, 2, 3, 4, 5]);
let iterator = container.getIterator();

iterator.next();  // {value: 1, done: false}
iterator.next();  // {value: 2, done: false}
iterator.next();  // {value: 3, done: false}
iterator.next();  // {value: 4, done: false}
iterator.next();  // {value: 5, done: false}
iterator.next();  // {value: null, done: true}

使用 ES6 的 Symbol.iterator 创建一个迭代器

var list = [1, 2, 3, 4, 5];
var iterator = list[Symbol.iterator]();

iterator.next();  // {value: 1, done: false}
iterator.next();  // {value: 2, done: false}
iterator.next();  // {value: 3, done: false}
iterator.next();  // {value: 4, done: false}
iterator.next();  // {value: 5, done: false}
iterator.next();  // {value: null, done: true}

通过上边的示例代码我们可以得知,我们不了解对象的内部构造,但是可以通过调用迭代器提供的 next() 方法就能顺序访问其每个元素。

Stream 流

在这里可以将一系列的鼠标点击、键盘点击产生的事件和将要处理的元素集合看作一种流, 流的特点是数据源的本身是无限的,流在管道中传输, 并且可以在管道的节点上进行处理, 比如筛选, 排序,聚合等。

流数据源(source)经过数据转换等中间操作的处理,最后由最终操作得到前面处理的结果,每次转换原有 Stream 对象不改变,返回一个新的 Stream 对象(可以有多次转换),这就允许对其操作可以像链条一样排列,变成一个管道。

为了对 stream 有一个更感性的认识,我们说点击事件可以看作一种 stream,在介绍 RxJS 之前,我们不妨先看一个 RxJS 官网上的例子(列举官方的例子更能充分体现 RxJS 是基于可观测数据流 Stream 的)。

通常,注册一个事件侦听器是这样的。

document.addEventListener("click", () => console.log("Clicked!"));

使用 RxJS 可以创建一个 observable

import { fromEvent } from "rxjs";
fromEvent(document, "click").subscribe(() => console.log("Clicked!")); 
自定义源创建一个 Observable

结合上边讲到流和设计模式,为了方便我们对 RxJS 有进一步的认识,我们就用代码自己来实现一个 Obserable

其实 Observable 实际上就是一个函数,它接收一个Observer 对象作为参数,返回一个函数用来取消订阅。Observer 对象可以声明 next、err、complete 方法来处理流的不同状态。

首先我们定义数据源 Source

// 创建数据源
class Source {
  constructor() {
    this.state = 0;
    this.data = setInterval(() => this.emit(this.state++), 200);
  }
  
  emit(state) {
    const limit = 10;  // 定义数据上限
    if (this.onData) {
      this.onData(state);  // 产生数据
    }
    if (state === limit) {
      if (this.onComplete) {
        this.onComplete();  // 数据终止
      }
      this.destroy();
    }
  }
  
  destroy() {  //停止定时器,清除数据
    clearInterval(this.data);
  }
}

创建一个 Observable

// 创建 Observable
class Observable {
  constructor() {}
  getStream() {  
    return new Source();
  }
  subscribe(observer) {
    // 获取流数据源
    this.stream = this.getStream();  
    
    // 转换
    this.stream.onData = (e) => observer.next(e); // 处理流数据
    this.stream.onError = (err) => observer.error(err);  //处理异常
    this.stream.onComplete = () => observer.complete();  //处理流数据终止
    
    // 返回一个函数
    return () => {
      this.stream.destroy();        
    }
  }
}

调用 subscribe 进行订阅

const observable = new Observable();
//订阅
let observer = {
  next(data) { console.log(data); },
  error(err) { console.error(err); },
  complete() { console.log("done")}
}
const unsubscribe = observable.subscribe(observer);

输出结果

我们可以调用 unsubscribe 取消订阅

//0.5后取消订阅
setTimeout(unsubscribe, 500);

我们可以看到 Observable 作为生产者与观察者之间的桥梁,并返回一种方法来解除生产者与观察者之间的联系,其中观察者用于处理时间序列上数据流。

RxJS(Reactive Extensions for JavaScript)

介绍完 RxJS 的一些前置知识点,下面就让我们一起来认识下什么是 RxJS 吧。

RxJS 中含有两个基本概念:ObservablesObserver。Observables 作为被观察者,是一个值或事件的流集合;而 Observer 则作为观察者,根据 Observables 进行处理。

Observables 与 Observer 之间的订阅发布关系(观察者模式) 如下:

订阅:Observer 通过 Observable 提供的 subscribe() 方法订阅 Observable。

发布:Observable 通过回调 next 方法向 Observer 发布事件。

Observable 属于全新的 push 体系,让我们先了解下什么是 pull 体系和 push 体系吧。

Pull vs Push

PullPush 是两种不同的协议,描述了数据生产者如何与数据消费者进行通信。

生产者 消费者
pull 被请求的时候产生数据 决定何时请求数据
push 按自己的节奏生产数据 对接收的数据进行处理

Pull 体系中,数据的消费者决定何时从数据生产者那里获取数据,而生产者自身并不会意识到什么时候数据将会被发送给消费者。

每一个 JavaScript函数都是一个 Pull 体系,函数是数据的生产者,调用函数的代码通过 "拉出" 一个单一的返回值来消费该数据。

function add(x, y) {
  console.log("Hello");
  return x + y;
}
const x = add(4, 5); 

ES6介绍了 Iterator 迭代器 和 Generator 生成器,另一种 Pull 体系,调用 iterator.next() 的代码是消费者,可从中拉取多个值。

Push 体系中,数据的生产者决定何时发送数据给消费者,消费者不会在接收数据之前意识到它将要接收这个数据。

Promise 是当今 JS 中最常见的 Push 体系,一个 Promise (数据的生产者)发送一个 resolved value (成功状态的值)来执行一个回调(数据消费者)。但是不同于函数的地方的是:Promise 决定着何时数据才被推送至这个回调函数。

RxJS 引入了 Observable (可观察对象),一个全新的 Push 体系。一个可观察对象是一个产生多值的生产者,当产生新数据的时候,会主动 "推送给" Observer (观察者)。

Observable(可观察对象)是基于推送(Push)运行时执行(lazy)的多值集合。

MagicQ 单值 多值
拉取(Pull) Function Iterator
推送(Push) Promise Observable

ObservablePromise 之间的差异:

Promise:只能返回单个值,不可取消,要么 resolve 要么 reject 并且只响应一次。

Observable:随着时间的推移发出多个值,可以调用 unsubscribe() 取消订阅,支持 map、filter、reduce 等操作符,延迟执行,当订阅的时候才会开始执行,可以响应多次。

RxJS 之 Observable

使用 RxJS 我们可以使用 npm 进行安装(更多使用方法请参考 github):

npm install rxjs 

需要注意的是,很多人认为 RxJS 中的所有操作都是异步的,但其实这个观念是错的。RxJS 的核心特性是它的异步处理能力,但它也是可以用来处理同步的行为。具体示例如下:

import { Observable } from "rxjs";
 
const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});
 
console.log("start");
observable.subscribe({
  next(x) { console.log(x); },
  error(err) { console.error(err); },
  complete() { console.log("done"); }
});
console.log("end");

以上代码运行后,控制台的输出结果:

start
1
2
3
done
end

当然我们也可以用它处理异步行为:

import { Observable } from "rxjs";
 
const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 1000);
});
 
console.log("start");
observable.subscribe({
  next(x) { console.log(x); },
  error(err) { console.error(err); },
  complete() { console.log("done"); }
});
console.log("end");

代码运行后的输出结果为:

start
1
2
3
end
4
done
RxJS 使用创建类操作符创建 Observable

RxJS 中提供了很多操作符 Operators,下篇文章我们将对 Operators 进行介绍,创建类操作符(Creation Operator)用于创建 Observable 对象。

官网列举的一些创建类操作符如下:

ajax

bindCallback

bindNodeCallback

defer

empty

from

fromEvent

fromEventPattern

generate

interval

of

range

throwError

timer

iif

最后我们简单的来看一下,如何使用 from 创建一个 Observable(关于操作符的介绍我们将在下篇文章进行详细介绍,保持关注哦)。

from:可以把数组、Promise、以及 Iterable 转化为 Observable。

//将数组转换为 Observable:

import { from } from "rxjs";

const array = [10, 20, 30];
const result = from(array);

result.subscribe(x => console.log(x));

// Logs:
// 10
// 20
// 30

由于 RxJS 涉及到的概念和知识点比较宽泛和复杂,我们需要一步一步的去理解和掌握它,最终可以做到知其然亦知其所以然。接下来文章中会继续介绍 RxJS 中涉及到知识点,关注此公众号webinfoq不要错过哦。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/104556.html

相关文章

  • Rxjs 响应式编程-第章:响应式

    摘要:响应式编程具有很强的表现力,举个例子来说,限制鼠标重复点击的例子。在响应式编程中,我把鼠标点击事件作为一个我们可以查询和操作的持续的流事件。这在响应式编程中尤其重要,因为我们随着时间变换会产生很多状态片段。迭代器模式的另一主要部分来自模式。 Rxjs 响应式编程-第一章:响应式Rxjs 响应式编程-第二章:序列的深入研究Rxjs 响应式编程-第三章: 构建并发程序Rxjs 响应式编程-...

    songze 评论0 收藏0
  • RxJS基础教程

    摘要:是一个基于可观测数据流在异步编程应用中的库。正如官网所说,是基于观察者模式,迭代器模式和函数式编程。它具有时间与事件响应的概念。通知不再发送任何值。和通知可能只会在执行期间发生一次,并且只会执行其中的一个。 RxJS是一个基于可观测数据流在异步编程应用中的库。 ReactiveX is a combination of the best ideas fromthe Observer p...

    defcon 评论0 收藏0
  • 【CuteJavaScript】Angular6入门项目(3.编写服务和引入RxJS

    摘要:发布通过回调方法向发布事件。观察者一个回调函数的集合,它知道如何去监听由提供的值。 本文目录 一、项目起步 二、编写路由组件 三、编写页面组件 1.编写单一组件 2.模拟数据 3.编写主从组件 四、编写服务 1.为什么需要服务 2.编写服务 五、引入RxJS 1.关于RxJS 2.引入RxJS 3.改造数据获取方式 六、改造组件 1.添...

    RebeccaZhong 评论0 收藏0
  • [译]RxJS文档01——介绍

    摘要:原文是一个使用可观察量队列解决异步编程和基于事件编程的库。提供了几个管理异步事件的核心概念可观察量,代表了一个由未来获取到的值或事件组成的集合。相当于事件触发器,是向多个广播事件或推送值的唯一方法。 原文:http://reactivex.io/rxjs/manu... RxJS 是一个使用可观察量(observable)队列解决异步编程和基于事件编程的js库。他提供了一个核心的类型O...

    BlackHole1 评论0 收藏0
  • Rxjs 核心概念

    摘要:仿宋可以把想像成一个可以发射事件的库。在中用来处理异步事件的核心概念包括代表了未来可能会产生的一系列的值或事件的集合回调函数的集合,它知道如何去处理上产生的值或者事件,当然也包括异常。 又一年要过去了,回顾2017,rxjs始终是我在项目里使用最频繁的库,在我看来,它是一个非常优秀的数据处理工具。年初的时候就计划写点什么,碍于目前公司的项目实在抽不出时间,这一拖就到了年底。临近新年,总...

    Youngdze 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<