123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- 'use strict';
- const is = require('is-type-of');
- const Base = require('sdk-base');
- const Packet = require('./protocol/packet');
- const Response = require('./protocol/response');
- class Connection extends Base {
- /**
- * Socket Connection among Leader and Follower
- *
- * @param {Object} options
- * - {Socket} socket - the socket instance
- * - {Number} responseTimeout - the response timeout
- * - {Transcode} transcode - serialze / deserialze methods
- * @constructor
- */
- constructor(options) {
- super(options);
- this._socket = options.socket;
- this._invokes = new Map();
- this.key = this._socket.remotePort;
- this._lastActiveTime = Date.now();
- this._transcode = options.transcode;
- this._lastError = null;
- // listen socket events
- this._socket.on('readable', () => { this._handleReadable(); });
- this._socket.on('error', err => { this._handleSocketError(err); });
- this._socket.on('close', () => { this._handleClose(); });
- // try read data from buffer at first
- this._handleReadable();
- }
- get isOk() {
- return this._socket && this._socket.writable;
- }
- get logger() {
- return this.options.logger;
- }
- get lastActiveTime() {
- return this._lastActiveTime;
- }
- set lastActiveTime(val) {
- this._lastActiveTime = val;
- }
- /**
- * send packet
- *
- * @param {Packet} packet - the packet
- * @param {Function} [callback] - callback function
- * @return {void}
- */
- send(packet, callback) {
- this._write(packet.encode());
- if (!packet.isResponse) {
- const id = packet.id;
- const timeout = packet.timeout;
- this._invokes.set(id, {
- id,
- timer: setTimeout(() => {
- const err = new Error(`[ClusterClient] no response in ${timeout}ms, remotePort#${this.key}`);
- err.name = 'ClusterConnectionResponseTimeoutError';
- callback(err, timeout);
- this._invokes.delete(id);
- }, timeout),
- callback,
- });
- }
- }
- close(err) {
- if (!this._socket) {
- return Promise.resolve();
- }
- this._socket.destroy(err);
- return this.await('close');
- }
- _handleReadable() {
- try {
- let remaining = false;
- do {
- remaining = this._readPacket();
- }
- while (remaining);
- } catch (err) {
- this.close(err);
- }
- }
- _handleSocketError(err) {
- this._lastError = err;
- if (err.code === 'ECONNRESET') {
- this.logger.warn('[ClusterClient:Connection] socket is closed by other side while there were still unhandled data in the socket buffer');
- } else {
- this.emit('error', err);
- }
- }
- _handleClose() {
- this._cleanInvokes(this._lastError);
- this.emit('close');
- }
- _cleanInvokes(err) {
- if (!err) {
- err = new Error('The socket was closed.');
- err.name = 'ClusterSocketCloseError';
- }
- for (const req of this._invokes.values()) {
- clearTimeout(req.timer);
- req.callback(err);
- }
- this._invokes.clear();
- }
- _read(n) {
- return this._socket.read(n);
- }
- _write(bytes) {
- if (!this.isOk) {
- return false;
- }
- return this._socket.write(bytes);
- }
- _getHeader() {
- return this._read(24);
- }
- _getBodyLength(header) {
- return header.readInt32BE(16) + header.readInt32BE(20);
- }
- _readPacket() {
- if (is.nullOrUndefined(this._bodyLength)) {
- this._header = this._getHeader();
- if (!this._header) {
- return false;
- }
- this._bodyLength = this._getBodyLength(this._header);
- }
- let body;
- // body may be emtry
- if (this._bodyLength > 0) {
- body = this._read(this._bodyLength);
- if (!body) {
- return false;
- }
- }
- this._bodyLength = null;
- const packet = Packet.decode(Buffer.concat([ this._header, body ]));
- const id = packet.id;
- if (packet.isResponse) {
- const info = this._invokes.get(id);
- if (info) {
- clearTimeout(info.timer);
- info.callback(null, packet.data);
- this._invokes.delete(id);
- }
- } else {
- process.nextTick(() => this.emit('request', packet, new Response({
- id,
- timeout: packet.timeout,
- })));
- }
- return true;
- }
- }
- module.exports = Connection;
|