'use strict'; const debug = require('debug')('cluster-client#follower'); const is = require('is-type-of'); const Base = require('tcp-base'); const Packet = require('./protocol/packet'); const Request = require('./protocol/request'); const Response = require('./protocol/response'); class Follower extends Base { /** * "Fake" Client, forward request to leader * * @param {Object} options * - {Number} port - the port * - {Map} descriptors - interface descriptors * - {Transcode} transcode - serialze / deserialze methods * - {Number} responseTimeout - the timeout * @constructor */ constructor(options) { // local address options.host = '127.0.0.1'; super(options); this._publishMethodName = this._findMethodName('publish'); this._subInfo = new Set(); this._subData = new Map(); this._transcode = options.transcode; this._closeByUser = false; this.on('request', req => this._handleRequest(req)); // avoid warning message this.setMaxListeners(100); } get isLeader() { return false; } get logger() { return this.options.logger; } get heartBeatPacket() { const heartbeat = new Request({ connObj: { type: 'heartbeat', }, timeout: this.options.responseTimeout, }); return heartbeat.encode(); } getHeader() { return this.read(24); } getBodyLength(header) { return header.readInt32BE(16) + header.readInt32BE(20); } close(err) { this._closeByUser = true; return super.close(err); } decode(body, header) { const buf = Buffer.concat([ header, body ]); const packet = Packet.decode(buf); const connObj = packet.connObj; if (connObj && connObj.type === 'invoke_result') { let data; if (packet.data) { data = this.options.transcode.decode(packet.data); } if (connObj.success) { return { id: packet.id, isResponse: packet.isResponse, data, }; } const error = new Error(data.message); Object.assign(error, data); return { id: packet.id, isResponse: packet.isResponse, error, }; } return { id: packet.id, isResponse: packet.isResponse, connObj: packet.connObj, data: packet.data, }; } send(...args) { // just ignore after close if (this._closeByUser) { return; } return super.send(...args); } formatKey(reg) { return '$$inner$$__' + this.options.formatKey(reg); } subscribe(reg, listener) { const key = this.formatKey(reg); this.on(key, listener); // no need duplicate subscribe if (!this._subInfo.has(key)) { debug('[Follower:%s] subscribe %j for first time', this.options.name, reg); const req = new Request({ connObj: { type: 'subscribe', key, reg }, timeout: this.options.responseTimeout, }); // send subscription this.send({ id: req.id, oneway: true, data: req.encode(), }); this._subInfo.add(key); } else if (this._subData.has(key)) { debug('[Follower:%s] subscribe %j', this.options.name, reg); process.nextTick(() => { listener(this._subData.get(key)); }); } return this; } unSubscribe(reg, listener) { const key = this.formatKey(reg); if (listener) { this.removeListener(key, listener); } else { this.removeAllListeners(key); } if (this.listeners(key).length === 0) { debug('[Follower:%s] no more subscriber for %j, send unSubscribe req to leader', this.options.name, reg); this._subInfo.delete(key); const req = new Request({ connObj: { type: 'unSubscribe', key, reg }, timeout: this.options.responseTimeout, }); // send subscription this.send({ id: req.id, oneway: true, data: req.encode(), }); } } publish(reg) { this.invoke(this._publishMethodName, [ reg ]); return this; } invoke(method, args, callback) { const oneway = !is.function(callback); // if no callback, means oneway const argLength = args.length; let data; // data: // +-----+---------------+-----+---------------+ // | len | arg1 body | len | arg2 body | ... // +-----+---------------+-----+---------------+ if (argLength > 0) { let argsBufLength = 0; const arr = []; for (const arg of args) { const argBuf = this._transcode.encode(arg); const len = argBuf.length; const buf = Buffer.alloc(4 + len); buf.writeInt32BE(len, 0); argBuf.copy(buf, 4, 0, len); arr.push(buf); argsBufLength += (len + 4); } data = Buffer.concat(arr, argsBufLength); } const req = new Request({ connObj: { type: 'invoke', method, argLength, oneway, }, data, timeout: this.options.responseTimeout, }); // send invoke request this.send({ id: req.id, oneway, data: req.encode(), }, callback); } _registerChannel() { const req = new Request({ connObj: { type: 'register_channel', channelName: this.options.name, }, timeout: this.options.responseTimeout, }); // send invoke request this.send({ id: req.id, oneway: false, data: req.encode(), }, (err, data) => { if (err) { // if socket alive, do retry if (this._socket) { err.message = `register to channel: ${this.options.name} failed, will retry after 3s, ${err.message}`; this.logger.warn(err); // if exception, retry after 3s setTimeout(() => this._registerChannel(), 3000); } else { this.ready(err); } return; } const res = this._transcode.decode(data); if (res.success) { debug('[Follower:%s] register to channel: %s success', this.options.name, this.options.name); this.ready(true); } else { const error = new Error(res.error.message); Object.assign(error, res.error); this.ready(error); } }); } _findMethodName(type) { for (const method of this.options.descriptors.keys()) { const descriptor = this.options.descriptors.get(method); if (descriptor.type === 'delegate' && descriptor.to === type) { return method; } } return null; } _handleRequest(req) { debug('[Follower:%s] receive req: %j from leader', this.options.name, req); const connObj = req.connObj || {}; if (connObj.type === 'subscribe_result') { const result = this._transcode.decode(req.data); this.emit(connObj.key, result); this._subData.set(connObj.key, result); // feedback const res = new Response({ id: req.id, timeout: req.timeout, connObj: { type: 'subscribe_result_res' }, }); this.send({ id: req.id, oneway: true, data: res.encode(), }); } } _connect(done) { if (!done) { done = err => { if (err) { this.ready(err); } else { // register to proper channel, difference type of client into difference channel this._registerChannel(); } }; } super._connect(done); } } module.exports = Follower;