123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211 |
- 'use strict';
- const is = require('is-type-of');
- const symbols = require('./symbol');
- const logger = require('./default_logger');
- const transcode = require('./default_transcode');
- const SingleClient = require('./wrapper/single');
- const ClusterClient = require('./wrapper/cluster');
- const { formatKey } = require('./utils');
- const defaultOptions = {
- port: parseInt(process.env.NODE_CLUSTER_CLIENT_PORT) || 7777,
- singleMode: process.env.NODE_CLUSTER_CLIENT_SINGLE_MODE === '1',
- maxWaitTime: 30000,
- connectTimeout: parseInt(process.env.NODE_CLUSTER_CLIENT_CONNECT_TIMEOUT) || 10000,
- responseTimeout: 3000,
- heartbeatInterval: 20000,
- autoGenerate: true,
- isBroadcast: true,
- logger,
- transcode,
- formatKey,
- };
- const autoGenerateMethods = [
- 'subscribe',
- 'unSubscribe',
- 'publish',
- 'close',
- ];
- class ClientGenerator {
- /**
- * Cluster Client Generator
- *
- * @param {Function} clientClass - the client class
- * @param {Object} options
- * - {Number} responseTimeout - response timeout, default is 3000
- * - {Boolean} autoGenerate - whether generate delegate rule automatically, default is true
- * - {Boolean} isBroadcast - whether broadcast subscrption result to all followers or just one, default is true
- * - {Logger} logger - log instance
- * - {Transcode} [transcode|JSON.stringify/parse]
- * - {Function} encode - custom serialize method
- * - {Function} decode - custom deserialize method
- * - {Boolean} [isLeader|null] - specify whether current instance is leader
- * - {Number} [maxWaitTime|30000] - leader startup max time (ONLY effective on isLeader is true)
- * @constructor
- */
- constructor(clientClass, options) {
- this._clientClass = clientClass;
- this._options = Object.assign({
- name: clientClass.prototype.constructor.name,
- }, defaultOptions, options);
- // wrapper descptions
- this._descriptors = new Map();
- }
- /**
- * override the property
- *
- * @param {String} name - property name
- * @param {Object} value - property value
- * @return {ClientGenerator} self
- */
- override(name, value) {
- this._descriptors.set(name, {
- type: 'override',
- value,
- });
- return this;
- }
- /**
- * delegate methods
- *
- * @param {String} from - method name
- * @param {String} to - delegate to subscribe|publish|invoke
- * @return {ClientGenerator} self
- */
- delegate(from, to) {
- to = to || 'invoke';
- this._descriptors.set(from, {
- type: 'delegate',
- to,
- });
- return this;
- }
- /**
- * create cluster client instance
- *
- * @return {Object} instance
- */
- create(...args) {
- const clientClass = this._clientClass;
- const proto = clientClass.prototype;
- const descriptors = this._descriptors;
- // auto generate description
- if (this._options.autoGenerate) {
- this._generateDescriptors();
- }
- function createRealClient() {
- return Reflect.construct(clientClass, args);
- }
- const ClientWrapper = this._options.singleMode ? SingleClient : ClusterClient;
- const client = new ClientWrapper(Object.assign({
- createRealClient,
- descriptors: this._descriptors,
- }, this._options));
- for (const name of descriptors.keys()) {
- let value;
- const descriptor = descriptors.get(name);
- switch (descriptor.type) {
- case 'override':
- value = descriptor.value;
- break;
- case 'delegate':
- if (/^invoke|invokeOneway$/.test(descriptor.to)) {
- if (is.generatorFunction(proto[name])) {
- value = function* (...args) {
- return yield cb => { client[symbols.invoke](name, args, cb); };
- };
- } else if (is.function(proto[name])) {
- if (descriptor.to === 'invoke') {
- value = (...args) => {
- let cb;
- if (is.function(args[args.length - 1])) {
- cb = args.pop();
- }
- // whether callback or promise
- if (cb) {
- client[symbols.invoke](name, args, cb);
- } else {
- return new Promise((resolve, reject) => {
- client[symbols.invoke](name, args, function(err) {
- if (err) {
- reject(err);
- } else {
- resolve.apply(null, Array.from(arguments).slice(1));
- }
- });
- });
- }
- };
- } else {
- value = (...args) => {
- client[symbols.invoke](name, args);
- };
- }
- } else {
- throw new Error(`[ClusterClient] api: ${name} not implement in client`);
- }
- } else {
- value = client[Symbol.for(`ClusterClient#${descriptor.to}`)];
- }
- break;
- default:
- break;
- }
- Object.defineProperty(client, name, {
- value,
- writable: true,
- enumerable: true,
- configurable: true,
- });
- }
- return client;
- }
- _generateDescriptors() {
- const clientClass = this._clientClass;
- const proto = clientClass.prototype;
- const needGenerateMethods = new Set(autoGenerateMethods);
- for (const entry of this._descriptors.entries()) {
- const key = entry[0];
- const value = entry[1];
- if (needGenerateMethods.has(key) ||
- (value.type === 'delegate' && needGenerateMethods.has(value.to))) {
- needGenerateMethods.delete(key);
- }
- }
- for (const method of needGenerateMethods.values()) {
- if (is.function(proto[method])) {
- this.delegate(method, method);
- }
- }
- const keys = Reflect.ownKeys(proto)
- .filter(key => typeof key !== 'symbol' &&
- !key.startsWith('_') &&
- !this._descriptors.has(key));
- for (const key of keys) {
- const descriptor = Reflect.getOwnPropertyDescriptor(proto, key);
- if (descriptor.value &&
- (is.generatorFunction(descriptor.value) || is.asyncFunction(descriptor.value))) {
- this.delegate(key);
- }
- }
- }
- }
- module.exports = function(clientClass, options) {
- return new ClientGenerator(clientClass, options);
- };
|