资讯专栏INFORMATION COLUMN

nodejs cluster模块分析

KnewOne / 2582人阅读

摘要:而在进程执行把进程添加到调度器中时添加了一个回调函数,回调函数了一个带的消息,并且为,就是这个消息触发了发送的函数的执行。

最近做了点nodejs项目,对nodejs的cluster怎么利用多进程处理请求产生了疑问,于是着手进行了研究,之后发现这其中竟大有文章!一切还是先从遥远的TCP说起吧。。。

TCP与Socket

说到TCP,相信很多人都相当了解了,大学已经教过,但是又相信有很多人也不是很了解,要不是当时没听,要不也可能是自身的编程能力不足以去实践相关内容,写到这我还特意去翻了一下大学的计算机网络教材,内容是很丰富的,但教人实践的内容还是太少了,里面的内容都把学生当成了有相当的Linux编程能力的人了,所以结果就是大部分只上了一年编程课刚学会几个Hello world程序的大二学生,听了这门课后一脸懵逼,即使记住了也因为没什么实践很快忘了,当年我就是这么懵逼过来的。
所以,扯了这些,结果是什么呢,结果就是我们要多动手!而要动手建立一条TCP连接可以用socket来实现,不过这里不是要说socket用法,只是来简单聊一聊他们之间的一点小联系,以便于理解后面的内容。

应用层通过传输层进行TCP通信时,有时TCP需要为多个应用程序进程提供并发服务。多个TCP连接或多个应用程序进程可能需要通过同一个TCP协议端口传输数据。为了区别不同的应用程序进程和连接,许多计算机操作系统为应用程序与TCP协议交互提供了称为套接字 (Socket)的接口,区分不同应用程序进程间的网络通信和连接。

我们可以用一个四元组来确定一条TCP连接(源ip,源端口,目标ip,目标端口),而连接是通过socket来建立的(服务端进行bind和listen->客户端发起connect->服务端accept),计算机系统就是通过socket来区分不同的TCP连接的。所以我们可以看出来,只要目标ip/端口不同,服务端可以用同一个端口生成多个socket,建立多条连接。
但是,一个进程只能监听一个端口,一个端口怎么生成多个socket呢?其实服务器端程序一般会把socket和服务器某个端口(ip+端口)bind起来, 这样构成了一个特殊的socket, 这个socket没有目标ip和端口。socket进行listen之后当有新的连接进来时, 系统将请求存进队列(此时TCP握三次手完成), 后续可以再调用accept拿到队列的请求,返回一个新的socket, 这个socket是由四元组建立的, 也就对应了一个唯一的连接。

说完这些,可以来聊一聊nodejs是怎样建立一个TCP服务的了。

nodejs createServer启动TCP服务小解析

一般我们用nodejs启动一个TCP服务可能是这样的:

require("net").createServer(function(sock) {
    sock.on("data", function(data) {
        sock.write("Hello world");
    });
}).listen(8080, "127.0.0.1");

进到createServer一看(代码都在net模块中),里面return了一个Server对象,Server继承EventEmitter,将createServer的参数做为connection事件的回调函数,这块比较简单就不贴代码了。我们需要关注的是Serverlisten方法,其不同的参数最终都会调用到listenInCluster方法。cluster!是的这和cluster有关,但先不管它,我们先管在主进程中它的执行:

function listenInCluster(server, address, port, addressType,
                         backlog, fd, exclusive) {
  // ...

  if (cluster.isMaster || exclusive) {
    // ...
    server._listen2(address, port, addressType, backlog, fd);
    return;
  }

  // ...

}

从代码我们可以看到listenInCluster最终是调用了_listen2方法,它就是服务启动的关键,其定义如下:

function setupListenHandle(address, port, addressType, backlog, fd) {
  
    // ...

    var rval = null;

    // ...

    if (rval === null)
        rval = createServerHandle(address, port, addressType, fd);

    // ...
    this._handle = rval;

  // ...
  this._handle.onconnection = onconnection;
  this._handle.owner = this;

  var err = this._handle.listen(backlog || 511);

  // ...
}

其中createServerHandle方法就不展开了,它就如之前所说的:把socket和服务器某个端口(ip+端口)bind起来, 这样构成了一个特殊的socket, 这个socket没有目标ip和端口。它绑定了address+port并返回了一个特殊socket(句柄)rval,可以看到最后它调用了listen对端口进行监听,并且指定了一个回调函数onconnection,函数会在C++层当accept请求时触发,其回调参数之一就是前面提到的accept后与客户端连接的新socket句柄。到这里再看一下onconnection的代码:

function onconnection(err, clientHandle) {
  // ...
  var self = handle.owner;
  var socket = new Socket({
    handle: clientHandle,
    allowHalfOpen: self.allowHalfOpen,
    pauseOnCreate: self.pauseOnConnect
  });
  socket.readable = socket.writable = true;


  // ...
  self.emit("connection", socket);
}

可以看到nodejs在对socket句柄进一步封装后(封装成nodejs的Socket对象),再触发server(由createServer创建)的connection事件。这时我们再回到前面createServer的介绍,其监听了connection事件,所以最终流程走下来createServer的的方法参数将被触发,并且可以拿到一个nodejs的Socket对象进行write与read操作,与客户端进行通信。

至此我们已经对nodejs启动一个TCP服务的流程有了了解,接下来就到主题cluster了。

cluster为我们做了什么

开始说代码之前,先来聊一聊喂鸽子吧。假设你坐在布拉格广场前静静地坐着,然后往前面撒了一把狗粮,喔不对是鸽粮,然后周围的一群鸽子都震惊了并往你这边飞抢东西吃。这个现象可以用一个词来形容就是“惊群“。然而这只是我的瞎掰,我们程序员理解的惊群应该是:多个进程/线程同时阻塞等待某个事件,当事件发生时唤醒了所有等待的进程/线程,但最终只有一个能对事件进行处理。很明显这对cpu造成了浪费,而cluster的多进程模型对此做了处理:只用一个master进程等待请求,然后有请求到来时使用round-robin轮询分配请求给各个子进程进行处理,这块后面提到的源码会涉及到,这里就不深入了。除了round-robin,还有其他的一些cluster为我们做的,就用代码来talk吧:

const cluster = require("cluster");
const http = require("http");

if (cluster.isMaster) {

  const numCPUs = require("os").cpus().length;
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

} else {

  // Worker processes have a http server.
  http.Server((req, res) => {
    res.writeHead(200);
    res.end("hello world
");
  }).listen(8000);
}

以上代码就是cluster的典型用法,在nodejs启动文件判断当前进程,如果当前进程是master进程,那么就根据cpu的核数fork出相同数量的进程,否则(worker进程)就启动一个http服务,所以一般这样会给一个核心分配一个worker进程来启动一个服务,搭起一个小服务集群。但是问题来了,为什么这里可以有多个进程同时监听一个端口呢,是因为listen做的一些文章,下面再一步步深入解析。由于http.Server其实是继承了net.Server,所以跟前面创建TCP服务一样,listen最终也是调用到listenInCluster,我们从这里重新开始。

function listenInCluster(server, address, port, addressType,
                         backlog, fd, exclusive) {
  // ...

  const serverQuery = {
    address: address,
    port: port,
    addressType: addressType,
    fd: fd,
    flags: 0
  };

  // Get the master"s server handle, and listen on it
  cluster._getServer(server, serverQuery, listenOnMasterHandle);

  // ...
}

listenInCluster在worker进程中调用cluster._getServer,并且传入了一个函数listenOnMasterHandle。这里还不知道它做了什么,所以再进入cluster._getServer看看(由于当前是在worker进程,cluster模块文件是lib/internal/cluster/child.js):

cluster._getServer = function(obj, options, cb) {
  // ...

  const message = util._extend({
    act: "queryServer",
    index: indexes[indexesKey],
    data: null
  }, options);

  send(message, (reply, handle) => {
    if (typeof obj._setServerData === "function")
      obj._setServerData(reply.data);

    if (handle)
      shared(reply, handle, indexesKey, cb);  // Shared listen socket.
    else
      rr(reply, indexesKey, cb);              // Round-robin.
  });
  // ...
};

关注send方法,它调用了sendHelper方法,该方法是在internal/cluster/utils定义的,相当一个消息转发器处理进程间通信,它发送一个“进程内部消息“(internalMessage),而worker进程在master进程被fork出来的时候监听了internalMessage:

// lib/internal/cluster/master.js
worker.process.on("internalMessage", internal(worker, onmessage));

所以最终在worker进程发送的消息,触发了master进程执行了onmessage方法,onmessage判断message.act === "queryServer"执行queryServer,而就是在这个方法中,新建了一个RoundRobinHandle调度器,就是这个东西分配请求做了负载均衡。这里用地址和端口号作为key将调度器存储起来,调度器不会被worker创建两次,最后将worker进程add到队列。相关代码如下:

// lib/internal/cluster/master.js
function queryServer(worker, message) {
  // ...
  var handle = handles[key];

  if (handle === undefined) {
    var constructor = RoundRobinHandle;
    // ...

    handles[key] = handle = new constructor(key,
                                            message.address,
                                            message.port,
                                            message.addressType,
                                            message.fd,
                                            message.flags);
  }
  // ...
  // Set custom server data
  handle.add(worker, (errno, reply, handle) => {
    // ...
  });
}

然后我们再来看看RoundRobinHandle,它里面调用net.createServer方法新建了一个server,并且开始监听,这块可以看前面内容。不过与前面不同的是,server在listening事件完成时拿到监听端口的那个特殊socket句柄,重置了onconnection方法,当新的连接建立时方法被调用,将accept连接的socket句柄分发到队列里的worker进行处理(distribute)。对于listening事件,它在Server.listen执行后就会触发,代码就在setupListenHandle方法里面。RoundRobinHandle代码如下:

// lib/internal/cluster/round_robin_handle.js
function RoundRobinHandle(key, address, port, addressType, fd) {
  // ...
  this.server = net.createServer(assert.fail);

  if (fd >= 0)
    this.server.listen({ fd });
  else if (port >= 0)
    this.server.listen(port, address);
  else
    this.server.listen(address);  // UNIX socket path.

  this.server.once("listening", () => {
    this.handle = this.server._handle;
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
    // ...
  });
}
RoundRobinHandle.prototype.distribute = function(err, handle) {
  this.handles.push(handle);
  const worker = this.free.shift();

  if (worker)
    this.handoff(worker);
};
RoundRobinHandle.prototype.handoff = function(worker) {
  // ...
  const message = { act: "newconn", key: this.key };

  sendHelper(worker.process, message, handle, (reply) => {
    // ...
  });
};

从代码上看到最终调度器调用handoff方法,通过sendHelper向worker进程发送一个新连接到达的消息newconn,执行worker进程的server的onconnection方法,worker进程相关代码如下:

// lib/internal/cluster/child.js
cluster._setupWorker = function() {
  // ...
  process.on("internalMessage", internal(worker, onmessage));
  send({ act: "online" });

  function onmessage(message, handle) {
    if (message.act === "newconn")
      onconnection(message, handle);
    else if (message.act === "disconnect")
      _disconnect.call(worker, true);
  }
};
// Round-robin connection.
function onconnection(message, handle) {
  const key = message.key;
  const server = handles[key];
  const accepted = server !== undefined;

  send({ ack: message.seq, accepted });

  if (accepted)
    server.onconnection(0, handle);
}

走到这里worker进程的server就拿到了连接的socket句柄可以进行处理,但是好像有点问题,worker进程的server好像还没起起来啊,前面讲的只是在master进程的调度器启动了一个server,worker进程并没有server。我们又得翻回前面的内容看一看了,看看之前提到的workder进程的cluster._getServer,里面send方法发送了一个函数,函数里面的rr(reply, indexesKey, cb);就是创建了workder进程server的代码。

先来看看cluster._getServer中发送的函数怎么被调用的。这里需要来了解一下之前出现了几次的sendHelper,它是cluster模块用来做进程间通信的,另外还有一个internal方法用来处理通信的回调。cluster._getServersend会调用sendHelper,它会用message.seq当key把send的函数存储起来。然后在internal方法处理通信的回调时判断message是否有这个key,是否能找到这个函数,可以的话就执行。而在master进程执行queryServer把worker进程添加到调度器中时添加了一个回调函数,回调函数send了一个带seq的消息,并且handle为null,就是这个消息触发了cluster._getServer发送的函数的执行。相关代码如下:

// `internal/cluster/utils.js`
const callbacks = {};
var seq = 0;
function sendHelper(proc, message, handle, cb) {
  // ...
  if (typeof cb === "function")
    callbacks[seq] = cb;

  message.seq = seq;
  // ...
}
function internal(worker, cb) {
  return function onInternalMessage(message, handle) {
    // ...
    var fn = cb;

    if (message.ack !== undefined && callbacks[message.ack] !== undefined) {
      fn = callbacks[message.ack];
      delete callbacks[message.ack];
    }
    // ...
  };
}
// lib/internal/cluster/master.js
function queryServer(worker, message) {
  // ...
  // Set custom server data
  handle.add(worker, (errno, reply, handle) => {
    reply = util._extend({
      // ...
      ack: message.seq,
      // ...
    }, reply);
    // ...
    send(worker, reply, handle);
  });

最终,rr(reply, indexesKey, cb);执行,它构造了一个假的socket句柄,句柄设置了一个不做操作的listen方法。然后执行cb,这个cb也就是前面提到过的listenOnMasterHandle,它会把假socket句柄赋值给worker进程的server._handle,随后由于server._handle的存在,server._listen2(address, port, addressType, backlog, fd);也不会做任何操作,也就是说worker进程创建的server是不会对端口进行监听的。相关代码如下:

// lib/internal/cluster/child.js
function rr(message, indexesKey, cb) {
  function listen(backlog) {
    // ...
    return 0;
  }
  // ...
  cb(0, handle);
}
// lib/net.js
function listenOnMasterHandle(err, handle) {
  // ...
  server._handle = handle;
  server._listen2(address, port, addressType, backlog, fd);
}
// setupListenHandle就是_listen2
function setupListenHandle(address, port, addressType, backlog, fd) {
  // ...
  if (this._handle) {
    debug("setupListenHandle: have a handle already");
  }
  // ...

至此,cluster模块如何建立多进程服务的就算讲完了。画个草图总结下吧:

文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。

转载请注明本文地址:https://www.ucloud.cn/yun/89327.html

相关文章

  • nodejs中的子进程,深入解析child_process模块cluster模块

    摘要:严格来说,并不是单线程的。其他异步和事件驱动相关的线程通过来实现内部的线程池和线程调度。线程是最小的进程,因此也是单进程的。子进程中执行的是非程序,提供一组参数后,执行的结果以回调的形式返回。在子进程中通过和的机制来接收和发送消息。   node遵循的是单线程单进程的模式,node的单线程是指js的引擎只有一个实例,且在nodejs的主线程中执行,同时node以事件驱动的方式处理IO...

    JinB 评论0 收藏0
  • 前端进阶-让你升级的网络知识

    摘要:一般由客户端发送,用来表示报文段中第一个数据字节在数据流中的序号,主要用来解决网络包乱序的问题。为有效,为无效表示,当数据包得到后,立马给应用程序使用到最顶端用来确保连接的安全。亲,那进程和线程区别是什么嘞这算是计算机的基本知识吧。 在正文之前,我想问大家一个问题:问:亲,你有基础吗?答: 有啊,你说前端吗? 不就是HTML,JS,CSS 吗? so easy~问: oh-my-zsh...

    leoperfect 评论0 收藏0
  • 通过源码解析 Node.js 中 cluster 模块的主要功能实现

    摘要:通常的解决方案,便是使用中自带的模块,以模式启动多个应用实例。最后中的模块除了上述提到的功能外,其实还提供了非常丰富的供和进程之前通信,对于不同的操作系统平台,也提供了不同的默认行为。如果大家有闲,非常推荐完整领略一下模块的代码实现。 众所周知,Node.js中的JavaScript代码执行在单线程中,非常脆弱,一旦出现了未捕获的异常,那么整个应用就会崩溃。这在许多场景下,尤其是web...

    leeon 评论0 收藏0
  • nodeJS多进程

    摘要:通过将的给出来的进程。恩吞吐率关于吞吐率有多种解读,一种是描绘服务器单位时间处理请求的能力。而根据这个描述的话他的单位就为而这个指标就是上面数据中的当然,肯定是越大越好了吞吐量这个和上面的吞吐率很有点关系的。 首先郑重声明:nodeJS 是一门单线程!异步!非阻塞语言!nodeJS 是一门单线程!异步!非阻塞语言!nodeJS 是一门单线程!异步!非阻塞语言! 重要的事情说3遍。 因为...

    happen 评论0 收藏0

发表评论

0条评论

最新活动
阅读需要支付1元查看
<