connection.js 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. 'use strict';
  2. const is = require('is-type-of');
  3. const Base = require('sdk-base');
  4. const Packet = require('./protocol/packet');
  5. const Response = require('./protocol/response');
  6. class Connection extends Base {
  7. /**
  8. * Socket Connection among Leader and Follower
  9. *
  10. * @param {Object} options
  11. * - {Socket} socket - the socket instance
  12. * - {Number} responseTimeout - the response timeout
  13. * - {Transcode} transcode - serialze / deserialze methods
  14. * @constructor
  15. */
  16. constructor(options) {
  17. super(options);
  18. this._socket = options.socket;
  19. this._invokes = new Map();
  20. this.key = this._socket.remotePort;
  21. this._lastActiveTime = Date.now();
  22. this._transcode = options.transcode;
  23. this._lastError = null;
  24. // listen socket events
  25. this._socket.on('readable', () => { this._handleReadable(); });
  26. this._socket.on('error', err => { this._handleSocketError(err); });
  27. this._socket.on('close', () => { this._handleClose(); });
  28. // try read data from buffer at first
  29. this._handleReadable();
  30. }
  31. get isOk() {
  32. return this._socket && this._socket.writable;
  33. }
  34. get logger() {
  35. return this.options.logger;
  36. }
  37. get lastActiveTime() {
  38. return this._lastActiveTime;
  39. }
  40. set lastActiveTime(val) {
  41. this._lastActiveTime = val;
  42. }
  43. /**
  44. * send packet
  45. *
  46. * @param {Packet} packet - the packet
  47. * @param {Function} [callback] - callback function
  48. * @return {void}
  49. */
  50. send(packet, callback) {
  51. this._write(packet.encode());
  52. if (!packet.isResponse) {
  53. const id = packet.id;
  54. const timeout = packet.timeout;
  55. this._invokes.set(id, {
  56. id,
  57. timer: setTimeout(() => {
  58. const err = new Error(`[ClusterClient] no response in ${timeout}ms, remotePort#${this.key}`);
  59. err.name = 'ClusterConnectionResponseTimeoutError';
  60. callback(err, timeout);
  61. this._invokes.delete(id);
  62. }, timeout),
  63. callback,
  64. });
  65. }
  66. }
  67. close(err) {
  68. if (!this._socket) {
  69. return Promise.resolve();
  70. }
  71. this._socket.destroy(err);
  72. return this.await('close');
  73. }
  74. _handleReadable() {
  75. try {
  76. let remaining = false;
  77. do {
  78. remaining = this._readPacket();
  79. }
  80. while (remaining);
  81. } catch (err) {
  82. this.close(err);
  83. }
  84. }
  85. _handleSocketError(err) {
  86. this._lastError = err;
  87. if (err.code === 'ECONNRESET') {
  88. this.logger.warn('[ClusterClient:Connection] socket is closed by other side while there were still unhandled data in the socket buffer');
  89. } else {
  90. this.emit('error', err);
  91. }
  92. }
  93. _handleClose() {
  94. this._cleanInvokes(this._lastError);
  95. this.emit('close');
  96. }
  97. _cleanInvokes(err) {
  98. if (!err) {
  99. err = new Error('The socket was closed.');
  100. err.name = 'ClusterSocketCloseError';
  101. }
  102. for (const req of this._invokes.values()) {
  103. clearTimeout(req.timer);
  104. req.callback(err);
  105. }
  106. this._invokes.clear();
  107. }
  108. _read(n) {
  109. return this._socket.read(n);
  110. }
  111. _write(bytes) {
  112. if (!this.isOk) {
  113. return false;
  114. }
  115. return this._socket.write(bytes);
  116. }
  117. _getHeader() {
  118. return this._read(24);
  119. }
  120. _getBodyLength(header) {
  121. return header.readInt32BE(16) + header.readInt32BE(20);
  122. }
  123. _readPacket() {
  124. if (is.nullOrUndefined(this._bodyLength)) {
  125. this._header = this._getHeader();
  126. if (!this._header) {
  127. return false;
  128. }
  129. this._bodyLength = this._getBodyLength(this._header);
  130. }
  131. let body;
  132. // body may be emtry
  133. if (this._bodyLength > 0) {
  134. body = this._read(this._bodyLength);
  135. if (!body) {
  136. return false;
  137. }
  138. }
  139. this._bodyLength = null;
  140. const packet = Packet.decode(Buffer.concat([ this._header, body ]));
  141. const id = packet.id;
  142. if (packet.isResponse) {
  143. const info = this._invokes.get(id);
  144. if (info) {
  145. clearTimeout(info.timer);
  146. info.callback(null, packet.data);
  147. this._invokes.delete(id);
  148. }
  149. } else {
  150. process.nextTick(() => this.emit('request', packet, new Response({
  151. id,
  152. timeout: packet.timeout,
  153. })));
  154. }
  155. return true;
  156. }
  157. }
  158. module.exports = Connection;