|
- 'use strict';
- const os = require('os');
- const v8 = require('v8');
- const util = require('util');
- const path = require('path');
- const fs = require('fs');
- const cluster = require('cluster');
- const EventEmitter = require('events');
- const childprocess = require('child_process');
- const cfork = require('cfork');
- const ready = require('get-ready');
- const GetFreePort = require('detect-port');
- const ConsoleLogger = require('egg-logger').EggConsoleLogger;
- const utility = require('utility');
- const semver = require('semver');
- const co = require('co');
- const { mkdirp } = require('mz-modules');
- const Manager = require('./utils/manager');
- const parseOptions = require('./utils/options');
- const Messenger = require('./utils/messenger');
- const terminate = require('./utils/terminate');
- const PROTOCOL = Symbol('Master#protocol');
- const REAL_PORT = Symbol('Master#real_port');
- const APP_ADDRESS = Symbol('Master#appAddress');
- class Master extends EventEmitter {
- /**
- * @class
- * @param {Object} options
- * - {String} [framework] - specify framework that can be absolute path or npm package
- * - {String} [baseDir] directory of application, default to `process.cwd()`
- * - {Object} [plugins] - customized plugins, for unittest
- * - {Number} [workers] numbers of app workers, default to `os.cpus().length`
- * - {Number} [port] listening port, default to 7001(http) or 8443(https)
- * - {Object} [https] https options, { key, cert, ca }, full path
- * - {Array|String} [require] will inject into worker/agent process
- * - {String} [pidFile] will save master pid to this file
- */
- constructor(options) {
- super();
- this.options = parseOptions(options);
- this.workerManager = new Manager();
- this.messenger = new Messenger(this);
- ready.mixin(this);
- this.isProduction = isProduction();
- this.agentWorkerIndex = 0;
- this.closed = false;
- this[REAL_PORT] = this.options.port;
- this[PROTOCOL] = this.options.https ? 'https' : 'http';
- // app started or not
- this.isStarted = false;
- this.logger = new ConsoleLogger({ level: process.env.EGG_MASTER_LOGGER_LEVEL || 'INFO' });
- this.logMethod = 'info';
- if (process.env.EGG_SERVER_ENV === 'local' || process.env.NODE_ENV === 'development') {
- this.logMethod = 'debug';
- }
- // get the real framework info
- const frameworkPath = this.options.framework;
- const frameworkPkg = utility.readJSONSync(path.join(frameworkPath, 'package.json'));
- this.log(`[master] =================== ${frameworkPkg.name} start =====================`);
- this.logger.info(`[master] node version ${process.version}`);
- /* istanbul ignore next */
- if (process.alinode) this.logger.info(`[master] alinode version ${process.alinode}`);
- this.logger.info(`[master] ${frameworkPkg.name} version ${frameworkPkg.version}`);
- if (this.isProduction) {
- this.logger.info('[master] start with options:%s%s',
- os.EOL, JSON.stringify(this.options, null, 2));
- } else {
- this.log('[master] start with options: %j', this.options);
- }
- this.log('[master] start with env: isProduction: %s, EGG_SERVER_ENV: %s, NODE_ENV: %s',
- this.isProduction, process.env.EGG_SERVER_ENV, process.env.NODE_ENV);
- const startTime = Date.now();
- this.ready(() => {
- this.isStarted = true;
- const stickyMsg = this.options.sticky ? ' with STICKY MODE!' : '';
- this.logger.info('[master] %s started on %s (%sms)%s',
- frameworkPkg.name, this[APP_ADDRESS], Date.now() - startTime, stickyMsg);
- const action = 'egg-ready';
- this.messenger.send({
- action,
- to: 'parent',
- data: {
- port: this[REAL_PORT],
- address: this[APP_ADDRESS],
- protocol: this[PROTOCOL],
- },
- });
- this.messenger.send({
- action,
- to: 'app',
- data: this.options,
- });
- this.messenger.send({
- action,
- to: 'agent',
- data: this.options,
- });
- // start check agent and worker status
- if (this.isProduction) {
- this.workerManager.startCheck();
- }
- });
- this.on('agent-exit', this.onAgentExit.bind(this));
- this.on('agent-start', this.onAgentStart.bind(this));
- this.on('app-exit', this.onAppExit.bind(this));
- this.on('app-start', this.onAppStart.bind(this));
- this.on('reload-worker', this.onReload.bind(this));
- // fork app workers after agent started
- this.once('agent-start', this.forkAppWorkers.bind(this));
- // get the real port from options and app.config
- // app worker will send after loading
- this.on('realport', ({ port, protocol }) => {
- if (port) this[REAL_PORT] = port;
- if (protocol) this[PROTOCOL] = protocol;
- });
- // https://nodejs.org/api/process.html#process_signal_events
- // https://en.wikipedia.org/wiki/Unix_signal
- // kill(2) Ctrl-C
- process.once('SIGINT', this.onSignal.bind(this, 'SIGINT'));
- // kill(3) Ctrl-\
- process.once('SIGQUIT', this.onSignal.bind(this, 'SIGQUIT'));
- // kill(15) default
- process.once('SIGTERM', this.onSignal.bind(this, 'SIGTERM'));
- process.once('exit', this.onExit.bind(this));
- // write pid to file if provided
- if (this.options.pidFile) {
- mkdirp.sync(path.dirname(this.options.pidFile));
- fs.writeFileSync(this.options.pidFile, process.pid.toString(), 'utf-8');
- }
- this.detectPorts()
- .then(() => {
- this.forkAgentWorker();
- });
- // exit when agent or worker exception
- this.workerManager.on('exception', ({
- agent,
- worker,
- }) => {
- const err = new Error(`[master] ${agent} agent and ${worker} worker(s) alive, exit to avoid unknown state`);
- err.name = 'ClusterWorkerExceptionError';
- err.count = {
- agent,
- worker,
- };
- this.logger.error(err);
- process.exit(1);
- });
- }
- detectPorts() {
- // Detect cluster client port
- return GetFreePort()
- .then(port => {
- this.options.clusterPort = port;
- // If sticky mode, detect worker port
- if (this.options.sticky) {
- return GetFreePort();
- }
- })
- .then(port => {
- if (this.options.sticky) {
- this.options.stickyWorkerPort = port;
- }
- })
- .catch(/* istanbul ignore next */ err => {
- this.logger.error(err);
- process.exit(1);
- });
- }
- log(...args) {
- this.logger[this.logMethod](...args);
- }
- get agentWorker() {
- return this.workerManager.agent;
- }
- startMasterSocketServer(cb) {
- // Create the outside facing server listening on our port.
- require('net').createServer({
- pauseOnConnect: true,
- }, connection => {
- // We received a connection and need to pass it to the appropriate
- // worker. Get the worker for this connection's source IP and pass
- // it the connection.
- /* istanbul ignore next */
- if (!connection.remoteAddress) {
- // This will happen when a client sends an RST(which is set to 1) right
- // after the three-way handshake to the server.
- // Read https://en.wikipedia.org/wiki/TCP_reset_attack for more details.
- connection.destroy();
- } else {
- const worker = this.stickyWorker(connection.remoteAddress);
- worker.send('sticky-session:connection', connection);
- }
- }).listen(this[REAL_PORT], cb);
- }
- stickyWorker(ip) {
- const workerNumbers = this.options.workers;
- const ws = this.workerManager.listWorkerIds();
- let s = '';
- for (let i = 0; i < ip.length; i++) {
- if (!isNaN(ip[i])) {
- s += ip[i];
- }
- }
- s = Number(s);
- const pid = ws[s % workerNumbers];
- return this.workerManager.getWorker(pid);
- }
- forkAgentWorker() {
- this.agentStartTime = Date.now();
- const args = [ JSON.stringify(this.options) ];
- const opt = {};
- if (process.platform === 'win32') opt.windowsHide = true;
- // add debug execArgv
- const debugPort = process.env.EGG_AGENT_DEBUG_PORT || 5800;
- if (this.options.isDebug) opt.execArgv = process.execArgv.concat([ `--${semver.gte(process.version, '8.0.0') ? 'inspect' : 'debug'}-port=${debugPort}` ]);
- const agentWorker = childprocess.fork(this.getAgentWorkerFile(), args, opt);
- agentWorker.status = 'starting';
- agentWorker.id = ++this.agentWorkerIndex;
- this.workerManager.setAgent(agentWorker);
- this.log('[master] agent_worker#%s:%s start with clusterPort:%s',
- agentWorker.id, agentWorker.pid, this.options.clusterPort);
- // send debug message
- if (this.options.isDebug) {
- this.messenger.send({
- to: 'parent',
- from: 'agent',
- action: 'debug',
- data: {
- debugPort,
- pid: agentWorker.pid,
- },
- });
- }
- // forwarding agent' message to messenger
- agentWorker.on('message', msg => {
- if (typeof msg === 'string') {
- msg = {
- action: msg,
- data: msg,
- };
- }
- msg.from = 'agent';
- this.messenger.send(msg);
- });
- agentWorker.on('error', err => {
- err.name = 'AgentWorkerError';
- err.id = agentWorker.id;
- err.pid = agentWorker.pid;
- this.logger.error(err);
- });
- // agent exit message
- agentWorker.once('exit', (code, signal) => {
- this.messenger.send({
- action: 'agent-exit',
- data: {
- code,
- signal,
- },
- to: 'master',
- from: 'agent',
- });
- });
- }
- forkAppWorkers() {
- this.appStartTime = Date.now();
- this.isAllAppWorkerStarted = false;
- this.startSuccessCount = 0;
- const args = [ JSON.stringify(this.options) ];
- this.log('[master] start appWorker with args %j', args);
- cfork({
- exec: this.getAppWorkerFile(),
- args,
- silent: false,
- count: this.options.workers,
- // don't refork in local env
- refork: this.isProduction,
- windowsHide: process.platform === 'win32',
- });
- let debugPort = process.debugPort;
- cluster.on('fork', worker => {
- worker.disableRefork = true;
- this.workerManager.setWorker(worker);
- worker.on('message', msg => {
- if (typeof msg === 'string') {
- msg = {
- action: msg,
- data: msg,
- };
- }
- msg.from = 'app';
- this.messenger.send(msg);
- });
- this.log('[master] app_worker#%s:%s start, state: %s, current workers: %j',
- worker.id, worker.process.pid, worker.state, Object.keys(cluster.workers));
- // send debug message, due to `brk` scence, send here instead of app_worker.js
- if (this.options.isDebug) {
- debugPort++;
- this.messenger.send({
- to: 'parent',
- from: 'app',
- action: 'debug',
- data: {
- debugPort,
- pid: worker.process.pid,
- },
- });
- }
- });
- cluster.on('disconnect', worker => {
- this.logger.info('[master] app_worker#%s:%s disconnect, suicide: %s, state: %s, current workers: %j',
- worker.id, worker.process.pid, worker.exitedAfterDisconnect, worker.state, Object.keys(cluster.workers));
- });
- cluster.on('exit', (worker, code, signal) => {
- this.messenger.send({
- action: 'app-exit',
- data: {
- workerPid: worker.process.pid,
- code,
- signal,
- },
- to: 'master',
- from: 'app',
- });
- });
- cluster.on('listening', (worker, address) => {
- this.messenger.send({
- action: 'app-start',
- data: {
- workerPid: worker.process.pid,
- address,
- },
- to: 'master',
- from: 'app',
- });
- });
- }
- /**
- * close agent worker, App Worker will closed by cluster
- *
- * https://www.exratione.com/2013/05/die-child-process-die/
- * make sure Agent Worker exit before master exit
- *
- * @param {number} timeout - kill agent timeout
- * @return {Promise} -
- */
- killAgentWorker(timeout) {
- const agentWorker = this.agentWorker;
- if (agentWorker) {
- this.log('[master] kill agent worker with signal SIGTERM');
- agentWorker.removeAllListeners();
- }
- return co(function* () {
- yield terminate(agentWorker, timeout);
- });
- }
- killAppWorkers(timeout) {
- return co(function* () {
- yield Object.keys(cluster.workers).map(id => {
- const worker = cluster.workers[id];
- worker.disableRefork = true;
- return terminate(worker, timeout);
- });
- });
- }
- /**
- * Agent Worker exit handler
- * Will exit during startup, and refork during running.
- * @param {Object} data
- * - {Number} code - exit code
- * - {String} signal - received signal
- */
- onAgentExit(data) {
- if (this.closed) return;
- this.messenger.send({
- action: 'egg-pids',
- to: 'app',
- data: [],
- });
- const agentWorker = this.agentWorker;
- this.workerManager.deleteAgent(this.agentWorker);
- const err = new Error(util.format('[master] agent_worker#%s:%s died (code: %s, signal: %s)',
- agentWorker.id, agentWorker.pid, data.code, data.signal));
- err.name = 'AgentWorkerDiedError';
- this.logger.error(err);
- // remove all listeners to avoid memory leak
- agentWorker.removeAllListeners();
- if (this.isStarted) {
- this.log('[master] try to start a new agent_worker after 1s ...');
- setTimeout(() => {
- this.logger.info('[master] new agent_worker starting...');
- this.forkAgentWorker();
- }, 1000);
- this.messenger.send({
- action: 'agent-worker-died',
- to: 'parent',
- });
- } else {
- this.logger.error('[master] agent_worker#%s:%s start fail, exiting with code:1',
- agentWorker.id, agentWorker.pid);
- process.exit(1);
- }
- }
- onAgentStart() {
- this.agentWorker.status = 'started';
- // Send egg-ready when agent is started after launched
- if (this.isAllAppWorkerStarted) {
- this.messenger.send({
- action: 'egg-ready',
- to: 'agent',
- data: this.options,
- });
- }
- this.messenger.send({
- action: 'egg-pids',
- to: 'app',
- data: [ this.agentWorker.pid ],
- });
- // should send current worker pids when agent restart
- if (this.isStarted) {
- this.messenger.send({
- action: 'egg-pids',
- to: 'agent',
- data: this.workerManager.getListeningWorkerIds(),
- });
- }
- this.messenger.send({
- action: 'agent-start',
- to: 'app',
- });
- this.logger.info('[master] agent_worker#%s:%s started (%sms)',
- this.agentWorker.id, this.agentWorker.pid, Date.now() - this.agentStartTime);
- }
- /**
- * App Worker exit handler
- * @param {Object} data
- * - {String} workerPid - worker id
- * - {Number} code - exit code
- * - {String} signal - received signal
- */
- onAppExit(data) {
- if (this.closed) return;
- const worker = this.workerManager.getWorker(data.workerPid);
- if (!worker.isDevReload) {
- const signal = data.signal;
- const message = util.format(
- '[master] app_worker#%s:%s died (code: %s, signal: %s, suicide: %s, state: %s), current workers: %j',
- worker.id, worker.process.pid, worker.process.exitCode, signal,
- worker.exitedAfterDisconnect, worker.state,
- Object.keys(cluster.workers)
- );
- if (this.options.isDebug && signal === 'SIGKILL') {
- // exit if died during debug
- this.logger.error(message);
- this.logger.error('[master] worker kill by debugger, exiting...');
- setTimeout(() => this.close(), 10);
- } else {
- const err = new Error(message);
- err.name = 'AppWorkerDiedError';
- this.logger.error(err);
- }
- }
- // remove all listeners to avoid memory leak
- worker.removeAllListeners();
- this.workerManager.deleteWorker(data.workerPid);
- // send message to agent with alive workers
- this.messenger.send({
- action: 'egg-pids',
- to: 'agent',
- data: this.workerManager.getListeningWorkerIds(),
- });
- if (this.isAllAppWorkerStarted) {
- // cfork will only refork at production mode
- this.messenger.send({
- action: 'app-worker-died',
- to: 'parent',
- });
- } else {
- // exit if died during startup
- this.logger.error('[master] app_worker#%s:%s start fail, exiting with code:1',
- worker.id, worker.process.pid);
- process.exit(1);
- }
- }
- /**
- * after app worker
- * @param {Object} data
- * - {String} workerPid - worker id
- * - {Object} address - server address
- */
- onAppStart(data) {
- const worker = this.workerManager.getWorker(data.workerPid);
- const address = data.address;
- // worker should listen stickyWorkerPort when sticky mode
- if (this.options.sticky) {
- if (String(address.port) !== String(this.options.stickyWorkerPort)) {
- return;
- }
- // worker should listen REALPORT when not sticky mode
- } else if (!isUnixSock(address) &&
- (String(address.port) !== String(this[REAL_PORT]))) {
- return;
- }
- // send message to agent with alive workers
- this.messenger.send({
- action: 'egg-pids',
- to: 'agent',
- data: this.workerManager.getListeningWorkerIds(),
- });
- this.startSuccessCount++;
- const remain = this.isAllAppWorkerStarted ? 0 : this.options.workers - this.startSuccessCount;
- this.log('[master] app_worker#%s:%s started at %s, remain %s (%sms)',
- worker.id, data.workerPid, address.port, remain, Date.now() - this.appStartTime);
- // Send egg-ready when app is started after launched
- if (this.isAllAppWorkerStarted) {
- this.messenger.send({
- action: 'egg-ready',
- to: 'app',
- data: this.options,
- });
- }
- // if app is started, it should enable this worker
- if (this.isAllAppWorkerStarted) {
- worker.disableRefork = false;
- }
- if (this.isAllAppWorkerStarted || this.startSuccessCount < this.options.workers) {
- return;
- }
- this.isAllAppWorkerStarted = true;
- // enable all workers when app started
- for (const id in cluster.workers) {
- const worker = cluster.workers[id];
- worker.disableRefork = false;
- }
- address.protocol = this[PROTOCOL];
- address.port = this.options.sticky ? this[REAL_PORT] : address.port;
- this[APP_ADDRESS] = getAddress(address);
- if (this.options.sticky) {
- this.startMasterSocketServer(err => {
- if (err) return this.ready(err);
- this.ready(true);
- });
- } else {
- this.ready(true);
- }
- }
- /**
- * master exit handler
- */
- onExit(code) {
- if (this.options.pidFile && fs.existsSync(this.options.pidFile)) {
- try {
- fs.unlinkSync(this.options.pidFile);
- } catch (err) {
- /* istanbul ignore next */
- this.logger.error('[master] delete pidfile %s fail with %s', this.options.pidFile, err.message);
- }
- }
- // istanbul can't cover here
- // https://github.com/gotwarlost/istanbul/issues/567
- const level = code === 0 ? 'info' : 'error';
- this.logger[level]('[master] exit with code:%s', code);
- }
- onSignal(signal) {
- if (this.closed) return;
- this.logger.info('[master] master is killed by signal %s, closing', signal);
- // logger more info
- const { used_heap_size, heap_size_limit } = v8.getHeapStatistics();
- this.logger.info('[master] system memory: total %s, free %s', os.totalmem(), os.freemem());
- this.logger.info('[master] process info: heap_limit %s, heap_used %s', heap_size_limit, used_heap_size);
- this.close();
- }
- /**
- * reload workers, for develop purpose
- */
- onReload() {
- this.log('[master] reload workers...');
- for (const id in cluster.workers) {
- const worker = cluster.workers[id];
- worker.isDevReload = true;
- }
- require('cluster-reload')(this.options.workers);
- }
- close() {
- this.closed = true;
- const self = this;
- co(function* () {
- try {
- yield self._doClose();
- self.log('[master] close done, exiting with code:0');
- process.exit(0);
- } catch (e) /* istanbul ignore next */ {
- this.logger.error('[master] close with error: ', e);
- process.exit(1);
- }
- });
- }
- getAgentWorkerFile() {
- return path.join(__dirname, 'agent_worker.js');
- }
- getAppWorkerFile() {
- return path.join(__dirname, 'app_worker.js');
- }
- * _doClose() {
- // kill app workers
- // kill agent worker
- // exit itself
- const legacyTimeout = process.env.EGG_MASTER_CLOSE_TIMEOUT || 5000;
- const appTimeout = process.env.EGG_APP_CLOSE_TIMEOUT || legacyTimeout;
- const agentTimeout = process.env.EGG_AGENT_CLOSE_TIMEOUT || legacyTimeout;
- this.logger.info('[master] send kill SIGTERM to app workers, will exit with code:0 after %sms', appTimeout);
- this.logger.info('[master] wait %sms', appTimeout);
- try {
- yield this.killAppWorkers(appTimeout);
- } catch (e) /* istanbul ignore next */ {
- this.logger.error('[master] app workers exit error: ', e);
- }
- this.logger.info('[master] send kill SIGTERM to agent worker, will exit with code:0 after %sms', agentTimeout);
- this.logger.info('[master] wait %sms', agentTimeout);
- try {
- yield this.killAgentWorker(agentTimeout);
- } catch (e) /* istanbul ignore next */ {
- this.logger.error('[master] agent worker exit error: ', e);
- }
- }
- }
- module.exports = Master;
- function isProduction() {
- const serverEnv = process.env.EGG_SERVER_ENV;
- if (serverEnv) {
- return serverEnv !== 'local' && serverEnv !== 'unittest';
- }
- return process.env.NODE_ENV === 'production';
- }
- function getAddress({
- addressType,
- address,
- port,
- protocol,
- }) {
- // unix sock
- // https://nodejs.org/api/cluster.html#cluster_event_listening_1
- if (addressType === -1) return address;
- let hostname = address;
- if (!hostname && process.env.HOST && process.env.HOST !== '0.0.0.0') {
- hostname = process.env.HOST;
- }
- if (!hostname) {
- hostname = '127.0.0.1';
- }
- return `${protocol}://${hostname}:${port}`;
- }
- function isUnixSock(address) {
- return address.addressType === -1;
- }
|