single.js 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. 'use strict';
  2. const co = require('co');
  3. const Base = require('./base');
  4. const is = require('is-type-of');
  5. const utils = require('../utils');
  6. const SdkBase = require('sdk-base');
  7. const random = require('utility').random;
  8. // Symbol
  9. const {
  10. logger,
  11. createClient,
  12. singleMode,
  13. } = require('../symbol');
  14. const _instances = new Map();
  /**
   * In-process client used in single mode. It wraps the real client returned by
   * `options.createRealClient()` and adds reference counting, subscription
   * bookkeeping (cached data + listener lists) and callback-style invocation.
   */
  class InnerClient extends SdkBase {
    /**
     * @param {Object} [options] - client options
     * @param {Function} options.createRealClient - factory returning the wrapped client
     * @param {Object} options.transcode - object exposing `encode` / `decode`
     * @param {*} options.descriptors - handed to `utils.findMethodName` to resolve
     *   the names of the close / subscribe / publish methods on the real client
     * @param {Function} options.formatKey - maps a registration to a key (see `formatKey`)
     * @param {Boolean} options.isBroadcast - truthy: notify every listener;
     *   falsy: notify one randomly picked listener per update
     */
    constructor(options = {}) {
      super(options);

      this._subData = new Map(); // <key, data>
      this._subSet = new Set(); // keys already subscribed on the real client
      this._subListeners = new Map(); // <key, Array<Function>>
      this._transcode = options.transcode;
      this._realClient = options.createRealClient();
      this._closeMethodName = utils.findMethodName(options.descriptors, 'close');
      this._subscribeMethodName = utils.findMethodName(options.descriptors, 'subscribe');
      this._publishMethodName = utils.findMethodName(options.descriptors, 'publish');
      this._isReady = false;
      this._closeByUser = false;
      // the creator holds the first reference
      this._refCount = 1;

      // event delegate
      utils.delegateEvents(this._realClient, this);

      if (is.function(this._realClient.ready)) {
        // mirror the readiness (or the ready error) of the real client
        this._realClient.ready(err => {
          if (err) {
            this.ready(err);
          } else {
            this._isReady = true;
            this.ready(true);
          }
        });
      } else {
        // the real client has no ready() method: treat it as ready right away
        this._isReady = true;
        this.ready(true);
      }
    }

    /**
     * Register one more user of this client; each `close()` call releases one.
     */
    ref() {
      this._refCount++;
    }

    /**
     * @return {Boolean} always true for the inner client
     */
    get isLeader() {
      return true;
    }

    /**
     * Build the internal key for a registration.
     * NOTE(review): relies on `this.options` being set by the SdkBase constructor.
     *
     * @param {*} reg - registration info
     * @return {String} `options.formatKey(reg)` prefixed with `$$inner$$__`
     */
    formatKey(reg) {
      return '$$inner$$__' + this.options.formatKey(reg);
    }

    /**
     * Subscribe `listener` to updates of `reg`.
     *
     * The real client is subscribed only once per key; later subscribers share
     * that subscription. In broadcast mode a late subscriber is replayed the
     * cached data on the next tick.
     *
     * @param {*} reg - registration info
     * @param {Function} listener - called with the decoded data
     */
    subscribe(reg, listener) {
      const key = this.formatKey(reg);
      const transcode = this._transcode;
      const isBroadcast = this.options.isBroadcast;

      const listeners = this._subListeners.get(key) || [];
      listeners.push(listener);
      this._subListeners.set(key, listeners);
      this.on(key, listener);

      if (!this._subSet.has(key)) {
        this._subSet.add(key);
        this._realClient[this._subscribeMethodName](reg, result => {
          // cache the encoded form; every listener gets its own decoded copy
          const data = transcode.encode(result);
          this._subData.set(key, data);

          let fns = this._subListeners.get(key);
          // The list is missing after unSubscribe(reg) and empty after the last
          // unSubscribe(reg, listener); picking from an empty list would yield
          // `undefined` and throw when called.
          if (!fns || fns.length === 0) {
            return;
          }
          // if isBroadcast equal to false, random pick one to notify
          if (!isBroadcast) {
            fns = [ fns[random(fns.length)] ];
          }
          for (const fn of fns) {
            fn(transcode.decode(data));
          }
        });
      } else if (this._subData.has(key) && isBroadcast) {
        process.nextTick(() => {
          const data = this._subData.get(key);
          listener(transcode.decode(data));
        });
      }
    }

    /**
     * Remove `listener` from `reg`, or every listener of `reg` when `listener`
     * is omitted. The matching emitter registrations made by `subscribe()` are
     * removed as well so they do not accumulate.
     *
     * @param {*} reg - registration info
     * @param {Function} [listener] - the listener to remove
     */
    unSubscribe(reg, listener) {
      const key = this.formatKey(reg);

      if (!listener) {
        this._subListeners.delete(key);
        this.removeAllListeners(key);
        return;
      }

      const listeners = this._subListeners.get(key) || [];
      const remaining = [];
      for (const fn of listeners) {
        if (fn === listener) {
          // subscribe() called on() once per push, so undo one per occurrence
          this.removeListener(key, listener);
          continue;
        }
        remaining.push(fn);
      }
      this._subListeners.set(key, remaining);
    }

    /**
     * Forward a publish request to the real client.
     *
     * @param {*} reg - registration info
     */
    publish(reg) {
      this._realClient[this._publishMethodName](reg);
    }

    /**
     * Call `methodName` on the real client with `args` followed by `callback`.
     * When a callback is given and the method returns a promise, the promise
     * outcome is reported through the callback.
     *
     * @param {String} methodName - method of the real client
     * @param {Array} args - call arguments (not modified)
     * @param {Function} [callback] - node-style callback
     */
    invoke(methodName, args, callback) {
      let method = this._realClient[methodName];
      // compatible with generatorFunction
      if (is.generatorFunction(method)) {
        method = co.wrap(method);
      }
      // build a fresh argument list instead of pushing onto the caller's array
      const callArgs = [ ...args, callback ];
      const ret = method.apply(this._realClient, callArgs);
      if (callback && is.promise(ret)) {
        ret.then(result => callback(null, result), err => callback(err))
          // to avoid uncaught exception in callback function, then cause unhandledRejection
          .catch(err => { this._errorHandler(err); });
      }
    }

    /**
     * Emit `error` asynchronously, unless the client has been closed by the user.
     *
     * @param {Error} err - the error to report
     */
    _errorHandler(err) {
      setImmediate(() => {
        if (!this._closeByUser) {
          this.emit('error', err);
        }
      });
    }

    /**
     * Release one reference. When the last reference is released, close the
     * real client (if a close method was resolved) and emit `close`.
     */
    async close() {
      if (this._refCount > 0) {
        this._refCount--;
      }
      // still in use by another holder
      if (this._refCount > 0) return;

      this._closeByUser = true;

      if (this._realClient) {
        if (this._closeMethodName) {
          // support common function, generatorFunction, and function returning a promise
          await utils.callFn(this._realClient[this._closeMethodName].bind(this._realClient));
        }
      }
      this.emit('close');
    }
  }
  /**
   * Client flavour for single mode: every SingleClient created with the same
   * `options.name` shares one InnerClient kept in the module-level registry.
   */
  class SingleClient extends Base {
    /**
     * Marker read through the `singleMode` symbol.
     *
     * @return {Boolean} always true
     */
    get [singleMode]() {
      return true;
    }

    /**
     * Return the shared InnerClient for `options.name`, creating and
     * registering it on first use. Reusing an existing instance adds a
     * reference to it; the registry entry is dropped when the instance emits
     * `close`.
     *
     * @return {Promise<InnerClient>} the shared inner client
     */
    async [createClient]() {
      const options = this.options;

      if (!_instances.has(options.name)) {
        // first user of this name: create, register, and clean up on close
        const created = new InnerClient(options);
        const onClose = () => {
          _instances.delete(options.name);
          this[logger].info('[cluster#SingleClient] %s is closed.', options.name);
        };
        created.once('close', onClose);
        _instances.set(options.name, created);
        return created;
      }

      // already created by someone else: share it and count the new holder
      const shared = _instances.get(options.name);
      shared.ref();
      return shared;
    }
  }
  164. module.exports = SingleClient;