123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- 'use strict';
- const debug = require('debug')('cluster-client');
- const is = require('is-type-of');
- const Base = require('sdk-base');
- const assert = require('assert');
- const utils = require('../utils');
- // Symbols
- const {
- init,
- logger,
- isReady,
- innerClient,
- subscribe,
- unSubscribe,
- publish,
- invoke,
- subInfo,
- pubInfo,
- closeHandler,
- close,
- singleMode,
- createClient,
- } = require('../symbol');
- class WrapperBase extends Base {
- /**
- * Share Connection among Multi-Process Mode
- *
- * @param {Object} options
- * - {Number} port - the port
- * - {Transcode} transcode - serialze / deseriaze methods
- * - {Boolean} isLeader - wehether is leader or follower
- * - {Number} maxWaitTime - leader startup max time (ONLY effective on isLeader is true)
- * - {Function} createRealClient - to create the real client instance
- * @constructor
- */
- constructor(options) {
- super(options);
- this[subInfo] = new Map();
- this[pubInfo] = new Map();
- this[init]().catch(err => { this.ready(err); });
- }
- get isClusterClientLeader() {
- return this[innerClient] && this[innerClient].isLeader;
- }
- get [singleMode]() {
- return false;
- }
- /**
- * log instance
- * @property {Logger} ClusterClient#[logger]
- */
- get [logger]() {
- return this.options.logger;
- }
- async [createClient]() {
- throw new Error('not implement');
- }
- /**
- * initialize, to leader or follower
- *
- * @return {void}
- */
- async [init]() {
- this[innerClient] = await this[createClient]();
- // events delegate
- utils.delegateEvents(this[innerClient], this);
- // re init when connection is close
- if (this[closeHandler]) {
- this[innerClient].on('close', this[closeHandler]);
- }
- // wait leader/follower ready
- await this[innerClient].ready();
- // subscribe all
- for (const registrations of this[subInfo].values()) {
- for (const args of registrations) {
- this[innerClient].subscribe(args[0], args[1]);
- }
- }
- // publish all
- for (const reg of this[pubInfo].values()) {
- this[innerClient].publish(reg);
- }
- if (!this[isReady]) {
- this[isReady] = true;
- this.ready(true);
- }
- }
- /**
- * do subscribe
- *
- * @param {Object} reg - subscription info
- * @param {Function} listener - callback function
- * @return {void}
- */
- [subscribe](reg, listener) {
- assert(is.function(listener), `[ClusterClient:${this.options.name}] subscribe(reg, listener) listener should be a function`);
- debug('[ClusterClient:%s] subscribe %j', this.options.name, reg);
- const key = this.options.formatKey(reg);
- const registrations = this[subInfo].get(key) || [];
- registrations.push([ reg, listener ]);
- this[subInfo].set(key, registrations);
- if (this[isReady]) {
- this[innerClient].subscribe(reg, listener);
- }
- }
- /**
- * do unSubscribe
- *
- * @param {Object} reg - subscription info
- * @param {Function} listener - callback function
- * @return {void}
- */
- [unSubscribe](reg, listener) {
- debug('[ClusterClient:%s] unSubscribe %j', this.options.name, reg);
- const key = this.options.formatKey(reg);
- const registrations = this[subInfo].get(key) || [];
- const newRegistrations = [];
- if (listener) {
- for (const arr of registrations) {
- if (arr[1] !== listener) {
- newRegistrations.push(arr);
- }
- }
- }
- this[subInfo].set(key, newRegistrations);
- if (this[isReady]) {
- this[innerClient].unSubscribe(reg, listener);
- }
- }
- /**
- * do publish
- *
- * @param {Object} reg - publish info
- * @return {void}
- */
- [publish](reg) {
- debug('[ClusterClient:%s] publish %j', this.options.name, reg);
- const key = this.options.formatKey(reg);
- this[pubInfo].set(key, reg);
- if (this[isReady]) {
- this[innerClient].publish(reg);
- }
- }
- /**
- * invoke a method asynchronously
- *
- * @param {String} method - the method name
- * @param {Array} args - the arguments list
- * @param {Function} callback - callback function
- * @return {void}
- */
- [invoke](method, args, callback) {
- if (!this[isReady]) {
- this.ready(err => {
- if (err) {
- callback && callback(err);
- return;
- }
- this[innerClient].invoke(method, args, callback);
- });
- return;
- }
- debug('[ClusterClient:%s] invoke method: %s, args: %j', this.options.name, method, args);
- this[innerClient].invoke(method, args, callback);
- }
- async [close]() {
- try {
- // close after ready, in case of innerClient is initializing
- await this.ready();
- } catch (err) {
- // ignore
- }
- const client = this[innerClient];
- if (client) {
- // prevent re-initializing
- if (this[closeHandler]) {
- client.removeListener('close', this[closeHandler]);
- }
- if (client.close) {
- await utils.callFn(client.close.bind(client));
- }
- }
- }
- }
- module.exports = WrapperBase;
|