base.js 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. 'use strict';
  2. const debug = require('debug')('cluster-client');
  3. const is = require('is-type-of');
  4. const Base = require('sdk-base');
  5. const assert = require('assert');
  6. const utils = require('../utils');
  7. // Symbols
  8. const {
  9. init,
  10. logger,
  11. isReady,
  12. innerClient,
  13. subscribe,
  14. unSubscribe,
  15. publish,
  16. invoke,
  17. subInfo,
  18. pubInfo,
  19. closeHandler,
  20. close,
  21. singleMode,
  22. createClient,
  23. } = require('../symbol');
  24. class WrapperBase extends Base {
  25. /**
  26. * Share Connection among Multi-Process Mode
  27. *
  28. * @param {Object} options
  29. * - {Number} port - the port
  30. * - {Transcode} transcode - serialze / deseriaze methods
  31. * - {Boolean} isLeader - wehether is leader or follower
  32. * - {Number} maxWaitTime - leader startup max time (ONLY effective on isLeader is true)
  33. * - {Function} createRealClient - to create the real client instance
  34. * @constructor
  35. */
  36. constructor(options) {
  37. super(options);
  38. this[subInfo] = new Map();
  39. this[pubInfo] = new Map();
  40. this[init]().catch(err => { this.ready(err); });
  41. }
  42. get isClusterClientLeader() {
  43. return this[innerClient] && this[innerClient].isLeader;
  44. }
  45. get [singleMode]() {
  46. return false;
  47. }
  48. /**
  49. * log instance
  50. * @property {Logger} ClusterClient#[logger]
  51. */
  52. get [logger]() {
  53. return this.options.logger;
  54. }
  55. async [createClient]() {
  56. throw new Error('not implement');
  57. }
  58. /**
  59. * initialize, to leader or follower
  60. *
  61. * @return {void}
  62. */
  63. async [init]() {
  64. this[innerClient] = await this[createClient]();
  65. // events delegate
  66. utils.delegateEvents(this[innerClient], this);
  67. // re init when connection is close
  68. if (this[closeHandler]) {
  69. this[innerClient].on('close', this[closeHandler]);
  70. }
  71. // wait leader/follower ready
  72. await this[innerClient].ready();
  73. // subscribe all
  74. for (const registrations of this[subInfo].values()) {
  75. for (const args of registrations) {
  76. this[innerClient].subscribe(args[0], args[1]);
  77. }
  78. }
  79. // publish all
  80. for (const reg of this[pubInfo].values()) {
  81. this[innerClient].publish(reg);
  82. }
  83. if (!this[isReady]) {
  84. this[isReady] = true;
  85. this.ready(true);
  86. }
  87. }
  88. /**
  89. * do subscribe
  90. *
  91. * @param {Object} reg - subscription info
  92. * @param {Function} listener - callback function
  93. * @return {void}
  94. */
  95. [subscribe](reg, listener) {
  96. assert(is.function(listener), `[ClusterClient:${this.options.name}] subscribe(reg, listener) listener should be a function`);
  97. debug('[ClusterClient:%s] subscribe %j', this.options.name, reg);
  98. const key = this.options.formatKey(reg);
  99. const registrations = this[subInfo].get(key) || [];
  100. registrations.push([ reg, listener ]);
  101. this[subInfo].set(key, registrations);
  102. if (this[isReady]) {
  103. this[innerClient].subscribe(reg, listener);
  104. }
  105. }
  106. /**
  107. * do unSubscribe
  108. *
  109. * @param {Object} reg - subscription info
  110. * @param {Function} listener - callback function
  111. * @return {void}
  112. */
  113. [unSubscribe](reg, listener) {
  114. debug('[ClusterClient:%s] unSubscribe %j', this.options.name, reg);
  115. const key = this.options.formatKey(reg);
  116. const registrations = this[subInfo].get(key) || [];
  117. const newRegistrations = [];
  118. if (listener) {
  119. for (const arr of registrations) {
  120. if (arr[1] !== listener) {
  121. newRegistrations.push(arr);
  122. }
  123. }
  124. }
  125. this[subInfo].set(key, newRegistrations);
  126. if (this[isReady]) {
  127. this[innerClient].unSubscribe(reg, listener);
  128. }
  129. }
  130. /**
  131. * do publish
  132. *
  133. * @param {Object} reg - publish info
  134. * @return {void}
  135. */
  136. [publish](reg) {
  137. debug('[ClusterClient:%s] publish %j', this.options.name, reg);
  138. const key = this.options.formatKey(reg);
  139. this[pubInfo].set(key, reg);
  140. if (this[isReady]) {
  141. this[innerClient].publish(reg);
  142. }
  143. }
  144. /**
  145. * invoke a method asynchronously
  146. *
  147. * @param {String} method - the method name
  148. * @param {Array} args - the arguments list
  149. * @param {Function} callback - callback function
  150. * @return {void}
  151. */
  152. [invoke](method, args, callback) {
  153. if (!this[isReady]) {
  154. this.ready(err => {
  155. if (err) {
  156. callback && callback(err);
  157. return;
  158. }
  159. this[innerClient].invoke(method, args, callback);
  160. });
  161. return;
  162. }
  163. debug('[ClusterClient:%s] invoke method: %s, args: %j', this.options.name, method, args);
  164. this[innerClient].invoke(method, args, callback);
  165. }
  166. async [close]() {
  167. try {
  168. // close after ready, in case of innerClient is initializing
  169. await this.ready();
  170. } catch (err) {
  171. // ignore
  172. }
  173. const client = this[innerClient];
  174. if (client) {
  175. // prevent re-initializing
  176. if (this[closeHandler]) {
  177. client.removeListener('close', this[closeHandler]);
  178. }
  179. if (client.close) {
  180. await utils.callFn(client.close.bind(client));
  181. }
  182. }
  183. }
  184. }
  185. module.exports = WrapperBase;