'use strict'; const is = require('is-type-of'); const Base = require('sdk-base'); const Packet = require('./protocol/packet'); const Response = require('./protocol/response'); class Connection extends Base { /** * Socket Connection among Leader and Follower * * @param {Object} options * - {Socket} socket - the socket instance * - {Number} responseTimeout - the response timeout * - {Transcode} transcode - serialze / deserialze methods * @constructor */ constructor(options) { super(options); this._socket = options.socket; this._invokes = new Map(); this.key = this._socket.remotePort; this._lastActiveTime = Date.now(); this._transcode = options.transcode; this._lastError = null; // listen socket events this._socket.on('readable', () => { this._handleReadable(); }); this._socket.on('error', err => { this._handleSocketError(err); }); this._socket.on('close', () => { this._handleClose(); }); // try read data from buffer at first this._handleReadable(); } get isOk() { return this._socket && this._socket.writable; } get logger() { return this.options.logger; } get lastActiveTime() { return this._lastActiveTime; } set lastActiveTime(val) { this._lastActiveTime = val; } /** * send packet * * @param {Packet} packet - the packet * @param {Function} [callback] - callback function * @return {void} */ send(packet, callback) { this._write(packet.encode()); if (!packet.isResponse) { const id = packet.id; const timeout = packet.timeout; this._invokes.set(id, { id, timer: setTimeout(() => { const err = new Error(`[ClusterClient] no response in ${timeout}ms, remotePort#${this.key}`); err.name = 'ClusterConnectionResponseTimeoutError'; callback(err, timeout); this._invokes.delete(id); }, timeout), callback, }); } } close(err) { if (!this._socket) { return Promise.resolve(); } this._socket.destroy(err); return this.await('close'); } _handleReadable() { try { let remaining = false; do { remaining = this._readPacket(); } while (remaining); } catch (err) { this.close(err); } } _handleSocketError(err) { this._lastError = err; if (err.code === 'ECONNRESET') { this.logger.warn('[ClusterClient:Connection] socket is closed by other side while there were still unhandled data in the socket buffer'); } else { this.emit('error', err); } } _handleClose() { this._cleanInvokes(this._lastError); this.emit('close'); } _cleanInvokes(err) { if (!err) { err = new Error('The socket was closed.'); err.name = 'ClusterSocketCloseError'; } for (const req of this._invokes.values()) { clearTimeout(req.timer); req.callback(err); } this._invokes.clear(); } _read(n) { return this._socket.read(n); } _write(bytes) { if (!this.isOk) { return false; } return this._socket.write(bytes); } _getHeader() { return this._read(24); } _getBodyLength(header) { return header.readInt32BE(16) + header.readInt32BE(20); } _readPacket() { if (is.nullOrUndefined(this._bodyLength)) { this._header = this._getHeader(); if (!this._header) { return false; } this._bodyLength = this._getBodyLength(this._header); } let body; // body may be emtry if (this._bodyLength > 0) { body = this._read(this._bodyLength); if (!body) { return false; } } this._bodyLength = null; const packet = Packet.decode(Buffer.concat([ this._header, body ])); const id = packet.id; if (packet.isResponse) { const info = this._invokes.get(id); if (info) { clearTimeout(info.timer); info.callback(null, packet.data); this._invokes.delete(id); } } else { process.nextTick(() => this.emit('request', packet, new Response({ id, timeout: packet.timeout, }))); } return true; } } module.exports = Connection;