摘要:子进程使用反序列化消息字符串为消息对象。在调用这类方法时,遍历列表中的实例发送内部消息,子进程列表中的对应项收到内部消息并处理返回,父进程中再结合返回结果和对应着这个类实例维护的信息,保证功能的正确性。
在 Node.js 中,当我们使用 child_process 模块创建子进程后,会返回一个 ChildProcess 类的实例,通过调用 ChildProcess#send(message[, sendHandle[, options]][, callback]) 方法,我们可以实现与子进程的通信,其中的 sendHandle 参数支持传递 net.Server ,net.Socket 等多种句柄,使用它,我们可以很轻松的实现在进程间转发 TCP socket:
// parent.js "use stirct" const { createServer } = require("net") const { fork } = require("child_process") const server = createServer() const child = fork("./child.js") server.on("connection", function (socket) { child.send("socket", socket) }) .listen(1337)
// child.js "use strict" process.on("message", function (message, socket) { if (message === "socket") socket.end("Child handled it.") })
$ curl 127.0.0.1:1337 Child handled it.
这时你可能就会疑问,此时 socket 已经处在了另一个进程中,那么像 net.Server#getConnections,net.Server#close 等等这些方法,该怎么实现其功能呢?传递的句柄都是 JavaScript 对象,它们在传递时,序列化和反序列化的机制,又是怎么样的呢?
让我们跟着 Node.js 项目中的 lib/child_process.js,lib/internal/child_process.js,lib/internal/process.js 等文件中的代码,来一探究竟。
序列化与反序列化当使用 child_process 模块中的 fork 函数创建 ChildProcess 类的实例时,会在建立 IPC channel 时,初始化 ChildProcess#send 方法:
// lib/internal/child_process.js // ... function setupChannel(target, channel) { // 此处的 target,即为正在创建的 ChildProcess 类实例 target._channel = channel; target._handleQueue = null; // ... target.send = function(message, handle, options, callback) { // ... if (this.connected) { return this._send(message, handle, options, callback); } // ... }; target._send = function(message, handle, options, callback) { assert(this.connected || this._channel); // ... if (handle) { message = { cmd: "NODE_HANDLE", type: null, msg: message }; if (handle instanceof net.Socket) { message.type = "net.Socket"; } else if (handle instanceof net.Server) { message.type = "net.Server"; } else if (handle instanceof TCP || handle instanceof Pipe) { message.type = "net.Native"; } else if (handle instanceof dgram.Socket) { message.type = "dgram.Socket"; } else if (handle instanceof UDP) { message.type = "dgram.Native"; } else { throw new TypeError("This handle type can"t be sent"); } var obj = handleConversion[message.type]; handle = handleConversion[message.type].send.call(target, message, handle, options); // ... var req = new WriteWrap(); req.async = false; var string = JSON.stringify(message) + " "; var err = channel.writeUtf8String(req, string, handle); // ... }; }
从代码我们可以看到,当我们带着句柄调用 ChildProcess#send 方法发送消息时,Node.js 会替我们先将该消息封装成它的内部消息(将消息包在对象中,且对象拥有一个 cmd 属性)。句柄的序列化,使用到的是 handleConversion[message.type].send 方法,在传递的是 socket 时,即为 handleConversion["net.Socket"].send。
所以关键一定就是在 handleConversion 这个对象上了,我们先不着急看它的如山真面如。让我们先来看看子进程反序列化时的关键步骤代码。
在子进程启动时,若发现自己是通过 child_process 模块创建的进程(环境变量中带有 NODE_CHANNEL_FD),则最后也会执行上述的 lib/internal/child_process.js 文件中的 setupChannel 初始化函数:
// lib/internal/process.js // ... function setupChannel() { if (process.env.NODE_CHANNEL_FD) { var fd = parseInt(process.env.NODE_CHANNEL_FD, 10); delete process.env.NODE_CHANNEL_FD; var cp = require("child_process"); // ... cp._forkChild(fd); assert(process.send); } } // lib/child_process.js // ... const child_process = require("internal/child_process"); const setupChannel = child_process.setupChannel; exports._forkChild = function(fd) { // ... const control = setupChannel(process, p); };
以下函数与上上个例子的中函数为同一个,只不过于子进程中执行:
// lib/internal/child_process.js // ... function setupChannel(target, channel) { target._channel = channel; target._handleQueue = null; // ... target.on("internalMessage", function(message, handle) { // ... if (message.cmd !== "NODE_HANDLE") return; var obj = handleConversion[message.type]; obj.got.call(this, message, handle, function(handle) { handleMessage(target, message.msg, handle); }); }); } function handleMessage(target, message, handle) { if (!target._channel) return; var eventName = "message"; if (message !== null && typeof message === "object" && typeof message.cmd === "string" && message.cmd.length > INTERNAL_PREFIX.length && message.cmd.slice(0, INTERNAL_PREFIX.length) === INTERNAL_PREFIX) { eventName = "internalMessage"; } target.emit(eventName, message, handle); }
显而易见,使用了 handleConversion[message.type].got 来进行句柄的反序列化,使之构建成 JavaScript 对象。所以我们不难想到,句柄序列化 & 反序列化运用的就是,各个 handleConversion[message.type] 对象中提供的同一方法 send 和 got 。打个比方就像 Java 中的这些 class 都实现了同一个 interface:
// lib/internal/child_process.js // ... const handleConversion = { // ... "net.Server": { // ... send: function(message, server, options) { return server._handle; }, got: function(message, handle, emit) { var server = new net.Server(); server.listen(handle, function() { emit(server); }); } }, "net.Socket": { send: function(message, socket, options) { // ... }, got: function(message, handle, emit) { // ... } }, "dgram.Socket": { send: function(message, socket, options) { // ... }, got: function(message, handle, emit) { // ... } } // ... };
所以传递的过程:
主进程:
传递消息和句柄。
将消息包装成内部消息,使用 JSON.stringify 序列化为字符串。
通过对应的 handleConversion[message.type].send 方法序列化句柄。
将序列化后的字符串和句柄发入 IPC channel 。
子进程
使用 JSON.parse 反序列化消息字符串为消息对象。
触发内部消息事件(internalMessage)监听器。
将传递来的句柄使用 handleConversion[message.type].got 方法反序列化为 JavaScript 对象。
带着消息对象中的具体消息内容和反序列化后的句柄对象,触发用户级别事件。
net.Server#getConnections 等方法的功能实现由于将 socket 传递给了子进程之后,net.Server#getConnections,net.Server#close 等等方法,原来的实现已经无效了,为了保证功能,Node.js 又是怎么办的呢?答案可以大致概括为,父子进程之间,在同一地址下的 socket 传递时,各自都额外维护一个关联列表存储这些 socket 信息和 ChildProcess 实例,并且父进程中的 net#Server 类实例自己保存下所有父进程关联列表。在调用 net.Server#getConnections 这类方法时,遍历列表中的 ChildPorcess 实例发送内部消息,子进程列表中的对应项收到内部消息并处理返回,父进程中再结合返回结果和对应着这个 ChildProcess 类实例维护的 socket 信息,保证功能的正确性。
lib/internal/socket_list.js 这个文件中,分别定义了这两个列表类,分别名为 SocketListSend 和 SocketListReceive:
// lib/internal/socket_list.js // ... function SocketListSend(slave, key) { EventEmitter.call(this); this.key = key; this.slave = slave; } util.inherits(SocketListSend, EventEmitter); // ... function SocketListReceive(slave, key) { EventEmitter.call(this); this.connections = 0; this.key = key; this.slave = slave; // ... } util.inherits(SocketListReceive, EventEmitter);
然后在 net.Socket 句柄的序列化和反序列化过程中,将句柄和进程推入列表:
// lib/internal/child_process.js // ... const handleConversion = { // ... send: function(message, socket, options) { // ... if (socket.server) { // ... var firstTime = !this._channel.sockets.send[message.key]; var socketList = getSocketList("send", this, message.key); if (firstTime) socket.server._setupSlave(socketList); } // ... return handle; }, got: function(message, handle, emit) { var socket = new net.Socket({handle: handle}); socket.readable = socket.writable = true; if (message.key) { var socketList = getSocketList("got", this, message.key); socketList.add({ socket: socket }); } emit(socket); } } function getSocketList(type, slave, key) { // slave 对象即为当前正在创建的 ChildProcess 类实例 var sockets = slave._channel.sockets[type]; var socketList = sockets[key]; if (!socketList) { var Construct = type === "send" ? SocketListSend : SocketListReceive; socketList = sockets[key] = new Construct(slave, key); } return socketList; } // lib/net.js // ... Server.prototype._setupSlave = function(socketList) { this._usingSlaves = true; this._slaves.push(socketList); };
然后在调用具体方法时,遍历列表,结合通信来的结果,再返回:
// lib/net.js // ... Server.prototype.getConnections = function(cb) { // ... if (!this._usingSlaves) { return end(null, this._connections); } var left = this._slaves.length; var total = this._connections; function oncount(err, count) { if (err) { left = -1; return end(err); } total += count; if (--left === 0) return end(null, total); } this._slaves.forEach(function(slave) { slave.getConnections(oncount); }); }
即遍历了 _salves
当我们解析好了 net.Server#getConnections 方法后,其他类似需求方法的解决方案,其实也大同小异,思路是一致的。涉及的东西有点多,上一个简单的图示(顺序为黑,蓝,红):
最后参考:
https://github.com/nodejs/node/blob/master/lib/child_process.js
https://github.com/nodejs/node/blob/master/lib/net.js
https://github.com/nodejs/node/blob/master/lib/internal/process.js
https://github.com/nodejs/node/blob/master/lib/internal/child_process.js
https://github.com/nodejs/node/blob/master/lib/internal/socket_list.js
文章版权归作者所有,未经允许请勿转载,若此文章存在违规行为,您可以联系管理员删除。
转载请注明本文地址:https://www.ucloud.cn/yun/79380.html
摘要:通常的解决方案,便是使用中自带的模块,以模式启动多个应用实例。最后中的模块除了上述提到的功能外,其实还提供了非常丰富的供和进程之前通信,对于不同的操作系统平台,也提供了不同的默认行为。如果大家有闲,非常推荐完整领略一下模块的代码实现。 众所周知,Node.js中的JavaScript代码执行在单线程中,非常脆弱,一旦出现了未捕获的异常,那么整个应用就会崩溃。这在许多场景下,尤其是web...
摘要:进程间通信的目的是为了让不同的进程能够互相访问资源,并进程协调工作。这个过程的示意图如下端口共同监听集群稳定之路进程事件自动重启负载均衡状态共享模块工作原理事件二测试单元测试性能测试三产品化项目工程化部署流程性能日志监控报警稳定性异构共存 内容 9.玩转进程10.测试11.产品化 一、玩转进程 node的单线程只不过是js层面的单线程,是基于V8引擎的单线程,因为,V8的缘故,前后...
摘要:第二种是主进程创建监听后发送给感兴趣的工作进程,由工作进程负责直接接收连接。继续看,可以看到它捕获了事件,并在回调函数里面关闭连接,关闭本身进程,断开与的通道。参考与引用多进程模型和进程间通讯源码解析之 前言 最近用Egg作为底层框架开发项目,好奇其多进程模型的管理实现,于是学习了解了一些东西,顺便记录下来。文章如有错误, 请轻喷 为什么需要多进程 伴随科技的发展, 现在的服务器基本上...
摘要:而在进程执行把进程添加到调度器中时添加了一个回调函数,回调函数了一个带的消息,并且为,就是这个消息触发了发送的函数的执行。 最近做了点nodejs项目,对nodejs的cluster怎么利用多进程处理请求产生了疑问,于是着手进行了研究,之后发现这其中竟大有文章!一切还是先从遥远的TCP说起吧。。。 TCP与Socket 说到TCP,相信很多人都相当了解了,大学已经教过,但是又相信有很多...
阅读 1212·2021-09-26 09:55
阅读 3156·2019-08-30 15:55
阅读 947·2019-08-30 15:53
阅读 2284·2019-08-30 13:59
阅读 2367·2019-08-29 13:08
阅读 1097·2019-08-29 12:19
阅读 3289·2019-08-26 13:41
阅读 409·2019-08-26 13:24