'use strict'; const debug = require('debug')('cluster-client#leader'); const co = require('co'); const is = require('is-type-of'); const Base = require('sdk-base'); const utils = require('./utils'); const random = require('utility').random; const ClusterServer = require('./server'); const Connection = require('./connection'); const Request = require('./protocol/request'); const Response = require('./protocol/response'); class Leader extends Base { /** * The Leader hold the real client * * @param {Object} options * - {String} name - client name, default is the class name * - {ClusterServer} server - the cluster server * - {Boolean} isBroadcast - whether broadcast subscrption result to all followers or just one, default is true * - {Number} heartbeatInterval - the heartbeat interval * - {Function} createRealClient - to create the real client * @constructor */ constructor(options) { super(options); this._connections = new Map(); this._subListeners = new Map(); // subscribe key => listener this._subConnMap = new Map(); // subscribe key => conn key this._subData = new Map(); // local socket server this._server = this.options.server; this._transcode = this.options.transcode; this._isReady = false; this._closeByUser = false; // the real client this._realClient = this.options.createRealClient(); this._subscribeMethodName = this._findMethodName('subscribe'); this._publishMethodName = this._findMethodName('publish'); // event delegate utils.delegateEvents(this._realClient, this); if (is.function(this._realClient.ready)) { this._realClient.ready(err => { if (err) { this.ready(err); } else { this._isReady = true; this.ready(true); } }); } else { this._isReady = true; this.ready(true); } this._handleConnection = this._handleConnection.bind(this); // subscribe its own channel this._server.on(`${this.options.name}_connection`, this._handleConnection); this._server.once('close', () => { this.emit('server_closed'); }); this.on('server_closed', () => { this._handleClose().catch(err => { this.emit('error', err); }); }); // maxIdleTime is 3 times of heartbeatInterval const heartbeatInterval = this.options.heartbeatInterval; const maxIdleTime = this.options.heartbeatInterval * 3; this._heartbeatTimer = setInterval(() => { const now = Date.now(); for (const conn of this._connections.values()) { const dur = now - conn.lastActiveTime; if (dur > maxIdleTime) { const err = new Error(`client no response in ${dur}ms exceeding maxIdleTime ${maxIdleTime}ms, maybe the connection is close on other side.`); err.name = 'ClusterClientNoResponseError'; conn.close(err); } } }, heartbeatInterval); } get isLeader() { return true; } get logger() { return this.options.logger; } formatKey(reg) { return '$$inner$$__' + this.options.formatKey(reg); } subscribe(reg, listener) { const transcode = this._transcode; const conn = Object.create(Base.prototype, { isMock: { value: true }, key: { value: `${this.options.name}_mock_conn_${utils.nextId()}` }, lastActiveTime: { get() { return Date.now(); }, }, listener: { get() { return listener; }, }, send: { value(req) { const result = transcode.decode(req.data); process.nextTick(() => { listener(result); }); }, }, close: { value() {} }, }); conn.once('close', () => { this._connections.delete(conn.key); for (const connKeySet of this._subConnMap.values()) { connKeySet.delete(conn.key); } }); this._connections.set(conn.key, conn); this._doSubscribe(reg, conn); } unSubscribe(reg, listener) { const key = this.formatKey(reg); const connKeySet = this._subConnMap.get(key) || new Set(); const newConnKeySet = new Set(); for (const connKey of connKeySet.values()) { const conn = this._connections.get(connKey); if (!conn) { continue; } if (conn.isMock && (!listener || conn.listener === listener)) { this._connections.delete(connKey); continue; } newConnKeySet.add(connKey); } this._subConnMap.set(key, newConnKeySet); } publish(reg) { this._realClient[this._publishMethodName](reg); } invoke(methodName, args, callback) { let method = this._realClient[methodName]; // compatible with generatorFunction if (is.generatorFunction(method)) { method = co.wrap(method); } args.push(callback); const ret = method.apply(this._realClient, args); if (callback && is.promise(ret)) { ret.then(result => callback(null, result), err => callback(err)) // to avoid uncaught exception in callback function, then cause unhandledRejection .catch(err => { this._errorHandler(err); }); } } _doSubscribe(reg, conn) { const key = this.formatKey(reg); const callback = err => { if (err) { this._errorHandler(err); } }; const isBroadcast = this.options.isBroadcast; const timeout = this.options.responseTimeout; const connKeySet = this._subConnMap.get(key) || new Set(); connKeySet.add(conn.key); this._subConnMap.set(key, connKeySet); // only subscribe once in cluster mode, and broadcast to all followers if (!this._subListeners.has(key)) { const listener = result => { const data = this._transcode.encode(result); this._subData.set(key, data); const connKeySet = this._subConnMap.get(key); if (!connKeySet) { return; } let keys = Array.from(connKeySet.values()); // if isBroadcast equal to false, random pick one to notify if (!isBroadcast) { keys = [ keys[random(keys.length)] ]; } for (const connKey of keys) { const conn = this._connections.get(connKey); if (conn) { debug('[Leader:%s] push subscribe data to cluster client#%s', this.options.name, connKey); conn.send(new Request({ timeout, connObj: { type: 'subscribe_result', key, }, data, }), callback); } } }; this._subListeners.set(key, listener); this._realClient[this._subscribeMethodName](reg, listener); } else if (this._subData.has(key) && isBroadcast) { conn.send(new Request({ timeout, connObj: { type: 'subscribe_result', key, }, data: this._subData.get(key), }), callback); } } _findMethodName(type) { return utils.findMethodName(this.options.descriptors, type); } // handle new socket connect _handleConnection(socket, req) { debug('[Leader:%s] socket connected, port: %d', this.options.name, socket.remotePort); const conn = new Connection({ socket, name: this.options.name, logger: this.logger, transcode: this.options.transcode, requestTimeout: this.options.requestTimeout, }); this._connections.set(conn.key, conn); conn.once('close', () => { this._connections.delete(conn.key); for (const connKeySet of this._subConnMap.values()) { connKeySet.delete(conn.key); } }); conn.on('error', err => this._errorHandler(err)); conn.on('request', (req, res) => this._handleRequest(req, res, conn)); // handle register channel request const res = new Response({ id: req.id, timeout: req.timeout, }); this._handleRequest(req, res, conn); } _handleSubscribe(req, conn) { const connObj = req.connObj || {}; this._doSubscribe(connObj.reg, conn); } _handleUnSubscribe(req, conn) { const connObj = req.connObj || {}; const key = this.formatKey(connObj.reg); const connKeySet = this._subConnMap.get(key) || new Set(); connKeySet.delete(conn.key); this._subConnMap.set(key, connKeySet); } // handle request from followers _handleRequest(req, res, conn) { const connObj = req.connObj || {}; // update last active time to make sure not kick out by leader conn.lastActiveTime = Date.now(); switch (connObj.type) { case 'subscribe': debug('[Leader:%s] received subscribe request from follower, req: %j, conn: %s', this.options.name, req, conn.key); this._handleSubscribe(req, conn); break; case 'unSubscribe': debug('[Leader:%s] received unSubscribe request from follower, req: %j, conn: %s', this.options.name, req, conn.key); this._handleUnSubscribe(req, conn); break; case 'invoke': { debug('[Leader:%s] received invoke request from follower, req: %j, conn: %s', this.options.name, req, conn.key); const argLength = connObj.argLength; const args = []; if (argLength > 0) { const data = req.data; for (let i = 0, offset = 0; i < argLength; ++i) { const len = data.readUInt32BE(offset); const arg = this._transcode.decode(data.slice(offset + 4, offset + 4 + len)); args.push(arg); offset += (4 + len); } } if (connObj.oneway) { this.invoke(connObj.method, args); } else { const startTime = Date.now(); this.invoke(connObj.method, args, (err, result) => { // no response if processing timeout, just record error if (req.timeout && Date.now() - startTime > req.timeout) { const err = new Error(`[Leader:${this.options.name}] invoke method:${connObj.method} timeout for req#${req.id}`); err.name = 'ClusterLeaderTimeoutError'; err.method = connObj.method; err.args = args; this._errorHandler(err); return; } if (err) { const data = Object.assign({ stack: err.stack, name: err.name, message: err.message, }, err); err.method = connObj.method; err.args = connObj.args; this._errorHandler(err); res.connObj = { type: 'invoke_result', success: false, }; res.data = this._transcode.encode(data); } else { debug('[Leader:%s] send method:%s result to follower, result: %j', this.options.name, connObj.method, result); const data = this._transcode.encode(result); res.connObj = { type: 'invoke_result', success: true, }; res.data = data; } conn.send(res); }); } break; } case 'heartbeat': debug('[Leader:%s] received heartbeat request from follower, req: %j, conn: %s', this.options.name, req, conn.key); res.connObj = { type: 'heartbeat_res' }; conn.send(res); break; case 'register_channel': // make sure response after leader is ready this.ready(err => { if (err) { res.connObj = { type: 'register_channel_res' }; const data = Object.assign({ message: err.message, stack: err.stack, name: err.name, }, err); res.data = this._transcode.encode({ success: false, error: data, }); } else { res.connObj = { type: 'register_channel_res' }; res.data = this._transcode.encode({ success: true }); } conn.send(res); }); break; default: { const err = new Error(`unsupport data type: ${connObj.type}`); err.name = 'ClusterRequestTypeError'; this._errorHandler(err); break; } } } // emit error asynchronously _errorHandler(err) { setImmediate(() => { if (!this._closeByUser) { this.emit('error', err); } }); } async _handleClose() { debug('[Leader:%s] leader server is closed', this.options.name); // close the real client if (this._realClient) { const originClose = this._findMethodName('close'); if (originClose) { // support common function, generatorFunction, and function returning a promise await utils.callFn(this._realClient[originClose].bind(this._realClient)); } } clearInterval(this._heartbeatTimer); this._heartbeatTimer = null; this.emit('close'); } async close() { this._closeByUser = true; debug('[Leader:%s] try to close leader', this.options.name); // 1. stop listening to server channel this._server.removeListener(`${this.options.name}_connection`, this._handleConnection); // 2. close all mock connections for (const conn of this._connections.values()) { if (conn.isMock) { conn.emit('close'); } } // 3. close server // CANNOT close server directly by server.close(), other cluster clients may be using it this.removeAllListeners('server_closed'); await ClusterServer.close(this.options.name, this._server); // 5. close real client await this._handleClose(); } } module.exports = Leader;