base.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446
  1. 'use strict';
  2. const net = require('net');
  3. const is = require('is-type-of');
  4. const assert = require('assert');
  5. const Base = require('sdk-base');
  // Private property key under which every TCPBase instance stores its remote address string.
  const addressKey = Symbol('address');

  // Defaults that the TCPBase constructor merges underneath the caller's options.
  // Frozen because the object is shared by all instances and is only ever read
  // (as an Object.assign source), so accidental mutation would leak across clients.
  const defaultOptions = Object.freeze({
    noDelay: true,
    connectTimeout: 3000,
    responseTimeout: 3000,
    heartbeatInterval: 5000,
    needHeartbeat: true,
    concurrent: 0,
    logger: console,
  });

  // Default callback for send() when the caller does not supply one.
  const noop = () => {};

  // Monotonic counter used to hand out `clientId` values.
  let seed = 0;
  class TCPBase extends Base {
    /**
     * A base class for tcp client with basic functions
     *
     * @param {Object} options
     *   - {String} host - server host (required unless `path` is given)
     *   - {Number} port - server port (required unless `path` is given)
     *   - {String} [path] - passed to net.connect(); when set, host/port are not required
     *   - {Number} headerLength - length of the packet header, this field is optional,
     *     but if you do not provide it, you must override the getHeader method
     *   - {Boolean} [noDelay] - value passed to socket.setNoDelay(); `true` disables
     *     Nagle's algorithm, defaults to true
     *   - {Number} [concurrent] - the number of concurrent packets, defaults to zero, means no limit
     *   - {Number} [connectTimeout] - socket timeout (ms) applied until 'connect' fires, defaults to 3000
     *   - {Number} [responseTimeout] - limit the maximum time for waiting a response, defaults to 3000
     *   - {Boolean} [needHeartbeat] - whether to start the heartbeat timer once ready, defaults to true
     *   - {Number} [heartbeatInterval] - heartbeat timer period in ms, defaults to 5000
     *   - {Logger} [logger] - the logger client, defaults to console
     * @class
     */
    constructor(options) {
      super();
      this.options = Object.assign({}, defaultOptions, options);
      if (!this.options.path) {
        assert(this.options.host, 'options.host is required');
        assert(this.options.port, 'options.port is required');
      }
      if (this.options.needHeartbeat) {
        // The base getter throws, so this fails fast when a subclass forgot to override it.
        assert(this.heartBeatPacket, 'heartBeatPacket getter must be implemented if needHeartbeat');
      }

      this.clientId = ++seed;
      this._heartbeatTimer = null;
      this._socket = null;
      this._header = null;
      this._bodyLength = null;
      this._lastError = null;
      // Requests waiting for a free slot: entries are [ packet, callback ].
      this._queue = [];
      // In-flight requests keyed by packet id: { meta, packet, timer, callback }.
      this._invokes = new Map();
      // With `path` there is no host/port, so use the path itself instead of
      // producing the meaningless string "undefined:undefined".
      this[addressKey] = this.options.path
        ? this.options.path
        : this.options.host + ':' + this.options.port;
      this._lastHeartbeatTime = 0;
      this._lastReceiveDataTime = 0;

      this._connect();
      this.ready(err => {
        if (!err && this.options.needHeartbeat) {
          this._startHeartbeat();
        }
      });
    }

    /**
     * get packet header
     *
     * @return {Buffer} header
     */
    getHeader() {
      return this.read(this.options.headerLength);
    }

    /* eslint-disable valid-jsdoc, no-unused-vars */
    /**
     * get body length from header, must be implemented by subclass
     *
     * @param {Buffer} header - header data
     * @return {Number} bodyLength
     */
    getBodyLength(header) {
      throw new Error('not implement');
    }

    /**
     * return a heartbeat packet, must be implemented by subclass when needHeartbeat is on
     *
     * @property {Buffer} TCPBase#heartBeatPacket
     */
    get heartBeatPacket() {
      throw new Error('not implement');
    }

    /**
     * send heartbeat packet
     *
     * @return {void}
     */
    sendHeartBeat() {
      this._socket.write(this.heartBeatPacket);
    }

    /**
     * deserialize method, leave it to implement by subclass
     *
     * @param {Buffer} buf - binary data of the body (undefined when the body length is not positive)
     * @param {Buffer} header - the header that was read for this packet
     * @return {Object} packet object
     */
    decode(buf, header) {
      throw new Error('not implement');
    }
    /* eslint-enable valid-jsdoc, no-unused-vars */

    /**
     * if the connection is writable, also including flow control logic
     *
     * @property {Boolean} TCPBase#_writable
     */
    get _writable() {
      if (this.options.concurrent && this._invokes.size >= this.options.concurrent) {
        return false;
      }
      return this.isOK;
    }

    /**
     * if the connection is healthy or not
     *
     * @property {Boolean} TCPBase#isOK
     */
    get isOK() {
      // Coerce so a closed client reports `false` rather than leaking `null`.
      return Boolean(this._socket && this._socket.writable);
    }

    /**
     * remote address ("host:port", or the path when connecting through `options.path`)
     *
     * @property {String} TCPBase#address
     */
    get address() {
      return this[addressKey];
    }

    /**
     * logger
     *
     * @property {Logger} TCPBase#logger
     */
    get logger() {
      return this.options.logger;
    }

    /**
     * Pulls some data out of the socket buffer and returns it.
     * If no data available to be read, null is returned
     *
     * @param {Number} n - to specify how much data to read
     * @return {Buffer} - data
     */
    read(n) {
      return this._socket.read(n);
    }

    /**
     * send packet to server
     *
     * @param {Object} packet
     *   - {Number} id - packet id
     *   - {Buffer} data - binary data
     *   - {Boolean} [oneway] - oneway or not
     *   - {Number} [timeout] - the maximum time for waiting a response
     * @param {Function} [callback] - Call this function,when processing is complete, optional.
     * @return {void}
     */
    send(packet, callback = noop) {
      // Socket already closed: fail right away.
      if (!this._socket) {
        const err = new Error(`[TCPBase] The socket was closed. (address: ${this[addressKey]})`);
        err.id = packet.id;
        err.data = packet.data.toString('base64');
        if (packet.oneway) {
          // Oneway callers get a plain completion; the failure is surfaced as an 'error' event.
          err.oneway = true;
          callback();
          this.emit('error', err);
        } else {
          callback(err);
        }
        return;
      }

      // Oneway packets expect no response, so they bypass flow control and tracking.
      if (packet.oneway) {
        this._socket.write(packet.data);
        callback();
        return;
      }

      // Not writable (concurrency limit reached or socket unhealthy): park the request.
      if (!this._writable) {
        this._queue.push([ packet, callback ]);
        return;
      }

      // Diagnostics attached to the timeout error.
      const meta = {
        id: packet.id,
        dataLength: packet.data.length,
        bufferSize1: this._socket.bufferSize,
        bufferSize2: -1,
        startTime: Date.now(),
        endTime: -1,
      };
      let endTime;
      meta.writeSuccess = this._socket.write(packet.data, () => {
        endTime = Date.now();
      });

      const timeout = packet.timeout || this.options.responseTimeout;
      this._invokes.set(packet.id, {
        meta,
        packet,
        timer: setTimeout(() => {
          if (!this._socket) {
            return;
          }
          meta.bufferSize2 = this._socket.bufferSize;
          meta.endTime = endTime;
          this._finishInvoke(packet.id);
          const err = new Error(`Server no response in ${timeout}ms, address#${this[addressKey]}`);
          err.socketMeta = meta;
          err.name = 'ResponseTimeoutError';
          callback(err);
        }, timeout),
        callback,
      });
    }

    /**
     * thunk style api of send(packet, callback)
     *
     * @param {Object} packet
     *   - {Number} id - packet id
     *   - {Buffer} data - binary data
     *   - {Boolean} [oneway] - oneway or not
     *   - {Number} [timeout] - the maximum time for waiting a response
     * @return {Function} thunk function
     */
    sendThunk(packet) {
      return callback => this.send(packet, callback);
    }

    // forget an in-flight request and, if a slot is now free, send one queued request
    _finishInvoke(id) {
      this._invokes.delete(id);
      if (this._writable) {
        this._resume();
      }
    }

    // invoke callback with err, or with a SocketCloseError when no error is given
    _errorCallback(callback, err) {
      if (!err) {
        err = new Error(`The socket was closed. (address: ${this[addressKey]})`);
        err.name = 'SocketCloseError';
      }
      callback && callback(err);
    }

    // fail every in-flight invoke with err and cancel its response timer
    _cleanInvokes(err) {
      for (const req of this._invokes.values()) {
        clearTimeout(req.timer);
        this._errorCallback(req.callback, err);
      }
      this._invokes.clear();
    }

    // fail every queued request with err, oldest first
    _cleanQueue(err) {
      let args = this._queue.shift();
      while (args) {
        // args[0] is the packet, args[1] is the callback
        this._errorCallback(args[1], err);
        args = this._queue.shift();
      }
    }

    // send the oldest queued request, if any
    _resume() {
      const args = this._queue.shift();
      if (args) {
        this.send(args[0], args[1]);
      }
    }

    // read data from socket, and decode it to packet object;
    // returns false when a full packet is not available yet, true after one packet was handled
    _readPacket() {
      // _bodyLength is only set while a header has been consumed but its body has not.
      if (is.nullOrUndefined(this._bodyLength)) {
        this._header = this.getHeader();
        if (!this._header) {
          return false;
        }
        this._bodyLength = this.getBodyLength(this._header);
      }

      let body;
      if (this._bodyLength > 0) {
        body = this.read(this._bodyLength);
        if (!body) {
          return false;
        }
      }
      this._bodyLength = null;

      const entity = this.decode(body, this._header);
      // the schema of entity
      // {
      //   id: 'request id',
      //   isResponse: true,
      //   data: {} // deserialized object
      // }
      let type = 'request';
      // Go through Object.prototype so entities created without a prototype work too.
      if (!Object.prototype.hasOwnProperty.call(entity, 'isResponse')) {
        entity.isResponse = this._invokes.has(entity.id);
      }
      if (entity.isResponse) {
        type = 'response';
        const invoke = this._invokes.get(entity.id);
        if (invoke) {
          // Cancel the timer before _finishInvoke, which may re-enter send() via _resume().
          clearTimeout(invoke.timer);
          this._finishInvoke(entity.id);
          process.nextTick(() => {
            invoke.callback(entity.error, entity.data);
          });
        }
      }
      if (entity.data) {
        process.nextTick(() => {
          this.emit(type, entity, this[addressKey]);
        });
      }
      return true;
    }

    /**
     * close the socket
     *
     * @param {Error} err - the error which makes socket closed
     * @return {Promise} resolved immediately when there is no socket, otherwise
     *   the result of this.await('close')
     */
    close(err) {
      if (!this._socket) {
        return Promise.resolve();
      }
      this._socket.destroy(err);
      return this.await('close');
    }

    // socket 'close' handler: drop the socket, fail pending work, stop the heartbeat, emit 'close'
    _handleClose() {
      if (!this._socket) {
        return;
      }
      this._socket.removeAllListeners();
      this._socket = null;
      this._cleanInvokes(this._lastError);
      // clean timer
      if (this._heartbeatTimer) {
        clearInterval(this._heartbeatTimer);
        this._heartbeatTimer = null;
      }
      this._cleanQueue(this._lastError);
      this.emit('close');
    }

    // socket 'readable' handler: drain every complete packet; any exception closes the socket
    _handleReadable() {
      this._lastReceiveDataTime = Date.now();
      try {
        let remaining = false;
        do {
          remaining = this._readPacket();
        } while (remaining);
      } catch (err) {
        this.close(err);
      }
    }

    // create the socket and wire its events; `done` is called with the first 'error' payload,
    // or with no argument once 'connect' has fired (defaults to signalling ready)
    _connect(done) {
      if (!done) {
        done = err => {
          this.ready(err ? err : true);
        };
      }
      const { port, host, localAddress, localPort, family, hints, lookup, path } = this.options;
      const socket = this._socket = net.connect({
        port, host, localAddress, localPort, family, hints, lookup, path,
      });
      socket.setNoDelay(this.options.noDelay);
      socket.on('readable', () => { this._handleReadable(); });
      socket.once('close', () => { this._handleClose(); });
      socket.on('error', err => {
        err.message += ' (address: ' + this[addressKey] + ')';
        // Remembered so _handleClose can hand it to pending callbacks.
        this._lastError = err;
        if (err.code === 'ECONNRESET') {
          this.logger.warn('[TCPBase] socket is closed by other side while there were still unhandled data in the socket buffer');
        } else {
          this.emit('error', err);
        }
      });
      socket.setTimeout(this.options.connectTimeout, () => {
        const err = new Error(`[TCPBase] socket connect timeout (${this.options.connectTimeout}ms)`);
        err.name = 'TcpConnectionTimeoutError';
        err.host = this.options.host;
        err.port = this.options.port;
        this.close(err);
      });
      socket.once('connect', () => {
        // set timeout back to zero after connected
        socket.setTimeout(0);
        this.emit('connect');
      });

      Promise.race([
        this.await('connect'),
        this.await('error'),
      ]).then(done, done);
    }

    // start the periodic heartbeat; closes the socket when the last heartbeat was sent
    // more than heartbeatInterval after the last received data
    _startHeartbeat() {
      this._heartbeatTimer = setInterval(() => {
        const duration = this._lastHeartbeatTime - this._lastReceiveDataTime;
        if (this._lastReceiveDataTime && duration > this.options.heartbeatInterval) {
          const err = new Error(`server ${this[addressKey]} no response in ${duration}ms, maybe the socket is end on the other side.`);
          err.name = 'ServerNoResponseError';
          this.close(err);
          return;
        }
        // flow control
        if (this._invokes.size > 0 || !this.isOK) {
          return;
        }
        this._lastHeartbeatTime = Date.now();
        this.sendHeartBeat();
      }, this.options.heartbeatInterval);
    }
  }
  407. module.exports = TCPBase;