'use strict';

const co = require('co');
const Base = require('./base');
const is = require('is-type-of');
const utils = require('../utils');
const SdkBase = require('sdk-base');
const random = require('utility').random;

// Symbol
const {
  logger,
  createClient,
  singleMode,
} = require('../symbol');

// InnerClient instances shared between SingleClient wrappers, keyed by `options.name`.
const _instances = new Map();

/**
 * Wraps the real client returned by `options.createRealClient()` so that it can
 * be shared: it keeps a reference count, remembers the latest data of every
 * subscription and fans that data out to the registered listeners.
 */
class InnerClient extends SdkBase {
  /**
   * @param {Object} [options] - reads `transcode` (object with `encode` / `decode`),
   *   `createRealClient` (factory function) and `descriptors` here; other methods
   *   read `formatKey` and `isBroadcast` through `this.options`
   *   (presumably assigned by the SdkBase constructor - confirm).
   */
  constructor(options = {}) {
    super(options);

    this._subData = new Map(); // key -> latest encoded data
    this._subSet = new Set(); // keys already subscribed on the real client
    this._subListeners = new Map(); // key -> Array<Function>
    this._transcode = options.transcode;
    this._realClient = options.createRealClient();
    this._closeMethodName = utils.findMethodName(options.descriptors, 'close');
    this._subscribeMethodName = utils.findMethodName(options.descriptors, 'subscribe');
    this._publishMethodName = utils.findMethodName(options.descriptors, 'publish');
    this._isReady = false;
    this._closeByUser = false;
    this._refCount = 1;

    // event delegate
    utils.delegateEvents(this._realClient, this);

    if (is.function(this._realClient.ready)) {
      // the real client has its own ready(): mirror its outcome
      this._realClient.ready(err => {
        if (err) {
          this.ready(err);
        } else {
          this._isReady = true;
          this.ready(true);
        }
      });
    } else {
      this._isReady = true;
      this.ready(true);
    }
  }

  /**
   * Add one more holder of this client; every holder has to call close()
   * before the real client is actually closed.
   */
  ref() {
    this._refCount++;
  }

  /**
   * Always `true` for this client.
   * @return {Boolean} true
   */
  get isLeader() {
    return true;
  }

  /**
   * Build the internal key for a registration.
   * @param {Object} reg - registration info, handed to `options.formatKey`
   * @return {String} `options.formatKey(reg)` with the `$$inner$$__` prefix
   */
  formatKey(reg) {
    return '$$inner$$__' + this.options.formatKey(reg);
  }

  /**
   * Register a listener for a registration. The real client is subscribed only
   * once per key; later subscribers share that subscription.
   * @param {Object} reg - registration info
   * @param {Function} listener - called with the decoded data
   */
  subscribe(reg, listener) {
    const key = this.formatKey(reg);
    const transcode = this._transcode;
    const isBroadcast = this.options.isBroadcast;

    const listeners = this._subListeners.get(key) || [];
    listeners.push(listener);
    this._subListeners.set(key, listeners);
    this.on(key, listener);

    if (!this._subSet.has(key)) {
      this._subSet.add(key);
      this._realClient[this._subscribeMethodName](reg, result => {
        const data = transcode.encode(result);
        this._subData.set(key, data);

        let fns = this._subListeners.get(key);
        // unSubscribe() may have deleted the entry or left an empty list; with
        // an empty list there is nothing to pick, and indexing it would yield
        // `undefined`, which throws when called
        if (!fns || fns.length === 0) {
          return;
        }
        // if isBroadcast equal to false, random pick one to notify
        if (!isBroadcast) {
          fns = [ fns[random(fns.length)] ];
        }
        for (const fn of fns) {
          // decode per listener so each one gets its own decoded value
          fn(transcode.decode(data));
        }
      });
    } else if (this._subData.has(key) && isBroadcast) {
      // already subscribed and data is cached: replay it to the new listener
      process.nextTick(() => {
        const data = this._subData.get(key);
        listener(transcode.decode(data));
      });
    }
  }

  /**
   * Remove one listener of a registration, or all of them when `listener` is omitted.
   * @param {Object} reg - registration info
   * @param {Function} [listener] - the listener to remove
   */
  unSubscribe(reg, listener) {
    const key = this.formatKey(reg);

    if (!listener) {
      this._subListeners.delete(key);
      // also drop what subscribe() attached through this.on(key, ...)
      this.removeAllListeners(key);
      return;
    }

    const listeners = this._subListeners.get(key) || [];
    const remaining = listeners.filter(fn => fn !== listener);
    this._subListeners.set(key, remaining);

    // subscribe() called this.on(key, listener) once per registration, while
    // removeListener() detaches a single registration per call
    const removedCount = listeners.length - remaining.length;
    for (let i = 0; i < removedCount; i++) {
      this.removeListener(key, listener);
    }
  }

  /**
   * Forward a publish to the real client.
   * @param {Object} reg - publish info
   */
  publish(reg) {
    this._realClient[this._publishMethodName](reg);
  }

  /**
   * Call a method of the real client.
   * @param {String} methodName - name of the method on the real client
   * @param {Array} args - arguments; not modified
   * @param {Function} [callback] - appended as the last argument; also called
   *   with the outcome when the method returns a promise
   */
  invoke(methodName, args, callback) {
    let method = this._realClient[methodName];
    // compatible with generatorFunction
    if (is.generatorFunction(method)) {
      method = co.wrap(method);
    }
    // build a fresh list instead of pushing onto the caller's array
    const ret = method.apply(this._realClient, [ ...args, callback ]);
    if (callback && is.promise(ret)) {
      ret.then(result => callback(null, result), err => callback(err))
        // to avoid uncaught exception in callback function, then cause unhandledRejection
        .catch(err => { this._errorHandler(err); });
    }
  }

  // emit error asynchronously
  _errorHandler(err) {
    setImmediate(() => {
      if (!this._closeByUser) {
        this.emit('error', err);
      }
    });
  }

  /**
   * Release one reference. Once no reference is left, call the real client's
   * close method (if the descriptors declare one) and emit `close`.
   */
  async close() {
    if (this._refCount > 0) {
      this._refCount--;
    }
    // somebody else still holds this client
    if (this._refCount > 0) return;

    this._closeByUser = true;

    if (this._realClient && this._closeMethodName) {
      // support common function, generatorFunction, and function returning a promise
      await utils.callFn(this._realClient[this._closeMethodName].bind(this._realClient));
    }
    this.emit('close');
  }
}

/**
 * Client wrapper whose inner client is an InnerClient shared by `options.name`.
 */
class SingleClient extends Base {
  get [singleMode]() {
    return true;
  }

  /**
   * Return the InnerClient registered under `options.name`, taking one more
   * reference on it, or create and register a new one.
   * @return {InnerClient} the shared inner client
   */
  async [createClient]() {
    const options = this.options;
    let client;
    if (_instances.has(options.name)) {
      client = _instances.get(options.name);
      client.ref();
      return client;
    }
    client = new InnerClient(options);
    // unregister on close so that a later call creates a fresh client
    client.once('close', () => {
      _instances.delete(options.name);
      this[logger].info('[cluster#SingleClient] %s is closed.', options.name);
    });
    _instances.set(options.name, client);
    return client;
  }
}

module.exports = SingleClient;