leader.js 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
  1. 'use strict';
  2. const debug = require('debug')('cluster-client#leader');
  3. const co = require('co');
  4. const is = require('is-type-of');
  5. const Base = require('sdk-base');
  6. const utils = require('./utils');
  7. const random = require('utility').random;
  8. const ClusterServer = require('./server');
  9. const Connection = require('./connection');
  10. const Request = require('./protocol/request');
  11. const Response = require('./protocol/response');
  12. class Leader extends Base {
  13. /**
  14. * The Leader hold the real client
  15. *
  16. * @param {Object} options
  17. * - {String} name - client name, default is the class name
  18. * - {ClusterServer} server - the cluster server
  19. * - {Boolean} isBroadcast - whether broadcast subscrption result to all followers or just one, default is true
  20. * - {Number} heartbeatInterval - the heartbeat interval
  21. * - {Function} createRealClient - to create the real client
  22. * @constructor
  23. */
  24. constructor(options) {
  25. super(options);
  26. this._connections = new Map();
  27. this._subListeners = new Map(); // subscribe key => listener
  28. this._subConnMap = new Map(); // subscribe key => conn key
  29. this._subData = new Map();
  30. // local socket server
  31. this._server = this.options.server;
  32. this._transcode = this.options.transcode;
  33. this._isReady = false;
  34. this._closeByUser = false;
  35. // the real client
  36. this._realClient = this.options.createRealClient();
  37. this._subscribeMethodName = this._findMethodName('subscribe');
  38. this._publishMethodName = this._findMethodName('publish');
  39. // event delegate
  40. utils.delegateEvents(this._realClient, this);
  41. if (is.function(this._realClient.ready)) {
  42. this._realClient.ready(err => {
  43. if (err) {
  44. this.ready(err);
  45. } else {
  46. this._isReady = true;
  47. this.ready(true);
  48. }
  49. });
  50. } else {
  51. this._isReady = true;
  52. this.ready(true);
  53. }
  54. this._handleConnection = this._handleConnection.bind(this);
  55. // subscribe its own channel
  56. this._server.on(`${this.options.name}_connection`, this._handleConnection);
  57. this._server.once('close', () => { this.emit('server_closed'); });
  58. this.on('server_closed', () => {
  59. this._handleClose().catch(err => { this.emit('error', err); });
  60. });
  61. // maxIdleTime is 3 times of heartbeatInterval
  62. const heartbeatInterval = this.options.heartbeatInterval;
  63. const maxIdleTime = this.options.heartbeatInterval * 3;
  64. this._heartbeatTimer = setInterval(() => {
  65. const now = Date.now();
  66. for (const conn of this._connections.values()) {
  67. const dur = now - conn.lastActiveTime;
  68. if (dur > maxIdleTime) {
  69. const err = new Error(`client no response in ${dur}ms exceeding maxIdleTime ${maxIdleTime}ms, maybe the connection is close on other side.`);
  70. err.name = 'ClusterClientNoResponseError';
  71. conn.close(err);
  72. }
  73. }
  74. }, heartbeatInterval);
  75. }
  76. get isLeader() {
  77. return true;
  78. }
  79. get logger() {
  80. return this.options.logger;
  81. }
  82. formatKey(reg) {
  83. return '$$inner$$__' + this.options.formatKey(reg);
  84. }
  85. subscribe(reg, listener) {
  86. const transcode = this._transcode;
  87. const conn = Object.create(Base.prototype, {
  88. isMock: { value: true },
  89. key: { value: `${this.options.name}_mock_conn_${utils.nextId()}` },
  90. lastActiveTime: {
  91. get() {
  92. return Date.now();
  93. },
  94. },
  95. listener: {
  96. get() {
  97. return listener;
  98. },
  99. },
  100. send: {
  101. value(req) {
  102. const result = transcode.decode(req.data);
  103. process.nextTick(() => {
  104. listener(result);
  105. });
  106. },
  107. },
  108. close: { value() {} },
  109. });
  110. conn.once('close', () => {
  111. this._connections.delete(conn.key);
  112. for (const connKeySet of this._subConnMap.values()) {
  113. connKeySet.delete(conn.key);
  114. }
  115. });
  116. this._connections.set(conn.key, conn);
  117. this._doSubscribe(reg, conn);
  118. }
  119. unSubscribe(reg, listener) {
  120. const key = this.formatKey(reg);
  121. const connKeySet = this._subConnMap.get(key) || new Set();
  122. const newConnKeySet = new Set();
  123. for (const connKey of connKeySet.values()) {
  124. const conn = this._connections.get(connKey);
  125. if (!conn) {
  126. continue;
  127. }
  128. if (conn.isMock && (!listener || conn.listener === listener)) {
  129. this._connections.delete(connKey);
  130. continue;
  131. }
  132. newConnKeySet.add(connKey);
  133. }
  134. this._subConnMap.set(key, newConnKeySet);
  135. }
  136. publish(reg) {
  137. this._realClient[this._publishMethodName](reg);
  138. }
  139. invoke(methodName, args, callback) {
  140. let method = this._realClient[methodName];
  141. // compatible with generatorFunction
  142. if (is.generatorFunction(method)) {
  143. method = co.wrap(method);
  144. }
  145. args.push(callback);
  146. const ret = method.apply(this._realClient, args);
  147. if (callback && is.promise(ret)) {
  148. ret.then(result => callback(null, result), err => callback(err))
  149. // to avoid uncaught exception in callback function, then cause unhandledRejection
  150. .catch(err => { this._errorHandler(err); });
  151. }
  152. }
  153. _doSubscribe(reg, conn) {
  154. const key = this.formatKey(reg);
  155. const callback = err => {
  156. if (err) {
  157. this._errorHandler(err);
  158. }
  159. };
  160. const isBroadcast = this.options.isBroadcast;
  161. const timeout = this.options.responseTimeout;
  162. const connKeySet = this._subConnMap.get(key) || new Set();
  163. connKeySet.add(conn.key);
  164. this._subConnMap.set(key, connKeySet);
  165. // only subscribe once in cluster mode, and broadcast to all followers
  166. if (!this._subListeners.has(key)) {
  167. const listener = result => {
  168. const data = this._transcode.encode(result);
  169. this._subData.set(key, data);
  170. const connKeySet = this._subConnMap.get(key);
  171. if (!connKeySet) {
  172. return;
  173. }
  174. let keys = Array.from(connKeySet.values());
  175. // if isBroadcast equal to false, random pick one to notify
  176. if (!isBroadcast) {
  177. keys = [ keys[random(keys.length)] ];
  178. }
  179. for (const connKey of keys) {
  180. const conn = this._connections.get(connKey);
  181. if (conn) {
  182. debug('[Leader:%s] push subscribe data to cluster client#%s', this.options.name, connKey);
  183. conn.send(new Request({
  184. timeout,
  185. connObj: {
  186. type: 'subscribe_result',
  187. key,
  188. },
  189. data,
  190. }), callback);
  191. }
  192. }
  193. };
  194. this._subListeners.set(key, listener);
  195. this._realClient[this._subscribeMethodName](reg, listener);
  196. } else if (this._subData.has(key) && isBroadcast) {
  197. conn.send(new Request({
  198. timeout,
  199. connObj: {
  200. type: 'subscribe_result',
  201. key,
  202. },
  203. data: this._subData.get(key),
  204. }), callback);
  205. }
  206. }
  207. _findMethodName(type) {
  208. return utils.findMethodName(this.options.descriptors, type);
  209. }
  210. // handle new socket connect
  211. _handleConnection(socket, req) {
  212. debug('[Leader:%s] socket connected, port: %d', this.options.name, socket.remotePort);
  213. const conn = new Connection({
  214. socket,
  215. name: this.options.name,
  216. logger: this.logger,
  217. transcode: this.options.transcode,
  218. requestTimeout: this.options.requestTimeout,
  219. });
  220. this._connections.set(conn.key, conn);
  221. conn.once('close', () => {
  222. this._connections.delete(conn.key);
  223. for (const connKeySet of this._subConnMap.values()) {
  224. connKeySet.delete(conn.key);
  225. }
  226. });
  227. conn.on('error', err => this._errorHandler(err));
  228. conn.on('request', (req, res) => this._handleRequest(req, res, conn));
  229. // handle register channel request
  230. const res = new Response({
  231. id: req.id,
  232. timeout: req.timeout,
  233. });
  234. this._handleRequest(req, res, conn);
  235. }
  236. _handleSubscribe(req, conn) {
  237. const connObj = req.connObj || {};
  238. this._doSubscribe(connObj.reg, conn);
  239. }
  240. _handleUnSubscribe(req, conn) {
  241. const connObj = req.connObj || {};
  242. const key = this.formatKey(connObj.reg);
  243. const connKeySet = this._subConnMap.get(key) || new Set();
  244. connKeySet.delete(conn.key);
  245. this._subConnMap.set(key, connKeySet);
  246. }
  247. // handle request from followers
  248. _handleRequest(req, res, conn) {
  249. const connObj = req.connObj || {};
  250. // update last active time to make sure not kick out by leader
  251. conn.lastActiveTime = Date.now();
  252. switch (connObj.type) {
  253. case 'subscribe':
  254. debug('[Leader:%s] received subscribe request from follower, req: %j, conn: %s', this.options.name, req, conn.key);
  255. this._handleSubscribe(req, conn);
  256. break;
  257. case 'unSubscribe':
  258. debug('[Leader:%s] received unSubscribe request from follower, req: %j, conn: %s', this.options.name, req, conn.key);
  259. this._handleUnSubscribe(req, conn);
  260. break;
  261. case 'invoke':
  262. {
  263. debug('[Leader:%s] received invoke request from follower, req: %j, conn: %s', this.options.name, req, conn.key);
  264. const argLength = connObj.argLength;
  265. const args = [];
  266. if (argLength > 0) {
  267. const data = req.data;
  268. for (let i = 0, offset = 0; i < argLength; ++i) {
  269. const len = data.readUInt32BE(offset);
  270. const arg = this._transcode.decode(data.slice(offset + 4, offset + 4 + len));
  271. args.push(arg);
  272. offset += (4 + len);
  273. }
  274. }
  275. if (connObj.oneway) {
  276. this.invoke(connObj.method, args);
  277. } else {
  278. const startTime = Date.now();
  279. this.invoke(connObj.method, args, (err, result) => {
  280. // no response if processing timeout, just record error
  281. if (req.timeout && Date.now() - startTime > req.timeout) {
  282. const err = new Error(`[Leader:${this.options.name}] invoke method:${connObj.method} timeout for req#${req.id}`);
  283. err.name = 'ClusterLeaderTimeoutError';
  284. err.method = connObj.method;
  285. err.args = args;
  286. this._errorHandler(err);
  287. return;
  288. }
  289. if (err) {
  290. const data = Object.assign({
  291. stack: err.stack,
  292. name: err.name,
  293. message: err.message,
  294. }, err);
  295. err.method = connObj.method;
  296. err.args = connObj.args;
  297. this._errorHandler(err);
  298. res.connObj = {
  299. type: 'invoke_result',
  300. success: false,
  301. };
  302. res.data = this._transcode.encode(data);
  303. } else {
  304. debug('[Leader:%s] send method:%s result to follower, result: %j', this.options.name, connObj.method, result);
  305. const data = this._transcode.encode(result);
  306. res.connObj = {
  307. type: 'invoke_result',
  308. success: true,
  309. };
  310. res.data = data;
  311. }
  312. conn.send(res);
  313. });
  314. }
  315. break;
  316. }
  317. case 'heartbeat':
  318. debug('[Leader:%s] received heartbeat request from follower, req: %j, conn: %s', this.options.name, req, conn.key);
  319. res.connObj = { type: 'heartbeat_res' };
  320. conn.send(res);
  321. break;
  322. case 'register_channel':
  323. // make sure response after leader is ready
  324. this.ready(err => {
  325. if (err) {
  326. res.connObj = { type: 'register_channel_res' };
  327. const data = Object.assign({
  328. message: err.message,
  329. stack: err.stack,
  330. name: err.name,
  331. }, err);
  332. res.data = this._transcode.encode({
  333. success: false,
  334. error: data,
  335. });
  336. } else {
  337. res.connObj = { type: 'register_channel_res' };
  338. res.data = this._transcode.encode({ success: true });
  339. }
  340. conn.send(res);
  341. });
  342. break;
  343. default:
  344. {
  345. const err = new Error(`unsupport data type: ${connObj.type}`);
  346. err.name = 'ClusterRequestTypeError';
  347. this._errorHandler(err);
  348. break;
  349. }
  350. }
  351. }
  352. // emit error asynchronously
  353. _errorHandler(err) {
  354. setImmediate(() => {
  355. if (!this._closeByUser) {
  356. this.emit('error', err);
  357. }
  358. });
  359. }
  360. async _handleClose() {
  361. debug('[Leader:%s] leader server is closed', this.options.name);
  362. // close the real client
  363. if (this._realClient) {
  364. const originClose = this._findMethodName('close');
  365. if (originClose) {
  366. // support common function, generatorFunction, and function returning a promise
  367. await utils.callFn(this._realClient[originClose].bind(this._realClient));
  368. }
  369. }
  370. clearInterval(this._heartbeatTimer);
  371. this._heartbeatTimer = null;
  372. this.emit('close');
  373. }
  374. async close() {
  375. this._closeByUser = true;
  376. debug('[Leader:%s] try to close leader', this.options.name);
  377. // 1. stop listening to server channel
  378. this._server.removeListener(`${this.options.name}_connection`, this._handleConnection);
  379. // 2. close all mock connections
  380. for (const conn of this._connections.values()) {
  381. if (conn.isMock) {
  382. conn.emit('close');
  383. }
  384. }
  385. // 3. close server
  386. // CANNOT close server directly by server.close(), other cluster clients may be using it
  387. this.removeAllListeners('server_closed');
  388. await ClusterServer.close(this.options.name, this._server);
  389. // 5. close real client
  390. await this._handleClose();
  391. }
  392. }
  393. module.exports = Leader;