index.js 8.3 KB


  1. 'use strict';
  2. var cluster = require('cluster');
  3. var os = require('os');
  4. var util = require('util');
  5. var utility = require('utility');
  // Scheduler used further down to postpone installing the default event
  // handlers: setImmediate when the runtime provides it, otherwise
  // process.nextTick.
  6. var defer = global.setImmediate || process.nextTick;
  // The module's only export is the `fork` function itself.
  7. module.exports = fork;
  /**
   * Fork cluster workers from the master process and refork them when they
   * disconnect or exit unexpectedly.
   *
   * Does nothing (returns `undefined`) when called inside a worker process.
   *
   * Events emitted on the returned `cluster` object:
   * - `unexpectedExit` (worker, code, signal): a worker exited without a prior
   *   tracked disconnect and without `worker.disableRefork` being set.
   * - `reachReforkLimit`: a refork was refused by the rate limit below.
   *
   * Default handlers for `uncaughtException` (on `process`), `unexpectedExit`
   * and `reachReforkLimit` are installed on a deferred tick, and only for
   * events that have no listener yet, so callers can register their own
   * right after calling `fork()`.
   *
   * @param {Object} [options]
   *  - {String} exec exec file path
   *  - {Array} [args] exec arguments
   *  - {Array} [execArgv] passed through to `cluster.setupMaster()` as `execArgv`
   *  - {Array} [slaves] slave processes; each entry is an exec path string or a
   *    settings object with an `exec` property, entries without `exec` are skipped
   *  - {Boolean} [silent] whether or not to send output to parent's stdio, default is `false`
   *  - {Number} [count] worker num, default is `os.cpus().length`
   *  - {Boolean} [refork] refork when disconnect and unexpected exit, default is `true`
   *  - {Number} [limit] refork attempts remembered by the rate limit, default is `60`
   *  - {Number} [duration] rate limit window in ms, default is `60000`; a refork is
   *    refused once `limit` attempts are remembered and the oldest and newest of
   *    them are no more than `duration` ms apart
   *  - {Object} [env] environment passed to every `cluster.fork()` call, default is `{}`
   *  - {Boolean} [autoCoverage] auto fork with istanbul when `running_under_istanbul` env set, default is `false`
   *  - {Boolean} [windowsHide] Hide the forked processes console window that would normally be created on Windows systems. Default: false.
   * @return {Cluster|undefined} the `cluster` module, or `undefined` inside a worker
   */
  function fork(options) {
    if (cluster.isWorker) {
      return;
    }

    options = options || {};
    var count = options.count || os.cpus().length;
    var refork = options.refork !== false;
    var limit = options.limit || 60;
    var duration = options.duration || 60000; // 1 min
    var reforks = []; // timestamps of the most recent refork attempts
    var attachedEnv = options.env || {};
    var newWorker;

    if (options.exec) {
      var opts = {
        exec: options.exec
      };
      // only forward the settings the caller actually provided
      ['execArgv', 'args', 'silent', 'windowsHide'].forEach(function (key) {
        if (options[key] !== undefined) {
          opts[key] = options[key];
        }
      });

      // https://github.com/gotwarlost/istanbul#multiple-process-usage
      // Multiple Process under istanbul
      if (options.autoCoverage && process.env.running_under_istanbul) {
        // use coverage for forked process
        // disabled reporting and output for child process
        // enable pid in child process coverage filename
        var args = [
          'cover', '--report', 'none', '--print', 'none', '--include-pid',
          opts.exec,
        ];
        if (opts.args && opts.args.length > 0) {
          args.push('--');
          args = args.concat(opts.args);
        }
        opts.exec = './node_modules/.bin/istanbul';
        opts.args = args;
      }

      cluster.setupMaster(opts);
    }

    // pid -> date string of workers whose disconnect already triggered the
    // refork decision, so the following exit is treated as expected
    var disconnects = {};
    var disconnectCount = 0;
    var unexpectedCount = 0;

    cluster.on('disconnect', function (worker) {
      var log = console[worker.disableRefork ? 'info' : 'error'];
      disconnectCount++;
      var isDead = worker.isDead && worker.isDead();
      var propertyName = exitFlagName(worker);
      log('[%s] [cfork:master:%s] worker:%s disconnect (%s: %s, state: %s, isDead: %s, worker.disableRefork: %s)',
        utility.logDate(), process.pid, worker.process.pid, propertyName, worker[propertyName],
        worker.state, isDead, worker.disableRefork);

      if (isDead) {
        // worker has terminated before disconnect; the exit handler decides
        log('[%s] [cfork:master:%s] don\'t fork, because worker:%s exit event emit before disconnect',
          utility.logDate(), process.pid, worker.process.pid);
        return;
      }
      if (worker.disableRefork) {
        // worker has terminated by master, like egg-cluster master will set disableRefork to true
        log('[%s] [cfork:master:%s] don\'t fork, because worker:%s will be kill soon',
          utility.logDate(), process.pid, worker.process.pid);
        return;
      }

      disconnects[worker.process.pid] = utility.logDate();
      reforkFor(worker, log);
    });

    cluster.on('exit', function (worker, code, signal) {
      var log = console[worker.disableRefork ? 'info' : 'error'];
      var isExpected = !!disconnects[worker.process.pid];
      var isDead = worker.isDead && worker.isDead();
      var propertyName = exitFlagName(worker);
      log('[%s] [cfork:master:%s] worker:%s exit (code: %s, %s: %s, state: %s, isDead: %s, isExpected: %s, worker.disableRefork: %s)',
        utility.logDate(), process.pid, worker.process.pid, code, propertyName, worker[propertyName],
        worker.state, isDead, isExpected, worker.disableRefork);

      if (isExpected) {
        // worker disconnect first, exit expected
        delete disconnects[worker.process.pid];
        return;
      }
      if (worker.disableRefork) {
        // worker is killed by master
        return;
      }

      unexpectedCount++;
      reforkFor(worker, log);
      cluster.emit('unexpectedExit', worker, code, signal);
    });

    // defer to set the listeners
    // so you can listen this by your own
    defer(function () {
      if (process.listeners('uncaughtException').length === 0) {
        process.on('uncaughtException', onerror);
      }
      if (cluster.listeners('unexpectedExit').length === 0) {
        cluster.on('unexpectedExit', onUnexpected);
      }
      if (cluster.listeners('reachReforkLimit').length === 0) {
        cluster.on('reachReforkLimit', onReachReforkLimit);
      }
    });

    for (var i = 0; i < count; i++) {
      newWorker = forkWorker();
      // remember the settings this worker was started with for later reforks
      newWorker._clusterSettings = cluster.settings;
    }

    // fork slaves after workers are forked
    if (options.slaves) {
      var slaves = Array.isArray(options.slaves) ? options.slaves : [options.slaves];
      slaves.map(normalizeSlaveConfig)
        .forEach(function (settings) {
          if (settings) {
            newWorker = forkWorker(settings);
            newWorker._clusterSettings = settings;
          }
        });
    }

    return cluster;

    // The helpers below are function declarations, hoisted above the return.

    /**
     * name of the worker property that tells whether the exit was voluntary:
     * `exitedAfterDisconnect` when the worker owns it, else the legacy `suicide`
     */
    function exitFlagName(worker) {
      return Object.prototype.hasOwnProperty.call(worker, 'exitedAfterDisconnect') ?
        'exitedAfterDisconnect' : 'suicide';
    }

    /**
     * fork a replacement for `worker` with the same settings when the rate
     * limit allows it, and log the decision either way
     */
    function reforkFor(worker, log) {
      if (allow()) {
        newWorker = forkWorker(worker._clusterSettings);
        newWorker._clusterSettings = worker._clusterSettings;
        log('[%s] [cfork:master:%s] new worker:%s fork (state: %s)',
          utility.logDate(), process.pid, newWorker.process.pid, newWorker.state);
      } else {
        log('[%s] [cfork:master:%s] don\'t fork new work (refork: %s)',
          utility.logDate(), process.pid, refork);
      }
    }

    /**
     * allow refork
     *
     * Records the attempt, keeps at most `limit` timestamps, and emits
     * `reachReforkLimit` when the refork is refused by the rate limit.
     */
    function allow() {
      if (!refork) {
        return false;
      }

      var times = reforks.push(Date.now());
      if (times > limit) {
        reforks.shift();
      }

      var span = reforks[reforks.length - 1] - reforks[0];
      var canFork = reforks.length < limit || span > duration;
      if (!canFork) {
        cluster.emit('reachReforkLimit');
      }
      return canFork;
    }

    /**
     * uncaughtException default handler
     */
    function onerror(err) {
      if (!err) {
        return;
      }
      console.error('[%s] [cfork:master:%s] master uncaughtException: %s', utility.logDate(), process.pid, err.stack);
      console.error(err);
      console.error('(total %d disconnect, %d unexpected exit)', disconnectCount, unexpectedCount);
    }

    /**
     * unexpectedExit default handler
     */
    function onUnexpected(worker, code, signal) {
      // the reported code is read from the worker's process object, not from `code`
      var exitCode = worker.process.exitCode;
      var propertyName = exitFlagName(worker);
      var err = new Error(util.format('worker:%s died unexpected (code: %s, signal: %s, %s: %s, state: %s)',
        worker.process.pid, exitCode, signal, propertyName, worker[propertyName], worker.state));
      err.name = 'WorkerDiedUnexpectedError';

      console.error('[%s] [cfork:master:%s] (total %d disconnect, %d unexpected exit) %s',
        utility.logDate(), process.pid, disconnectCount, unexpectedCount, err.stack);
    }

    /**
     * reachReforkLimit default handler
     */
    function onReachReforkLimit() {
      console.error('[%s] [cfork:master:%s] worker died too fast (total %d disconnect, %d unexpected exit)',
        utility.logDate(), process.pid, disconnectCount, unexpectedCount);
    }

    /**
     * normalize slave config
     *
     * A string is treated as the exec path. Returns `null` for entries that
     * are empty (`null`, `undefined`, `''`) or have no `exec`, so the caller
     * can skip them instead of crashing.
     */
    function normalizeSlaveConfig(opt) {
      // exec path
      if (typeof opt === 'string') {
        opt = { exec: opt };
      }
      if (!opt || !opt.exec) {
        return null;
      }
      return opt;
    }

    /**
     * fork worker with certain settings
     *
     * When `settings` is given it replaces `cluster.settings` before forking;
     * otherwise the current cluster settings are used.
     */
    function forkWorker(settings) {
      if (settings) {
        cluster.settings = settings;
        cluster.setupMaster();
      }
      return cluster.fork(attachedEnv);
    }
  }
  233. }