123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446 |
- 'use strict';
- const net = require('net');
- const is = require('is-type-of');
- const assert = require('assert');
- const Base = require('sdk-base');
- const addressKey = Symbol('address');
- const defaultOptions = {
- noDelay: true,
- connectTimeout: 3000,
- responseTimeout: 3000,
- heartbeatInterval: 5000,
- needHeartbeat: true,
- concurrent: 0,
- logger: console,
- };
- const noop = () => {};
- let seed = 0;
- class TCPBase extends Base {
- /**
- * A base class for tcp client with basic functions
- *
- * @param {Object} options
- * - {String} host - server host
- * - {Number} port - server port
- * - {Number} headerLength - length of the packet header, this field is optional,
- * but if you not provider, you must override getHeader method
- * - {Boolean} [noDelay] - whether use the Nagle algorithm or not,defaults to true
- * - {Number} [concurrent] - the number of concurrent packet, defaults to zero, means no limit
- * - {Number} [responseTimeout] - limit the maximum time for waiting a response
- * - {Logger} [logger] - the logger client
- * @class
- */
- constructor(options) {
- super();
- this.options = Object.assign({}, defaultOptions, options);
- if (!this.options.path) {
- assert(this.options.host, 'options.host is required');
- assert(this.options.port, 'options.port is required');
- }
- if (this.options.needHeartbeat) {
- assert(this.heartBeatPacket, 'heartBeatPacket getter must be implemented if needHeartbeat');
- }
- this.clientId = ++seed;
- this._heartbeatTimer = null;
- this._socket = null;
- this._header = null;
- this._bodyLength = null;
- this._lastError = null;
- this._queue = [];
- this._invokes = new Map();
- this[addressKey] = this.options.host + ':' + this.options.port;
- this._lastHeartbeatTime = 0;
- this._lastReceiveDataTime = 0;
- this._connect();
- this.ready(err => {
- if (!err && this.options.needHeartbeat) {
- this._startHeartbeat();
- }
- });
- }
- /**
- * get packet header
- *
- * @return {Buffer} header
- */
- getHeader() {
- return this.read(this.options.headerLength);
- }
- /* eslint-disable valid-jsdoc, no-unused-vars */
- /**
- * get body length from header
- *
- * @param {Buffer} header - header data
- * @return {Number} bodyLength
- */
- getBodyLength(header) {
- throw new Error('not implement');
- }
- /**
- * return a heartbeat packet
- *
- * @property {Buffer} TCPBase#heartBeatPacket
- */
- get heartBeatPacket() {
- throw new Error('not implement');
- }
- /**
- * send heartbeat packet
- *
- * @return {void}
- */
- sendHeartBeat() {
- this._socket.write(this.heartBeatPacket);
- }
- /**
- * deserialze method, leave it to implement by subclass
- *
- * @param {Buffer} buf - binary data
- * @return {Object} packet object
- */
- decode(buf) {
- throw new Error('not implement');
- }
- /* eslint-enable valid-jsdoc, no-unused-vars */
- /**
- * if the connection is writable, also including flow control logic
- *
- * @property {Boolean} TCPBase#_writable
- */
- get _writable() {
- if (this.options.concurrent && this._invokes.size >= this.options.concurrent) {
- return false;
- }
- return this.isOK;
- }
- /**
- * if the connection is healthy or not
- *
- * @property {Boolean} TCPBase#isOK
- */
- get isOK() {
- return this._socket && this._socket.writable;
- }
- /**
- * remote address
- *
- * @property {String} TCPBase#address
- */
- get address() {
- return this[addressKey];
- }
- /**
- * logger
- *
- * @property {Logger} TCPBase#logger
- */
- get logger() {
- return this.options.logger;
- }
- /**
- * Pulls some data out of the socket buffer and returns it.
- * If no data available to be read, null is returned
- *
- * @param {Number} n - to specify how much data to read
- * @return {Buffer} - data
- */
- read(n) {
- return this._socket.read(n);
- }
- /**
- * send packet to server
- *
- * @param {Object} packet
- * - {Number} id - packet id
- * - {Buffer} data - binary data
- * - {Boolean} [oneway] - oneway or not
- * - {Number} [timeout] - the maximum time for waiting a response
- * @param {Function} [callback] - Call this function,when processing is complete, optional.
- * @return {void}
- */
- send(packet, callback = noop) {
- if (!this._socket) {
- const err = new Error(`[TCPBase] The socket was closed. (address: ${this[addressKey]})`);
- err.id = packet.id;
- err.data = packet.data.toString('base64');
- if (packet.oneway) {
- err.oneway = true;
- callback();
- this.emit('error', err);
- } else {
- callback(err);
- }
- return;
- }
- if (packet.oneway) {
- this._socket.write(packet.data);
- callback();
- return;
- }
- if (!this._writable) {
- this._queue.push([ packet, callback ]);
- return;
- }
- const meta = {
- id: packet.id,
- dataLength: packet.data.length,
- bufferSize1: this._socket.bufferSize,
- bufferSize2: -1,
- startTime: Date.now(),
- endTime: -1,
- };
- let endTime;
- meta.writeSuccess = this._socket.write(packet.data, () => {
- endTime = Date.now();
- });
- const timeout = packet.timeout || this.options.responseTimeout;
- this._invokes.set(packet.id, {
- meta,
- packet,
- timer: setTimeout(() => {
- if (!this._socket) {
- return;
- }
- meta.bufferSize2 = this._socket.bufferSize;
- meta.endTime = endTime;
- this._finishInvoke(packet.id);
- const err = new Error(`Server no response in ${timeout}ms, address#${this[addressKey]}`);
- err.socketMeta = meta;
- err.name = 'ResponseTimeoutError';
- callback(err);
- }, timeout),
- callback,
- });
- }
- /**
- * thunk style api of send(packet, callback)
- *
- * @param {Object} packet
- * - {Number} id - packet id
- * - {Buffer} data - binary data
- * - {Boolean} [oneway] - oneway or not
- * - {Number} [timeout] - the maximum time for waiting a response
- * @return {Function} thunk function
- */
- sendThunk(packet) {
- return callback => this.send(packet, callback);
- }
- _finishInvoke(id) {
- this._invokes.delete(id);
- if (this._writable) {
- this._resume();
- }
- }
- _errorCallback(callback, err) {
- if (!err) {
- err = new Error(`The socket was closed. (address: ${this[addressKey]})`);
- err.name = 'SocketCloseError';
- }
- callback && callback(err);
- }
- // mark all invokes timeout
- _cleanInvokes(err) {
- for (const id of this._invokes.keys()) {
- const req = this._invokes.get(id);
- clearTimeout(req.timer);
- this._errorCallback(req.callback, err);
- }
- this._invokes.clear();
- }
- // clean up the queue
- _cleanQueue(err) {
- let args = this._queue.pop();
- while (args) {
- // args[0] 是packet, args[1]是callback
- this._errorCallback(args[1], err);
- args = this._queue.pop();
- }
- }
- _resume() {
- const args = this._queue.shift();
- if (args) {
- this.send(args[0], args[1]);
- }
- }
- // read data from socket,and decode it to packet object
- _readPacket() {
- if (is.nullOrUndefined(this._bodyLength)) {
- this._header = this.getHeader();
- if (!this._header) {
- return false;
- }
- this._bodyLength = this.getBodyLength(this._header);
- }
- let body;
- if (this._bodyLength > 0) {
- body = this.read(this._bodyLength);
- if (!body) {
- return false;
- }
- }
- this._bodyLength = null;
- const entity = this.decode(body, this._header);
- // the schema of entity
- // {
- // id: 'request id',
- // isResponse: true,
- // data: {} // deserialized object
- // }
- let type = 'request';
- if (!entity.hasOwnProperty('isResponse')) {
- entity.isResponse = this._invokes.has(entity.id);
- }
- if (entity.isResponse) {
- type = 'response';
- const invoke = this._invokes.get(entity.id);
- if (invoke) {
- this._finishInvoke(entity.id);
- clearTimeout(invoke.timer);
- process.nextTick(() => {
- invoke.callback(entity.error, entity.data);
- });
- }
- }
- if (entity.data) {
- process.nextTick(() => {
- this.emit(type, entity, this[addressKey]);
- });
- }
- return true;
- }
- /**
- * close the socket
- *
- * @param {Error} err - the error which makes socket closed
- * @return {void}
- */
- close(err) {
- if (!this._socket) {
- return Promise.resolve();
- }
- this._socket.destroy(err);
- return this.await('close');
- }
- _handleClose() {
- if (!this._socket) {
- return;
- }
- this._socket.removeAllListeners();
- this._socket = null;
- this._cleanInvokes(this._lastError);
- // clean timer
- if (this._heartbeatTimer) {
- clearInterval(this._heartbeatTimer);
- this._heartbeatTimer = null;
- }
- this._cleanQueue(this._lastError);
- this.emit('close');
- }
- _handleReadable() {
- this._lastReceiveDataTime = Date.now();
- try {
- let remaining = false;
- do {
- remaining = this._readPacket();
- } while (remaining);
- } catch (err) {
- this.close(err);
- }
- }
- _connect(done) {
- if (!done) {
- done = err => {
- this.ready(err ? err : true);
- };
- }
- const { port, host, localAddress, localPort, family, hints, lookup, path } = this.options;
- const socket = this._socket = net.connect({
- port, host, localAddress, localPort, family, hints, lookup, path,
- });
- socket.setNoDelay(this.options.noDelay);
- socket.on('readable', () => { this._handleReadable(); });
- socket.once('close', () => { this._handleClose(); });
- socket.on('error', err => {
- err.message += ' (address: ' + this[addressKey] + ')';
- this._lastError = err;
- if (err.code === 'ECONNRESET') {
- this.logger.warn('[TCPBase] socket is closed by other side while there were still unhandled data in the socket buffer');
- } else {
- this.emit('error', err);
- }
- });
- socket.setTimeout(this.options.connectTimeout, () => {
- const err = new Error(`[TCPBase] socket connect timeout (${this.options.connectTimeout}ms)`);
- err.name = 'TcpConnectionTimeoutError';
- err.host = this.options.host;
- err.port = this.options.port;
- this.close(err);
- });
- socket.once('connect', () => {
- // set timeout back to zero after connected
- socket.setTimeout(0);
- this.emit('connect');
- });
- Promise.race([
- this.await('connect'),
- this.await('error'),
- ]).then(done, done);
- }
- _startHeartbeat() {
- this._heartbeatTimer = setInterval(() => {
- const duration = this._lastHeartbeatTime - this._lastReceiveDataTime;
- if (this._lastReceiveDataTime && duration > this.options.heartbeatInterval) {
- const err = new Error(`server ${this[addressKey]} no response in ${duration}ms, maybe the socket is end on the other side.`);
- err.name = 'ServerNoResponseError';
- this.close(err);
- return;
- }
- // flow control
- if (this._invokes.size > 0 || !this.isOK) {
- return;
- }
- this._lastHeartbeatTime = Date.now();
- this.sendHeartBeat();
- }, this.options.heartbeatInterval);
- }
- }
- module.exports = TCPBase;
|