'use strict'; const debug = require('debug')('cluster-client:server'); const net = require('net'); const Base = require('sdk-base'); const sleep = require('mz-modules/sleep'); const Packet = require('./protocol/packet'); // share memory in current process let serverMap; if (global.serverMap) { serverMap = global.serverMap; } else { global.serverMap = serverMap = new Map(); } let typeSet; if (global.typeSet) { typeSet = global.typeSet; } else { global.typeSet = typeSet = new Set(); } function claimServer(port) { return new Promise((resolve, reject) => { const server = net.createServer(); server.listen({ port, host: '127.0.0.1', // When exclusive is true, the handle is not shared, and attempted port sharing results in an error. exclusive: true, }); function onError(err) { debug('listen %s error: %s', port, err); reject(err); } server.on('error', onError); server.on('listening', () => { server.removeListener('error', onError); debug('listen %s success', port); resolve(server); }); }); } function tryToConnect(port) { return new Promise(resolve => { const socket = net.connect(port, '127.0.0.1'); debug('try to connecting %s', port); let success = false; socket.on('connect', () => { success = true; resolve(true); // disconnect socket.end(); debug('test connected %s success, end now', port); }); socket.on('error', err => { debug('test connect %s error: %s, success: %s', port, err, success); // if success before, ignore it if (success) return; resolve(false); }); }); } class ClusterServer extends Base { /** * Manage all TCP Connections,assign them to proper channel * * @constructor * @param {Object} options * - {net.Server} server - the server * - {Number} port - the port */ constructor(options) { super(); this._sockets = new Map(); this._server = options.server; this._port = options.port; this._isClosed = false; this._server.on('connection', socket => this._handleSocket(socket)); this._server.once('close', () => { this._isClosed = true; serverMap.delete(this._port); this.emit('close'); }); this._server.once('error', err => { this.emit('error', err); }); } get isClosed() { return this._isClosed; } close() { return new Promise((resolve, reject) => { if (this.isClosed) return resolve(); this._server.close(err => { if (err) return reject(err); resolve(); }); // sockets must be closed manually, otherwise server.close callback will never be called for (const socket of this._sockets.values()) { socket.destroy(); } }); } _handleSocket(socket) { let header; let bodyLength; let body; const server = this; const key = socket.remotePort; this._sockets.set(key, socket); function onReadable() { if (!header) { header = socket.read(24); if (!header) { return; } } if (!bodyLength) { bodyLength = header.readInt32BE(16) + header.readInt32BE(20); } body = socket.read(bodyLength); if (!body) { return; } // first packet to register to channel const packet = Packet.decode(Buffer.concat([ header, body ], 24 + bodyLength)); header = null; bodyLength = null; body = null; if (packet.connObj && packet.connObj.type === 'register_channel') { const channelName = packet.connObj.channelName; const eventKey = `${channelName}_connection`; // that means leader already there if (server.listenerCount(eventKey)) { socket.removeListener('readable', onReadable); // assign to proper channel debug('new %s_connection %s connected', channelName, socket.remotePort); server.emit(`${channelName}_connection`, socket, packet); } } } socket.on('readable', onReadable); socket.once('close', () => { debug('socket %s close', key); this._sockets.delete(key); }); debug('new socket %s from follower', socket.remotePort); } /** * Occupy the port * * @param {String} name - the client name * @param {Number} port - the port * @return {ClusterServer} server */ static async create(name, port) { const key = `${name}@${port}`; let instance = serverMap.get(port); if (instance && !instance.isClosed) { if (typeSet.has(key)) { return null; } typeSet.add(key); return instance; } // compete for the local port, if got => leader, otherwise follower try { const server = await claimServer(port); instance = new ClusterServer({ server, port }); typeSet.add(key); serverMap.set(port, instance); return instance; } catch (err) { // if exception, that mean compete for port failed, then double check instance = serverMap.get(port); if (instance && !instance.isClosed) { if (typeSet.has(key)) { return null; } typeSet.add(key); return instance; } return null; } } static async close(name, server) { const port = server._port; // remove from typeSet, so other client can occupy typeSet.delete(`${name}@${port}`); let listening = false; for (const key of typeSet.values()) { if (key.endsWith(`@${port}`)) { listening = true; break; } } // close server if no one is listening on this port any more if (!listening) { const server = serverMap.get(port); if (server) await server.close(); } } /** * Wait for Leader Startup * * @param {Number} port - the port * @param {Number} timeout - the max wait time * @return {void} */ static async waitFor(port, timeout) { const start = Date.now(); let connect = false; while (!connect) { connect = await tryToConnect(port); // if timeout, throw error if (Date.now() - start > timeout) { throw new Error(`[ClusterClient] leader does not be active in ${timeout}ms on port:${port}`); } if (!connect) { await sleep(3000); } } } } module.exports = ClusterServer;