123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432 |
- 'use strict';
- const debug = require('debug')('cluster-client#leader');
- const co = require('co');
- const is = require('is-type-of');
- const Base = require('sdk-base');
- const utils = require('./utils');
- const random = require('utility').random;
- const ClusterServer = require('./server');
- const Connection = require('./connection');
- const Request = require('./protocol/request');
- const Response = require('./protocol/response');
- class Leader extends Base {
- /**
- * The Leader hold the real client
- *
- * @param {Object} options
- * - {String} name - client name, default is the class name
- * - {ClusterServer} server - the cluster server
- * - {Boolean} isBroadcast - whether broadcast subscrption result to all followers or just one, default is true
- * - {Number} heartbeatInterval - the heartbeat interval
- * - {Function} createRealClient - to create the real client
- * @constructor
- */
- constructor(options) {
- super(options);
- this._connections = new Map();
- this._subListeners = new Map(); // subscribe key => listener
- this._subConnMap = new Map(); // subscribe key => conn key
- this._subData = new Map();
- // local socket server
- this._server = this.options.server;
- this._transcode = this.options.transcode;
- this._isReady = false;
- this._closeByUser = false;
- // the real client
- this._realClient = this.options.createRealClient();
- this._subscribeMethodName = this._findMethodName('subscribe');
- this._publishMethodName = this._findMethodName('publish');
- // event delegate
- utils.delegateEvents(this._realClient, this);
- if (is.function(this._realClient.ready)) {
- this._realClient.ready(err => {
- if (err) {
- this.ready(err);
- } else {
- this._isReady = true;
- this.ready(true);
- }
- });
- } else {
- this._isReady = true;
- this.ready(true);
- }
- this._handleConnection = this._handleConnection.bind(this);
- // subscribe its own channel
- this._server.on(`${this.options.name}_connection`, this._handleConnection);
- this._server.once('close', () => { this.emit('server_closed'); });
- this.on('server_closed', () => {
- this._handleClose().catch(err => { this.emit('error', err); });
- });
- // maxIdleTime is 3 times of heartbeatInterval
- const heartbeatInterval = this.options.heartbeatInterval;
- const maxIdleTime = this.options.heartbeatInterval * 3;
- this._heartbeatTimer = setInterval(() => {
- const now = Date.now();
- for (const conn of this._connections.values()) {
- const dur = now - conn.lastActiveTime;
- if (dur > maxIdleTime) {
- const err = new Error(`client no response in ${dur}ms exceeding maxIdleTime ${maxIdleTime}ms, maybe the connection is close on other side.`);
- err.name = 'ClusterClientNoResponseError';
- conn.close(err);
- }
- }
- }, heartbeatInterval);
- }
- get isLeader() {
- return true;
- }
- get logger() {
- return this.options.logger;
- }
- formatKey(reg) {
- return '$$inner$$__' + this.options.formatKey(reg);
- }
- subscribe(reg, listener) {
- const transcode = this._transcode;
- const conn = Object.create(Base.prototype, {
- isMock: { value: true },
- key: { value: `${this.options.name}_mock_conn_${utils.nextId()}` },
- lastActiveTime: {
- get() {
- return Date.now();
- },
- },
- listener: {
- get() {
- return listener;
- },
- },
- send: {
- value(req) {
- const result = transcode.decode(req.data);
- process.nextTick(() => {
- listener(result);
- });
- },
- },
- close: { value() {} },
- });
- conn.once('close', () => {
- this._connections.delete(conn.key);
- for (const connKeySet of this._subConnMap.values()) {
- connKeySet.delete(conn.key);
- }
- });
- this._connections.set(conn.key, conn);
- this._doSubscribe(reg, conn);
- }
- unSubscribe(reg, listener) {
- const key = this.formatKey(reg);
- const connKeySet = this._subConnMap.get(key) || new Set();
- const newConnKeySet = new Set();
- for (const connKey of connKeySet.values()) {
- const conn = this._connections.get(connKey);
- if (!conn) {
- continue;
- }
- if (conn.isMock && (!listener || conn.listener === listener)) {
- this._connections.delete(connKey);
- continue;
- }
- newConnKeySet.add(connKey);
- }
- this._subConnMap.set(key, newConnKeySet);
- }
- publish(reg) {
- this._realClient[this._publishMethodName](reg);
- }
- invoke(methodName, args, callback) {
- let method = this._realClient[methodName];
- // compatible with generatorFunction
- if (is.generatorFunction(method)) {
- method = co.wrap(method);
- }
- args.push(callback);
- const ret = method.apply(this._realClient, args);
- if (callback && is.promise(ret)) {
- ret.then(result => callback(null, result), err => callback(err))
- // to avoid uncaught exception in callback function, then cause unhandledRejection
- .catch(err => { this._errorHandler(err); });
- }
- }
- _doSubscribe(reg, conn) {
- const key = this.formatKey(reg);
- const callback = err => {
- if (err) {
- this._errorHandler(err);
- }
- };
- const isBroadcast = this.options.isBroadcast;
- const timeout = this.options.responseTimeout;
- const connKeySet = this._subConnMap.get(key) || new Set();
- connKeySet.add(conn.key);
- this._subConnMap.set(key, connKeySet);
- // only subscribe once in cluster mode, and broadcast to all followers
- if (!this._subListeners.has(key)) {
- const listener = result => {
- const data = this._transcode.encode(result);
- this._subData.set(key, data);
- const connKeySet = this._subConnMap.get(key);
- if (!connKeySet) {
- return;
- }
- let keys = Array.from(connKeySet.values());
- // if isBroadcast equal to false, random pick one to notify
- if (!isBroadcast) {
- keys = [ keys[random(keys.length)] ];
- }
- for (const connKey of keys) {
- const conn = this._connections.get(connKey);
- if (conn) {
- debug('[Leader:%s] push subscribe data to cluster client#%s', this.options.name, connKey);
- conn.send(new Request({
- timeout,
- connObj: {
- type: 'subscribe_result',
- key,
- },
- data,
- }), callback);
- }
- }
- };
- this._subListeners.set(key, listener);
- this._realClient[this._subscribeMethodName](reg, listener);
- } else if (this._subData.has(key) && isBroadcast) {
- conn.send(new Request({
- timeout,
- connObj: {
- type: 'subscribe_result',
- key,
- },
- data: this._subData.get(key),
- }), callback);
- }
- }
- _findMethodName(type) {
- return utils.findMethodName(this.options.descriptors, type);
- }
- // handle new socket connect
- _handleConnection(socket, req) {
- debug('[Leader:%s] socket connected, port: %d', this.options.name, socket.remotePort);
- const conn = new Connection({
- socket,
- name: this.options.name,
- logger: this.logger,
- transcode: this.options.transcode,
- requestTimeout: this.options.requestTimeout,
- });
- this._connections.set(conn.key, conn);
- conn.once('close', () => {
- this._connections.delete(conn.key);
- for (const connKeySet of this._subConnMap.values()) {
- connKeySet.delete(conn.key);
- }
- });
- conn.on('error', err => this._errorHandler(err));
- conn.on('request', (req, res) => this._handleRequest(req, res, conn));
- // handle register channel request
- const res = new Response({
- id: req.id,
- timeout: req.timeout,
- });
- this._handleRequest(req, res, conn);
- }
- _handleSubscribe(req, conn) {
- const connObj = req.connObj || {};
- this._doSubscribe(connObj.reg, conn);
- }
- _handleUnSubscribe(req, conn) {
- const connObj = req.connObj || {};
- const key = this.formatKey(connObj.reg);
- const connKeySet = this._subConnMap.get(key) || new Set();
- connKeySet.delete(conn.key);
- this._subConnMap.set(key, connKeySet);
- }
- // handle request from followers
- _handleRequest(req, res, conn) {
- const connObj = req.connObj || {};
- // update last active time to make sure not kick out by leader
- conn.lastActiveTime = Date.now();
- switch (connObj.type) {
- case 'subscribe':
- debug('[Leader:%s] received subscribe request from follower, req: %j, conn: %s', this.options.name, req, conn.key);
- this._handleSubscribe(req, conn);
- break;
- case 'unSubscribe':
- debug('[Leader:%s] received unSubscribe request from follower, req: %j, conn: %s', this.options.name, req, conn.key);
- this._handleUnSubscribe(req, conn);
- break;
- case 'invoke':
- {
- debug('[Leader:%s] received invoke request from follower, req: %j, conn: %s', this.options.name, req, conn.key);
- const argLength = connObj.argLength;
- const args = [];
- if (argLength > 0) {
- const data = req.data;
- for (let i = 0, offset = 0; i < argLength; ++i) {
- const len = data.readUInt32BE(offset);
- const arg = this._transcode.decode(data.slice(offset + 4, offset + 4 + len));
- args.push(arg);
- offset += (4 + len);
- }
- }
- if (connObj.oneway) {
- this.invoke(connObj.method, args);
- } else {
- const startTime = Date.now();
- this.invoke(connObj.method, args, (err, result) => {
- // no response if processing timeout, just record error
- if (req.timeout && Date.now() - startTime > req.timeout) {
- const err = new Error(`[Leader:${this.options.name}] invoke method:${connObj.method} timeout for req#${req.id}`);
- err.name = 'ClusterLeaderTimeoutError';
- err.method = connObj.method;
- err.args = args;
- this._errorHandler(err);
- return;
- }
- if (err) {
- const data = Object.assign({
- stack: err.stack,
- name: err.name,
- message: err.message,
- }, err);
- err.method = connObj.method;
- err.args = connObj.args;
- this._errorHandler(err);
- res.connObj = {
- type: 'invoke_result',
- success: false,
- };
- res.data = this._transcode.encode(data);
- } else {
- debug('[Leader:%s] send method:%s result to follower, result: %j', this.options.name, connObj.method, result);
- const data = this._transcode.encode(result);
- res.connObj = {
- type: 'invoke_result',
- success: true,
- };
- res.data = data;
- }
- conn.send(res);
- });
- }
- break;
- }
- case 'heartbeat':
- debug('[Leader:%s] received heartbeat request from follower, req: %j, conn: %s', this.options.name, req, conn.key);
- res.connObj = { type: 'heartbeat_res' };
- conn.send(res);
- break;
- case 'register_channel':
- // make sure response after leader is ready
- this.ready(err => {
- if (err) {
- res.connObj = { type: 'register_channel_res' };
- const data = Object.assign({
- message: err.message,
- stack: err.stack,
- name: err.name,
- }, err);
- res.data = this._transcode.encode({
- success: false,
- error: data,
- });
- } else {
- res.connObj = { type: 'register_channel_res' };
- res.data = this._transcode.encode({ success: true });
- }
- conn.send(res);
- });
- break;
- default:
- {
- const err = new Error(`unsupport data type: ${connObj.type}`);
- err.name = 'ClusterRequestTypeError';
- this._errorHandler(err);
- break;
- }
- }
- }
- // emit error asynchronously
- _errorHandler(err) {
- setImmediate(() => {
- if (!this._closeByUser) {
- this.emit('error', err);
- }
- });
- }
- async _handleClose() {
- debug('[Leader:%s] leader server is closed', this.options.name);
- // close the real client
- if (this._realClient) {
- const originClose = this._findMethodName('close');
- if (originClose) {
- // support common function, generatorFunction, and function returning a promise
- await utils.callFn(this._realClient[originClose].bind(this._realClient));
- }
- }
- clearInterval(this._heartbeatTimer);
- this._heartbeatTimer = null;
- this.emit('close');
- }
- async close() {
- this._closeByUser = true;
- debug('[Leader:%s] try to close leader', this.options.name);
- // 1. stop listening to server channel
- this._server.removeListener(`${this.options.name}_connection`, this._handleConnection);
- // 2. close all mock connections
- for (const conn of this._connections.values()) {
- if (conn.isMock) {
- conn.emit('close');
- }
- }
- // 3. close server
- // CANNOT close server directly by server.close(), other cluster clients may be using it
- this.removeAllListeners('server_closed');
- await ClusterServer.close(this.options.name, this._server);
- // 5. close real client
- await this._handleClose();
- }
- }
- module.exports = Leader;
|