server.js 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. 'use strict';
  2. const debug = require('debug')('cluster-client:server');
  3. const net = require('net');
  4. const Base = require('sdk-base');
  5. const sleep = require('mz-modules/sleep');
  6. const Packet = require('./protocol/packet');
  7. // share memory in current process
  8. let serverMap;
  9. if (global.serverMap) {
  10. serverMap = global.serverMap;
  11. } else {
  12. global.serverMap = serverMap = new Map();
  13. }
  14. let typeSet;
  15. if (global.typeSet) {
  16. typeSet = global.typeSet;
  17. } else {
  18. global.typeSet = typeSet = new Set();
  19. }
  20. function claimServer(port) {
  21. return new Promise((resolve, reject) => {
  22. const server = net.createServer();
  23. server.listen({
  24. port,
  25. host: '127.0.0.1',
  26. // When exclusive is true, the handle is not shared, and attempted port sharing results in an error.
  27. exclusive: true,
  28. });
  29. function onError(err) {
  30. debug('listen %s error: %s', port, err);
  31. reject(err);
  32. }
  33. server.on('error', onError);
  34. server.on('listening', () => {
  35. server.removeListener('error', onError);
  36. debug('listen %s success', port);
  37. resolve(server);
  38. });
  39. });
  40. }
  41. function tryToConnect(port) {
  42. return new Promise(resolve => {
  43. const socket = net.connect(port, '127.0.0.1');
  44. debug('try to connecting %s', port);
  45. let success = false;
  46. socket.on('connect', () => {
  47. success = true;
  48. resolve(true);
  49. // disconnect
  50. socket.end();
  51. debug('test connected %s success, end now', port);
  52. });
  53. socket.on('error', err => {
  54. debug('test connect %s error: %s, success: %s', port, err, success);
  55. // if success before, ignore it
  56. if (success) return;
  57. resolve(false);
  58. });
  59. });
  60. }
  61. class ClusterServer extends Base {
  62. /**
  63. * Manage all TCP Connections,assign them to proper channel
  64. *
  65. * @constructor
  66. * @param {Object} options
  67. * - {net.Server} server - the server
  68. * - {Number} port - the port
  69. */
  70. constructor(options) {
  71. super();
  72. this._sockets = new Map();
  73. this._server = options.server;
  74. this._port = options.port;
  75. this._isClosed = false;
  76. this._server.on('connection', socket => this._handleSocket(socket));
  77. this._server.once('close', () => {
  78. this._isClosed = true;
  79. serverMap.delete(this._port);
  80. this.emit('close');
  81. });
  82. this._server.once('error', err => {
  83. this.emit('error', err);
  84. });
  85. }
  86. get isClosed() {
  87. return this._isClosed;
  88. }
  89. close() {
  90. return new Promise((resolve, reject) => {
  91. if (this.isClosed) return resolve();
  92. this._server.close(err => {
  93. if (err) return reject(err);
  94. resolve();
  95. });
  96. // sockets must be closed manually, otherwise server.close callback will never be called
  97. for (const socket of this._sockets.values()) {
  98. socket.destroy();
  99. }
  100. });
  101. }
  102. _handleSocket(socket) {
  103. let header;
  104. let bodyLength;
  105. let body;
  106. const server = this;
  107. const key = socket.remotePort;
  108. this._sockets.set(key, socket);
  109. function onReadable() {
  110. if (!header) {
  111. header = socket.read(24);
  112. if (!header) {
  113. return;
  114. }
  115. }
  116. if (!bodyLength) {
  117. bodyLength = header.readInt32BE(16) + header.readInt32BE(20);
  118. }
  119. body = socket.read(bodyLength);
  120. if (!body) {
  121. return;
  122. }
  123. // first packet to register to channel
  124. const packet = Packet.decode(Buffer.concat([ header, body ], 24 + bodyLength));
  125. header = null;
  126. bodyLength = null;
  127. body = null;
  128. if (packet.connObj && packet.connObj.type === 'register_channel') {
  129. const channelName = packet.connObj.channelName;
  130. const eventKey = `${channelName}_connection`;
  131. // that means leader already there
  132. if (server.listenerCount(eventKey)) {
  133. socket.removeListener('readable', onReadable);
  134. // assign to proper channel
  135. debug('new %s_connection %s connected', channelName, socket.remotePort);
  136. server.emit(`${channelName}_connection`, socket, packet);
  137. }
  138. }
  139. }
  140. socket.on('readable', onReadable);
  141. socket.once('close', () => {
  142. debug('socket %s close', key);
  143. this._sockets.delete(key);
  144. });
  145. debug('new socket %s from follower', socket.remotePort);
  146. }
  147. /**
  148. * Occupy the port
  149. *
  150. * @param {String} name - the client name
  151. * @param {Number} port - the port
  152. * @return {ClusterServer} server
  153. */
  154. static async create(name, port) {
  155. const key = `${name}@${port}`;
  156. let instance = serverMap.get(port);
  157. if (instance && !instance.isClosed) {
  158. if (typeSet.has(key)) {
  159. return null;
  160. }
  161. typeSet.add(key);
  162. return instance;
  163. }
  164. // compete for the local port, if got => leader, otherwise follower
  165. try {
  166. const server = await claimServer(port);
  167. instance = new ClusterServer({ server, port });
  168. typeSet.add(key);
  169. serverMap.set(port, instance);
  170. return instance;
  171. } catch (err) {
  172. // if exception, that mean compete for port failed, then double check
  173. instance = serverMap.get(port);
  174. if (instance && !instance.isClosed) {
  175. if (typeSet.has(key)) {
  176. return null;
  177. }
  178. typeSet.add(key);
  179. return instance;
  180. }
  181. return null;
  182. }
  183. }
  184. static async close(name, server) {
  185. const port = server._port;
  186. // remove from typeSet, so other client can occupy
  187. typeSet.delete(`${name}@${port}`);
  188. let listening = false;
  189. for (const key of typeSet.values()) {
  190. if (key.endsWith(`@${port}`)) {
  191. listening = true;
  192. break;
  193. }
  194. }
  195. // close server if no one is listening on this port any more
  196. if (!listening) {
  197. const server = serverMap.get(port);
  198. if (server) await server.close();
  199. }
  200. }
  201. /**
  202. * Wait for Leader Startup
  203. *
  204. * @param {Number} port - the port
  205. * @param {Number} timeout - the max wait time
  206. * @return {void}
  207. */
  208. static async waitFor(port, timeout) {
  209. const start = Date.now();
  210. let connect = false;
  211. while (!connect) {
  212. connect = await tryToConnect(port);
  213. // if timeout, throw error
  214. if (Date.now() - start > timeout) {
  215. throw new Error(`[ClusterClient] leader does not be active in ${timeout}ms on port:${port}`);
  216. }
  217. if (!connect) {
  218. await sleep(3000);
  219. }
  220. }
  221. }
  222. }
  223. module.exports = ClusterServer;