资讯专栏INFORMATION COLUMN

在 Node.js 中用 pipe 处理数组的实现

mikyou / 1947人阅读

摘要:总结虽然我没有像老师一样进行性能测试,但是猜测也知道的方式在数量比较小的时候肯定要弱于正常方式,的好处在于数据量比较大的时候,可以使用比较小的内存,尽快的处理数组中前置的数据。

TLDR;
这篇文章的风格是在致敬 Jim 老师;致敬,致敬,懂吗,不是抄袭,程序员的事怎么能叫抄袭。
当然我对 Node.js 的 stream 也是现学现卖,有使用不当的地方,敬请指出。
原文链接 欢迎 star。

写这篇文章的初衷是年前看 SICP 的时候,第二章介绍构造数据抽象的时候有提到 Lisp 对序列的处理采用类似『信号流』的方式。所以很自然的就想到了 Node.js 中的 pipe 方式,于是就一直想用 pipe 的方式尝试一下。

同 Jim 老师的这篇 文章 中描述的一样, 我也是懒癌发作,从年尾拖到今年年初,然后在年初又看到了 Jim 老师 的博客,深受启发,终于下定决心要开始码了...... 然后,嗯,又拖到昨天。促使我下定决心要写的主要原因是昨天部门的年会!反正年会跟我这种死肥宅也没多大关系,在大家 happy 的时候构思了下代码实现,回家用了一晚上的时候补上了代码。

Jim 老师在他的文章里面也说了,JS 的那些数组操作 (map/ reduce/filter) 啥的,每次调用的时候都会进行一次完整的遍历。试想一下如果有一个第一个数是1,长度是 1亿 的递增为 1 的数组,需要把所有的数组都乘 3,再排除其中的奇数,如果用 (map/filter) 的方法,只要也需要循环 一亿五千万次;那么如果有其他办法能只循环一亿次,是不是节省了大量的内存资源和循环消耗的时间。

废话不多说,直接上代码吧。

pipe

在编写代码时,我们应该有一些方法将程序像连接水管一样连接起来 -- 当我们需要获取一些数据时,可以去通过"拧"其他的部分来达到目的。这也应该是IO应有的方式。 -- Doug McIlroy. October 11, 1964

关于 node 的 stream 可以看看这篇 文章。

下面是代码部分,整个代码我是在边学 pipe 边用一晚上的时间仓促写就的,懒癌发作,也不想再重构了,各位相公讲究看吧,求别喷代码。

入口
const stream = require("stream")

const last = Symbol()

// 在 selfArray 中接收一个真正的数组
// 返回一个可读流
// 如果再做的精细点,可以做成可读可写流,这样就能通过控制流的大小,来控制内存的大小,别几亿条数据直接撑爆内存了
// 不过对后面 reduce 的处理就比较麻烦
function selfArray(a) {
  const rs = new stream.Readable({
    objectMode: true
  })

  a.forEach((v, index) => {
    rs.push(v)
  })
  rs.push(last)
  rs.push(null)
  return rs
}

上面的 selfArray 在流的最后面 push 了一个 Symbol 对象来标志整个流的输入结束,留待为之后 reduce 的使用。

Map/Filter/Reduce 的实现
function forEach(callback) {
  const ws = new stream.Writable({
    objectMode: true
  })
  let index = 0

  ws._write = function (chunk, enc, next) {
    if (chunk !== last) {
      callback(chunk, index++)
      next()
    }
  }

  return ws
}

function filter(callback) {
  const trans = new stream.Transform({
    readableObjectMode: true,
    writableObjectMode: true
  })

  let index = 0

  trans._transform = function (chunk, enc, next) {
    if (chunk === last) {
      next(null, last)
    } else {
      let condition = callback(chunk, index++)
      if (condition) {
        this.push(chunk)
      }
      next()
    }
  }
  return trans
}

function map(callback) {
  const trans = new stream.Transform({
    readableObjectMode: true,
    writableObjectMode: true
  })
  let index = 0
  trans._transform = function (chunk, enc, next) {
    if (chunk === last) {
      next(null, last)
    } else {
      next(null, callback(chunk, index++))
    }
  }
  return trans
}

function reduce(callback, initial) {
  const trans = new stream.Transform({
    readableObjectMode: true,
    writableObjectMode: true
  })

  let index = 0,
    current = initial,
    prev = initial


  trans._transform = function (chunk, enc, next) {

    if (chunk === last) {
      if (index > 1) {
        prev = callback(prev, current, index - 1)
      }
      this.push(prev)
      this.push(last)
      return next(null, last)
    }

    if (initial === void 0 && index === 0) {
      prev = chunk
    }

    if (index > 0) {
      prev = callback(prev, current, index - 1)
    }

    current = chunk
    index++
    next()
  }

  return trans
}

上面的代码在 reduce 的实现稍微麻烦了一些,reduce 对没有初始值,原始数组为空的条件下有各种不同的处理情况,翻看了下 MDN 的解释又自己实现了下。

使用
selfArray([9, 2, 6, 3, 5, 6, 7, 1, 4, 4])
  .pipe(map(v => v * 3))
  .pipe(filter(v => v % 2))
  .pipe(reduce((p, c) => p + c, 0))
  .pipe(forEach(v => {
    console.log("pipe 计算最后的结果是:", v)
  }))

为了好看我故意把各种括号都删掉了。嗯,看起来还挺完美,我们来测试下

selfArray([9, 2, 6, 3, 5, 6, 7, 1, 4, 4])
  .pipe(map(v => {
    console.log("map:", v)
    return v * 3
  }))
  .pipe(filter(v => {
    console.log("filter:", v)
    return v % 2
  }))
  .pipe(reduce((p, c) => {
    console.log("reduce:", p, c)
    return p + c
  }, 0))
  .pipe(forEach(v => {
    console.log("pipe 计算最后的结果是:", v)
  }))
  
  
加上 log 之后可以看到结算结果是:
  
map: 9
filter: 27
map: 2
filter: 6
map: 6
filter: 18
map: 3
filter: 9
reduce: 0 27
map: 5
filter: 15
reduce: 27 9
map: 6
filter: 18
map: 7
filter: 21
reduce: 36 15
map: 1
filter: 3
reduce: 51 21
map: 4
filter: 12
map: 4
filter: 12
reduce: 72 3
pipe 计算最后的结果是: 75

从上面的 log 可以看到, 第一个数 9 先执行了 map,然后在 3 之后就直接进入了 filter,此时第 2 个数 2 也开始被 map 处理,然后被 filter 处理,但是由于 3 之后是偶数不会被 reduce 接收, reduce 会一直等到第二个奇数,也就是 3 进入之后才会被处理... 嗯,直到最终的计算结果是 75, 被 forEach 消耗。

总结

虽然我没有像 Jim 老师一样进行性能测试,但是猜测也知道 pipe 的方式在数量比较小的时候肯定要弱于正常方式,pipe 的好处在于数据量比较大的时候,可以使用比较小的内存,尽快的处理数组中前置的数据。

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/88185.html

相关文章

  • Node.js 中用子进程操作标准输入/输出

    摘要:在行中,我们将子进程的连接到当前进程的。等待子进程通过退出函数如下所示。子进程的实现以下代码用异步写入以命令运行的子进程的我们为命令生成一个名为的独立进程。而是子进程完成。没有这个,将会在调用之前被输出。 翻译:疯狂的技术宅原文:http://2ality.com/2018/05/chi... 本文首发微信公众号:jingchengyideng欢迎关注,每天都给你推送新鲜的前端技术...

    leeon 评论0 收藏0
  • 我他喵到底要怎样才能生产环境中用上 ES6 模块化?

    摘要:因此,你还是需要各种各样杂七杂八的工具来转换你的代码噢,我可去你妈的吧,这些东西都是干嘛的我就是想用个模块化,我到底该用啥子本文正旨在列出几种可用的在生产环境中放心使用模块化的方法,希望能帮到诸位后来者这方面的中文资源实在是忒少了。 原文发表在我的博客上。最近捣鼓了一下 ES6 的模块化,分享一些经验 :) Python3 已经发布了九年了,Python 社区却还在用 Python 2...

    KaltZK 评论0 收藏0
  • 认识 Node .js

    摘要:服务器每收到一条请求,都会用新的和对象触发请求回调函数。在调用完请求回调函数之后,就要由你负责用方法结束响应。 前言 本文将介绍Node.js的一些基本概念,包含它的历史,特性和简单的使用等。如果你有过服务端的编程经验,那么你将能很快熟悉它。 Node.js是什么 showImg(https://segmentfault.com/img/bVbc24J?w=619&h=35); 这是N...

    I_Am 评论0 收藏0
  • [译]关于Node.js streams你需要知道一切

    摘要:当一个客户端的响应对象是一个可读流,那么在服务器端这就是一个可写流。的模块给我们提供了一个可以操作任何文件的可读流通过方法创建。创建一个可读流创建可读流,我们需要类创建一个可读流非常简单。可以通过修改可读流配置里面的方法实现。 Node.js的stream模块是有名的应用困难,更别说理解了。那现在可以告诉你,这些都不是问题了。 多年来,开发人员在那里创建了大量的软件包,其唯一目的就是使...

    bang590 评论0 收藏0
  • gulp详细基础教程

    摘要:核心概念流流,简单来说就是建立在面向对象基础上的一种抽象的处理数据的工具。类型,设置输出路径以某个路径的某个组成部分为基础向后拼接。 一、gulp简介 1.gulp是什么? gulp是前端开发过程中一种基于流的代码构建工具,是自动化项目的构建利器;它不仅能对网站资源进行优化,而且在开发过程中很多重复的任务能够使用正确的工具自动完成;使用它,不仅可以很愉快的编写代码,而且大大提高我们的工...

    xuhong 评论0 收藏0

发表评论

0条评论

mikyou

|高级讲师

TA的文章

阅读更多
最新活动
阅读需要支付1元查看
<