日常工作中,对 node.js 的使用都比较粗浅,趁未羊之际,来学点稍微高级的,那就先从 cluster 开始吧。
尼古拉斯张三说过,“带着问题去学习是一个比较好的方法”,所以我们也来试一试。
当初使用 cluster 时,一直好奇它是怎么做到多个子进程监听同一个端口而不冲突的,比如下面这段代码:
const cluster = require('cluster')const net = require('net')const cpus = require('os').cpus()if (cluster.isprimary) { for (let i = 0; i < cpus.length; i++) { cluster.fork() }} else { net .createserver(function (socket) { socket.on('data', function (data) { socket.write(`reply from ${process.pid}: ` + data.tostring()) }) socket.on('end', function () { console.log('close') }) socket.write('hello!\n') }) .listen(9999)}
该段代码通过父进程 fork 出了多个子进程,且这些子进程都监听了 9999 这个端口并能正常提供服务,这是如何做到的呢?我们来研究一下。【相关教程推荐:nodejs视频教程、编程教学】
准备调试环境学习 node.js 官方提供库最好的方式当然是调试一下,所以,我们先来准备一下环境。注:本文的操作系统为 macos big sur 11.6.6,其他系统请自行准备相应环境。
编译 node.js
下载 node.js 源码
git clone https://github.com/nodejs/node.git
然后在下面这两个地方加入断点,方便后面调试用:
// lib/internal/cluster/primary.jsfunction queryserver(worker, message) { debugger; // stop processing if worker already disconnecting if (worker.exitedafterdisconnect) return; ...}
// lib/internal/cluster/child.jssend(message, (reply, handle) => { debugger if (typeof obj._setserverdata === 'function') obj._setserverdata(reply.data) if (handle) { // shared listen socket shared(reply, {handle, indexeskey, index}, cb) } else { // round-robin. rr(reply, {indexeskey, index}, cb) }})
进入目录,执行
./configure --debugmake -j4
之后会生成 out/debug/node
准备 ide 环境
使用 vscode 调试,配置好 launch.json 就可以了(其他 ide 类似,请自行解决):
{ "version": "0.2.0", "configurations": [ { "name": "debug c++", "type": "cppdbg", "program": "/users/youxingzhi/ayou/node/out/debug/node", "request": "launch", "args": ["/users/youxingzhi/ayou/node/index.js"], "stopatentry": false, "cwd": "${workspacefolder}", "environment": [], "externalconsole": false, "mimode": "lldb" }, { "name": "debug node", "type": "node", "runtimeexecutable": "/users/youxingzhi/ayou/node/out/debug/node", "request": "launch", "args": ["--expose-internals", "--nolazy"], "skipfiles": [], "program": "${workspacefolder}/index.js" } ]}
其中第一个是用于调式 c++ 代码(需要安装 c/c++ 插件),第二个用于调式 js 代码。接下来就可以开始调试了,我们暂时用调式 js 代码的那个配置就好了。
cluster 源码调试准备好调试代码(为了调试而已,这里启动一个子进程就够了):
debuggerconst cluster = require('cluster')const net = require('net')if (cluster.isprimary) { debugger cluster.fork()} else { const server = net.createserver(function (socket) { socket.on('data', function (data) { socket.write(`reply from ${process.pid}: ` + data.tostring()) }) socket.on('end', function () { console.log('close') }) socket.write('hello!\n') }) debugger server.listen(9999)}
很明显,我们的程序可以分父进程和子进程这两部分来进行分析。
首先进入的是父进程:
执行 require('cluster') 时,会进入 lib/cluster.js 这个文件:
const childorprimary = 'node_unique_id' in process.env ? 'child' : 'primary'module.exports = require(`internal/cluster/${childorprimary}`)
会根据当前 process.env 上是否有 node_unique_id 来引入不同的模块,此时是没有的,所以会引入 internal/cluster/primary.js 这个模块:
...const cluster = new eventemitter();...module.exports = clusterconst handles = new safemap()cluster.isworker = falsecluster.ismaster = true // deprecated alias. must be same as isprimary.cluster.isprimary = truecluster.worker = workercluster.workers = {}cluster.settings = {}cluster.sched_none = sched_none // leave it to the operating system.cluster.sched_rr = sched_rr // primary distributes connections....cluster.schedulingpolicy = schedulingpolicycluster.setupprimary = function (options) {...}// deprecated alias must be same as setupprimarycluster.setupmaster = cluster.setupprimaryfunction setupsettingsnt(settings) {...}function createworkerprocess(id, env) { ...}function removeworker(worker) { ...}function removehandlesforworker(worker) { ...}cluster.fork = function (env) { ...}
该模块主要是在 cluster 对象上挂载了一些属性和方法,并导出,这些后面回过头再看,我们继续往下调试。往下调试会进入 if (cluster.isprimary) 分支,代码很简单,仅仅是 fork 出了一个新的子进程而已:
// lib/internal/cluster/primary.jscluster.fork = function (env) { cluster.setupprimary() const id = ++ids const workerprocess = createworkerprocess(id, env) const worker = new worker({ id: id, process: workerprocess, }) ... worker.process.on('internalmessage', internal(worker, onmessage)) process.nexttick(emitforknt, worker) cluster.workers[worker.id] = worker return worker}
cluster.setupprimary():比较简单,初始化一些参数啥的。
createworkerprocess(id, env):
// lib/internal/cluster/primary.jsfunction createworkerprocess(id, env) { const workerenv = {...process.env, ...env, node_unique_id: `${id}`} const execargv = [...cluster.settings.execargv] ... return fork(cluster.settings.exec, cluster.settings.args, { cwd: cluster.settings.cwd, env: workerenv, serialization: cluster.settings.serialization, silent: cluster.settings.silent, windowshide: cluster.settings.windowshide, execargv: execargv, stdio: cluster.settings.stdio, gid: cluster.settings.gid, uid: cluster.settings.uid, })}
可以看到,该方法主要是通过 fork 启动了一个子进程来执行我们的 index.js,且启动子进程的时候设置了环境变量 node_unique_id,这样 index.js 中 require('cluster') 的时候,引入的就是 internal/cluster/child.js 模块了。
worker.process.on('internalmessage', internal(worker, onmessage)):监听子进程传递过来的消息并处理。
接下来就进入了子进程的逻辑:
前面说了,此时引入的是 internal/cluster/child.js 模块,我们先跳过,继续往下,执行 server.listen(9999) 时实际上是调用了 server 上的方法:
// lib/net.jsserver.prototype.listen = function (...args) { ... listenincluster( this, null, options.port | 0, 4, backlog, undefined, options.exclusive );}
可以看到,最终是调用了 listenincluster:
// lib/net.jsfunction listenincluster( server, address, port, addresstype, backlog, fd, exclusive, flags, options) { exclusive = !!exclusive if (cluster === undefined) cluster = require('cluster') if (cluster.isprimary || exclusive) { // will create a new handle // _listen2 sets up the listened handle, it is still named like this // to avoid breaking code that wraps this method server._listen2(address, port, addresstype, backlog, fd, flags) return } const serverquery = { address: address, port: port, addresstype: addresstype, fd: fd, flags, backlog, ...options, } // get the primary's server handle, and listen on it cluster._getserver(server, serverquery, listenonprimaryhandle) function listenonprimaryhandle(err, handle) { err = checkbinderror(err, port, handle) if (err) { const ex = exceptionwithhostport(err, 'bind', address, port) return server.emit('error', ex) } // reuse primary's server handle server._handle = handle // _listen2 sets up the listened handle, it is still named like this // to avoid breaking code that wraps this method server._listen2(address, port, addresstype, backlog, fd, flags) }}
由于是在子进程中执行,所以最后会调用 cluster._getserver(server, serverquery, listenonprimaryhandle):
// lib/internal/cluster/child.js// 这里的 cb 就是上面的 listenonprimaryhandlecluster._getserver = function (obj, options, cb) { ... send(message, (reply, handle) => { debugger if (typeof obj._setserverdata === 'function') obj._setserverdata(reply.data) if (handle) { // shared listen socket shared(reply, {handle, indexeskey, index}, cb) } else { // round-robin. rr(reply, {indexeskey, index}, cb) } }) ...}
该函数最终会向父进程发送 queryserver 的消息,父进程处理完后会调用回调函数,回调函数中会调用 cb 即 listenonprimaryhandle。看来,listen 的逻辑是在父进程中进行的了。
接下来进入父进程:
父进程收到 queryserver 的消息后,最终会调用 queryserver 这个方法:
// lib/internal/cluster/primary.jsfunction queryserver(worker, message) { // stop processing if worker already disconnecting if (worker.exitedafterdisconnect) return const key = `${message.address}:${message.port}:${message.addresstype}:` + `${message.fd}:${message.index}` let handle = handles.get(key) if (handle === undefined) { let address = message.address // find shortest path for unix sockets because of the ~100 byte limit if ( message.port < 0 && typeof address === 'string' && process.platform !== 'win32' ) { address = path.relative(process.cwd(), address) if (message.address.length < address.length) address = message.address } // udp is exempt from round-robin connection balancing for what should // be obvious reasons: it's connectionless. there is nothing to send to // the workers except raw datagrams and that's pointless. if ( schedulingpolicy !== sched_rr || message.addresstype === 'udp4' || message.addresstype === 'udp6' ) { handle = new sharedhandle(key, address, message) } else { handle = new roundrobinhandle(key, address, message) } handles.set(key, handle) } ...}
可以看到,这里主要是对 handle 的处理,这里的 handle 指的是调度策略,分为 sharedhandle 和 roundrobinhandle,分别对应抢占式和轮询两种策略(文章最后补充部分有关于两者对比的例子)。
node.js 中默认是 roundrobinhandle 策略,可通过环境变量 node_cluster_sched_policy 来修改,取值可以为 none(sharedhandle) 或 rr(roundrobinhandle)。
sharedhandle
首先,我们来看一下 sharedhandle,由于我们这里是 tcp 协议,所以最后会通过 net._createserverhandle 创建一个 tcp 对象挂载在 handle 属性上(注意这里又有一个 handle,别搞混了):
// lib/internal/cluster/shared_handle.jsfunction sharedhandle(key, address, {port, addresstype, fd, flags}) { this.key = key this.workers = new safemap() this.handle = null this.errno = 0 let rval if (addresstype === 'udp4' || addresstype === 'udp6') rval = dgram._createsockethandle(address, port, addresstype, fd, flags) else rval = net._createserverhandle(address, port, addresstype, fd, flags) if (typeof rval === 'number') this.errno = rval else this.handle = rval}
在 createserverhandle 中除了创建 tcp 对象外,还绑定了端口和地址:
// lib/net.jsfunction createserverhandle(address, port, addresstype, fd, flags) { ... } else { handle = new tcp(tcpconstants.server); istcp = true; } if (address || port || istcp) { ... err = handle.bind6(address, port, flags); } else { err = handle.bind(address, port); } } ... return handle;}
然后,queryserver 中继续执行,会调用 add 方法,最终会将 handle 也就是 tcp 对象传递给子进程:
// lib/internal/cluster/primary.jsfunction queryserver(worker, message) { ... if (!handle.data) handle.data = message.data // set custom server data handle.add(worker, (errno, reply, handle) => { const {data} = handles.get(key) if (errno) handles.delete(key) // gives other workers a chance to retry. send( worker, { errno, key, ack: message.seq, data, ...reply, }, handle // tcp 对象 ) }) ...}
之后进入子进程:
子进程收到父进程对于 queryserver 的回复后,会调用 shared:
// lib/internal/cluster/child.js// `obj` is a net#server or a dgram#socket object.cluster._getserver = function (obj, options, cb) { ... send(message, (reply, handle) => { if (typeof obj._setserverdata === 'function') obj._setserverdata(reply.data) if (handle) { // shared listen socket shared(reply, {handle, indexeskey, index}, cb) } else { // round-robin. rr(reply, {indexeskey, index}, cb) // cb 是 listenonprimaryhandle } }) ...}
shared 中最后会调用 cb 也就是 listenonprimaryhandle:
// lib/net.jsfunction listenonprimaryhandle(err, handle) { err = checkbinderror(err, port, handle) if (err) { const ex = exceptionwithhostport(err, 'bind', address, port) return server.emit('error', ex) } // reuse primary's server handle 这里的 server 是 index.js 中 net.createserver 返回的那个对象 server._handle = handle // _listen2 sets up the listened handle, it is still named like this // to avoid breaking code that wraps this method server._listen2(address, port, addresstype, backlog, fd, flags)}
这里会把 handle 赋值给 server._handle,这里的 server 是 index.js 中 net.createserver 返回的那个对象,并调用 server._listen2,也就是 setuplistenhandle:
// lib/net.jsfunction setuplistenhandle(address, port, addresstype, backlog, fd, flags) { debug('setuplistenhandle', address, port, addresstype, backlog, fd) // if there is not yet a handle, we need to create one and bind. // in the case of a server sent via ipc, we don't need to do this. if (this._handle) { debug('setuplistenhandle: have a handle already') } else { ... } this[async_id_symbol] = getnewasyncid(this._handle) this._handle.onconnection = onconnection this._handle[owner_symbol] = this // use a backlog of 512 entries. we pass 511 to the listen() call because // the kernel does: backlogsize = roundup_pow_of_two(backlogsize + 1); // which will thus give us a backlog of 512 entries. const err = this._handle.listen(backlog || 511) if (err) { const ex = uvexceptionwithhostport(err, 'listen', address, port) this._handle.close() this._handle = null defaulttriggerasyncidscope( this[async_id_symbol], process.nexttick, emiterrornt, this, ex ) return }}
首先会执行 this._handle.onconnection = onconnection,由于客户端请求过来时会调用 this._handle(也就是 tcp 对象)上的 onconnection 方法,也就是会执行lib/net.js 中的 onconnection 方法建立连接,之后就可以通信了。为了控制篇幅,该方法就不继续往下了。
然后调用 listen 监听,注意这里参数 backlog 跟之前不同,不是表示端口,而是表示在拒绝连接之前,操作系统可以挂起的最大连接数量,也就是连接请求的排队数量。我们平时遇到的 listen eaddrinuse: address already in use 错误就是因为这行代码返回了非 0 的错误。
如果还有其他子进程,也会同样走一遍上述的步骤,不同之处是在主进程中 queryserver 时,由于已经有 handle 了,不需要再重新创建了:
function queryserver(worker, message) { debugger; // stop processing if worker already disconnecting if (worker.exitedafterdisconnect) return; const key = `${message.address}:${message.port}:${message.addresstype}:` + `${message.fd}:${message.index}`; let handle = handles.get(key); ...}
以上内容整理成流程图如下:
所谓的 sharedhandle,其实是在多个子进程中共享 tcp 对象的句柄,当客户端请求过来时,多个进程会去竞争该请求的处理权,会导致任务分配不均的问题,这也是为什么需要 roundrobinhandle 的原因。接下来继续看看这种调度方式。
roundrobinhandle
// lib/internal/cluster/round_robin_handle.jsfunction roundrobinhandle( key, address, {port, fd, flags, backlog, readableall, writableall}) { ... this.server = net.createserver(assert.fail) ... else if (port >= 0) { this.server.listen({ port, host: address, // currently, net module only supports `ipv6only` option in `flags`. ipv6only: boolean(flags & constants.uv_tcp_ipv6only), backlog, }) } ... this.server.once('listening', () => { this.handle = this.server._handle this.handle.onconnection = (err, handle) => { this.distribute(err, handle) } this.server._handle = null this.server = null })}
如上所示,roundrobinhandle 会调用 net.createserver() 创建一个 server,然后调用 listen 方法,最终会来到 setuplistenhandle:
// lib/net.jsfunction setuplistenhandle(address, port, addresstype, backlog, fd, flags) { debug('setuplistenhandle', address, port, addresstype, backlog, fd) // if there is not yet a handle, we need to create one and bind. // in the case of a server sent via ipc, we don't need to do this. if (this._handle) { debug('setuplistenhandle: have a handle already') } else { debug('setuplistenhandle: create a handle') let rval = null // try to bind to the unspecified ipv6 address, see if ipv6 is available if (!address && typeof fd !== 'number') { rval = createserverhandle(default_ipv6_addr, port, 6, fd, flags) if (typeof rval === 'number') { rval = null address = default_ipv4_addr addresstype = 4 } else { address = default_ipv6_addr addresstype = 6 } } if (rval === null) rval = createserverhandle(address, port, addresstype, fd, flags) if (typeof rval === 'number') { const error = uvexceptionwithhostport(rval, 'listen', address, port) process.nexttick(emiterrornt, this, error) return } this._handle = rval } this[async_id_symbol] = getnewasyncid(this._handle) this._handle.onconnection = onconnection this._handle[owner_symbol] = this ...}
且由于此时 this._handle 为空,会调用 createserverhandle() 生成一个 tcp 对象作为 _handle。之后就跟 sharedhandle 一样了,最后也会回到子进程:
// lib/internal/cluster/child.js// `obj` is a net#server or a dgram#socket object.cluster._getserver = function (obj, options, cb) { ... send(message, (reply, handle) => { if (typeof obj._setserverdata === 'function') obj._setserverdata(reply.data) if (handle) { // shared listen socket shared(reply, {handle, indexeskey, index}, cb) } else { // round-robin. rr(reply, {indexeskey, index}, cb) // cb 是 listenonprimaryhandle } }) ...}
不过由于 roundrobinhandle 不会传递 handle 给子进程,所以此时会执行 rr:
function rr(message, {indexeskey, index}, cb) { ... // faux handle. mimics a tcpwrap with just enough fidelity to get away // with it. fools net.server into thinking that it's backed by a real // handle. use a noop function for ref() and unref() because the control // channel is going to keep the worker alive anyway. const handle = {close, listen, ref: noop, unref: noop} if (message.sockname) { handle.getsockname = getsockname // tcp handles only. } assert(handles.has(key) === false) handles.set(key, handle) debugger cb(0, handle)}
可以看到,这里构造了一个假的 handle,然后执行 cb 也就是 listenonprimaryhandle。最终跟 sharedhandle 一样会调用 setuplistenhandle 执行 this._handle.onconnection = onconnection。
roundrobinhandle 逻辑到此就结束了,好像缺了点什么的样子。回顾下,我们给每个子进程中的 server 上都挂载了一个假的 handle,但它跟绑定了端口的 tcp 对象没有任何关系,如果客户端请求过来了,是不会执行它上面的 onconnection 方法的。之所以要这样写,估计是为了保持跟之前 sharedhandle 代码逻辑的统一。
此时,我们需要回到 roundrobinhandle,有这样一段代码:
// lib/internal/cluster/round_robin_handle.jsthis.server.once('listening', () => { this.handle = this.server._handle this.handle.onconnection = (err, handle) => { this.distribute(err, handle) } this.server._handle = null this.server = null})
在 listen 执行完后,会触发 listening 事件的回调,这里重写了 handle 上面的 onconnection。
所以,当客户端请求过来时,会调用 distribute 在多个子进程中轮询分发,这里又有一个 handle,这里的 handle 姑且理解为 clienthandle,即客户端连接的 handle,别搞混了。总之,最后会将这个 clienthandle 发送给子进程:
// lib/internal/cluster/round_robin_handle.jsroundrobinhandle.prototype.handoff = function (worker) { ... const message = { act: 'newconn', key: this.key }; // 这里的 handle 是 clienthandle sendhelper(worker.process, message, handle, (reply) => { if (reply.accepted) handle.close(); else this.distribute(0, handle); // worker is shutting down. send to another. this.handoff(worker); });};
而子进程在 require('cluster') 时,已经监听了该事件:
// lib/internal/cluster/child.jsprocess.on('internalmessage', internal(worker, onmessage))send({act: 'online'})function onmessage(message, handle) { if (message.act === 'newconn') onconnection(message, handle) else if (message.act === 'disconnect') reflectapply(_disconnect, worker, [true])}
最终也同样会走到 net.js 中的 function onconnection(err, clienthandle) 方法。这个方法第二个参数名就叫 clienthandle,这也是为什么前面的 handle 我想叫这个名字的原因。
还是用图来总结下:
跟 sharedhandle 不同的是,该调度策略中 onconnection 最开始是在主进程中触发的,然后通过轮询算法挑选一个子进程,将 clienthandle 传递给它。
为什么端口不冲突cluster 模块的调试就到此告一段落了,接下来我们来回答一下一开始的问题,为什么多个进程监听同一个端口没有报错?
网上有些文章说是因为设置了 so_reuseaddr,但其实跟这个没关系。通过上面的分析知道,不管什么调度策略,最终都只会在主进程中对 tcp 对象 bind 一次。
我们可以修改一下源代码来测试一下:
// deps/uv/src/unix/tcp.c 下面的 so_reuseaddr 改成 so_debugif (setsockopt(tcp->io_watcher.fd, sol_socket, so_reuseaddr, &on, sizeof(on)))
编译后执行发现,我们仍然可以正常使用 cluster 模块。
那这个 so_reuseaddr 到底影响的是啥呢?我们继续来研究一下。
so_reuseaddr首先,我们我们知道,下面的代码是会报错的:
const net = require('net')const server1 = net.createserver()const server2 = net.createserver()server1.listen(9999)server2.listen(9999)
但是,如果我稍微修改一下,就不会报错了:
const net = require('net')const server1 = net.createserver()const server2 = net.createserver()server1.listen(9999, '127.0.0.1')server2.listen(9999, '10.53.48.67')
原因在于 listen 时,如果不指定 address,则相当于绑定了所有地址,当两个 server 都这样做时,请求到来就不知道要给谁处理了。
我们可以类比成找对象,port 是对外貌的要求,address 是对城市的要求。现在甲乙都想要一个 port 是 1米7以上 不限城市的对象,那如果有一个 1米7以上 来自 深圳 的对象,就不知道介绍给谁了。而如果两者都指定了城市就好办多了。
那如果一个指定了 address,一个没有呢?就像下面这样:
const net = require('net')const server1 = net.createserver()const server2 = net.createserver()server1.listen(9999, '127.0.0.1')server2.listen(9999)
结果是:设置了 so_reuseaddr 可以正常运行,而修改成 so_debug 的会报错。
还是上面的例子,甲对城市没有限制,乙需要是来自 深圳 的,那当一个对象来自 深圳,我们可以选择优先介绍给乙,非 深圳 的就选择介绍给甲,这个就是 so_reuseaddr 的作用。
补充sharedhandle 和 roundrobinhandle 两种模式的对比
先准备下测试代码:
// cluster.jsconst cluster = require('cluster')const net = require('net')if (cluster.ismaster) { for (let i = 0; i < 4; i++) { cluster.fork() }} else { const server = net.createserver() server.on('connection', (socket) => { console.log(`pid: ${process.pid}!`) }) server.listen(9997)}
// client.jsconst net = require('net')for (let i = 0; i < 20; i++) { net.connect({port: 9997})}
roundrobin先执行 node cluster.js,然后执行 node client.js,会看到如下输出,可以看到没有任何一个进程的 pid 是紧挨着的。至于为什么没有一直按照一样的顺序,后面再研究一下。
pid: 42904!pid: 42906!pid: 42905!pid: 42904!pid: 42907!pid: 42905!pid: 42906!pid: 42907!pid: 42904!pid: 42905!pid: 42906!pid: 42907!pid: 42904!pid: 42905!pid: 42906!pid: 42907!pid: 42904!pid: 42905!pid: 42906!pid: 42904!
shared
先执行 node_cluster_sched_policy=none node cluster.js,则 node.js 会使用 sharedhandle,然后执行 node client.js,会看到如下输出,可以看到同一个 pid 连续输出了多次,所以这种策略会导致进程任务分配不均的现象。就像公司里有些人忙到 996,有些人天天摸鱼,这显然不是老板愿意看到的现象,所以不推荐使用。
pid: 42561!pid: 42562!pid: 42561!pid: 42562!pid: 42564!pid: 42561!pid: 42562!pid: 42563!pid: 42561!pid: 42562!pid: 42563!pid: 42564!pid: 42564!pid: 42564!pid: 42564!pid: 42564!pid: 42563!pid: 42563!pid: 42564!pid: 42563!
更多node相关知识,请访问:nodejs 教程!
以上就是一文聊聊node.js中的cluster(集群)的详细内容。