'use strict'; const net = require('net'); const is = require('is-type-of'); const assert = require('assert'); const Base = require('sdk-base'); const addressKey = Symbol('address'); const defaultOptions = { noDelay: true, connectTimeout: 3000, responseTimeout: 3000, heartbeatInterval: 5000, needHeartbeat: true, concurrent: 0, logger: console, }; const noop = () => {}; let seed = 0; class TCPBase extends Base { /** * A base class for tcp client with basic functions * * @param {Object} options * - {String} host - server host * - {Number} port - server port * - {Number} headerLength - length of the packet header, this field is optional, * but if you not provider, you must override getHeader method * - {Boolean} [noDelay] - whether use the Nagle algorithm or not,defaults to true * - {Number} [concurrent] - the number of concurrent packet, defaults to zero, means no limit * - {Number} [responseTimeout] - limit the maximum time for waiting a response * - {Logger} [logger] - the logger client * @class */ constructor(options) { super(); this.options = Object.assign({}, defaultOptions, options); if (!this.options.path) { assert(this.options.host, 'options.host is required'); assert(this.options.port, 'options.port is required'); } if (this.options.needHeartbeat) { assert(this.heartBeatPacket, 'heartBeatPacket getter must be implemented if needHeartbeat'); } this.clientId = ++seed; this._heartbeatTimer = null; this._socket = null; this._header = null; this._bodyLength = null; this._lastError = null; this._queue = []; this._invokes = new Map(); this[addressKey] = this.options.host + ':' + this.options.port; this._lastHeartbeatTime = 0; this._lastReceiveDataTime = 0; this._connect(); this.ready(err => { if (!err && this.options.needHeartbeat) { this._startHeartbeat(); } }); } /** * get packet header * * @return {Buffer} header */ getHeader() { return this.read(this.options.headerLength); } /* eslint-disable valid-jsdoc, no-unused-vars */ /** * get body length from header * * @param {Buffer} header - header data * @return {Number} bodyLength */ getBodyLength(header) { throw new Error('not implement'); } /** * return a heartbeat packet * * @property {Buffer} TCPBase#heartBeatPacket */ get heartBeatPacket() { throw new Error('not implement'); } /** * send heartbeat packet * * @return {void} */ sendHeartBeat() { this._socket.write(this.heartBeatPacket); } /** * deserialze method, leave it to implement by subclass * * @param {Buffer} buf - binary data * @return {Object} packet object */ decode(buf) { throw new Error('not implement'); } /* eslint-enable valid-jsdoc, no-unused-vars */ /** * if the connection is writable, also including flow control logic * * @property {Boolean} TCPBase#_writable */ get _writable() { if (this.options.concurrent && this._invokes.size >= this.options.concurrent) { return false; } return this.isOK; } /** * if the connection is healthy or not * * @property {Boolean} TCPBase#isOK */ get isOK() { return this._socket && this._socket.writable; } /** * remote address * * @property {String} TCPBase#address */ get address() { return this[addressKey]; } /** * logger * * @property {Logger} TCPBase#logger */ get logger() { return this.options.logger; } /** * Pulls some data out of the socket buffer and returns it. * If no data available to be read, null is returned * * @param {Number} n - to specify how much data to read * @return {Buffer} - data */ read(n) { return this._socket.read(n); } /** * send packet to server * * @param {Object} packet * - {Number} id - packet id * - {Buffer} data - binary data * - {Boolean} [oneway] - oneway or not * - {Number} [timeout] - the maximum time for waiting a response * @param {Function} [callback] - Call this function,when processing is complete, optional. * @return {void} */ send(packet, callback = noop) { if (!this._socket) { const err = new Error(`[TCPBase] The socket was closed. (address: ${this[addressKey]})`); err.id = packet.id; err.data = packet.data.toString('base64'); if (packet.oneway) { err.oneway = true; callback(); this.emit('error', err); } else { callback(err); } return; } if (packet.oneway) { this._socket.write(packet.data); callback(); return; } if (!this._writable) { this._queue.push([ packet, callback ]); return; } const meta = { id: packet.id, dataLength: packet.data.length, bufferSize1: this._socket.bufferSize, bufferSize2: -1, startTime: Date.now(), endTime: -1, }; let endTime; meta.writeSuccess = this._socket.write(packet.data, () => { endTime = Date.now(); }); const timeout = packet.timeout || this.options.responseTimeout; this._invokes.set(packet.id, { meta, packet, timer: setTimeout(() => { if (!this._socket) { return; } meta.bufferSize2 = this._socket.bufferSize; meta.endTime = endTime; this._finishInvoke(packet.id); const err = new Error(`Server no response in ${timeout}ms, address#${this[addressKey]}`); err.socketMeta = meta; err.name = 'ResponseTimeoutError'; callback(err); }, timeout), callback, }); } /** * thunk style api of send(packet, callback) * * @param {Object} packet * - {Number} id - packet id * - {Buffer} data - binary data * - {Boolean} [oneway] - oneway or not * - {Number} [timeout] - the maximum time for waiting a response * @return {Function} thunk function */ sendThunk(packet) { return callback => this.send(packet, callback); } _finishInvoke(id) { this._invokes.delete(id); if (this._writable) { this._resume(); } } _errorCallback(callback, err) { if (!err) { err = new Error(`The socket was closed. (address: ${this[addressKey]})`); err.name = 'SocketCloseError'; } callback && callback(err); } // mark all invokes timeout _cleanInvokes(err) { for (const id of this._invokes.keys()) { const req = this._invokes.get(id); clearTimeout(req.timer); this._errorCallback(req.callback, err); } this._invokes.clear(); } // clean up the queue _cleanQueue(err) { let args = this._queue.pop(); while (args) { // args[0] 是packet, args[1]是callback this._errorCallback(args[1], err); args = this._queue.pop(); } } _resume() { const args = this._queue.shift(); if (args) { this.send(args[0], args[1]); } } // read data from socket,and decode it to packet object _readPacket() { if (is.nullOrUndefined(this._bodyLength)) { this._header = this.getHeader(); if (!this._header) { return false; } this._bodyLength = this.getBodyLength(this._header); } let body; if (this._bodyLength > 0) { body = this.read(this._bodyLength); if (!body) { return false; } } this._bodyLength = null; const entity = this.decode(body, this._header); // the schema of entity // { // id: 'request id', // isResponse: true, // data: {} // deserialized object // } let type = 'request'; if (!entity.hasOwnProperty('isResponse')) { entity.isResponse = this._invokes.has(entity.id); } if (entity.isResponse) { type = 'response'; const invoke = this._invokes.get(entity.id); if (invoke) { this._finishInvoke(entity.id); clearTimeout(invoke.timer); process.nextTick(() => { invoke.callback(entity.error, entity.data); }); } } if (entity.data) { process.nextTick(() => { this.emit(type, entity, this[addressKey]); }); } return true; } /** * close the socket * * @param {Error} err - the error which makes socket closed * @return {void} */ close(err) { if (!this._socket) { return Promise.resolve(); } this._socket.destroy(err); return this.await('close'); } _handleClose() { if (!this._socket) { return; } this._socket.removeAllListeners(); this._socket = null; this._cleanInvokes(this._lastError); // clean timer if (this._heartbeatTimer) { clearInterval(this._heartbeatTimer); this._heartbeatTimer = null; } this._cleanQueue(this._lastError); this.emit('close'); } _handleReadable() { this._lastReceiveDataTime = Date.now(); try { let remaining = false; do { remaining = this._readPacket(); } while (remaining); } catch (err) { this.close(err); } } _connect(done) { if (!done) { done = err => { this.ready(err ? err : true); }; } const { port, host, localAddress, localPort, family, hints, lookup, path } = this.options; const socket = this._socket = net.connect({ port, host, localAddress, localPort, family, hints, lookup, path, }); socket.setNoDelay(this.options.noDelay); socket.on('readable', () => { this._handleReadable(); }); socket.once('close', () => { this._handleClose(); }); socket.on('error', err => { err.message += ' (address: ' + this[addressKey] + ')'; this._lastError = err; if (err.code === 'ECONNRESET') { this.logger.warn('[TCPBase] socket is closed by other side while there were still unhandled data in the socket buffer'); } else { this.emit('error', err); } }); socket.setTimeout(this.options.connectTimeout, () => { const err = new Error(`[TCPBase] socket connect timeout (${this.options.connectTimeout}ms)`); err.name = 'TcpConnectionTimeoutError'; err.host = this.options.host; err.port = this.options.port; this.close(err); }); socket.once('connect', () => { // set timeout back to zero after connected socket.setTimeout(0); this.emit('connect'); }); Promise.race([ this.await('connect'), this.await('error'), ]).then(done, done); } _startHeartbeat() { this._heartbeatTimer = setInterval(() => { const duration = this._lastHeartbeatTime - this._lastReceiveDataTime; if (this._lastReceiveDataTime && duration > this.options.heartbeatInterval) { const err = new Error(`server ${this[addressKey]} no response in ${duration}ms, maybe the socket is end on the other side.`); err.name = 'ServerNoResponseError'; this.close(err); return; } // flow control if (this._invokes.size > 0 || !this.isOK) { return; } this._lastHeartbeatTime = Date.now(); this.sendHeartBeat(); }, this.options.heartbeatInterval); } } module.exports = TCPBase;