123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291 |
- 'use strict';
- const debug = require('debug')('cluster-client#follower');
- const is = require('is-type-of');
- const Base = require('tcp-base');
- const Packet = require('./protocol/packet');
- const Request = require('./protocol/request');
- const Response = require('./protocol/response');
- class Follower extends Base {
- /**
- * "Fake" Client, forward request to leader
- *
- * @param {Object} options
- * - {Number} port - the port
- * - {Map} descriptors - interface descriptors
- * - {Transcode} transcode - serialze / deserialze methods
- * - {Number} responseTimeout - the timeout
- * @constructor
- */
- constructor(options) {
- // local address
- options.host = '127.0.0.1';
- super(options);
- this._publishMethodName = this._findMethodName('publish');
- this._subInfo = new Set();
- this._subData = new Map();
- this._transcode = options.transcode;
- this._closeByUser = false;
- this.on('request', req => this._handleRequest(req));
- // avoid warning message
- this.setMaxListeners(100);
- }
- get isLeader() {
- return false;
- }
- get logger() {
- return this.options.logger;
- }
- get heartBeatPacket() {
- const heartbeat = new Request({
- connObj: {
- type: 'heartbeat',
- },
- timeout: this.options.responseTimeout,
- });
- return heartbeat.encode();
- }
- getHeader() {
- return this.read(24);
- }
- getBodyLength(header) {
- return header.readInt32BE(16) + header.readInt32BE(20);
- }
- close(err) {
- this._closeByUser = true;
- return super.close(err);
- }
- decode(body, header) {
- const buf = Buffer.concat([ header, body ]);
- const packet = Packet.decode(buf);
- const connObj = packet.connObj;
- if (connObj && connObj.type === 'invoke_result') {
- let data;
- if (packet.data) {
- data = this.options.transcode.decode(packet.data);
- }
- if (connObj.success) {
- return {
- id: packet.id,
- isResponse: packet.isResponse,
- data,
- };
- }
- const error = new Error(data.message);
- Object.assign(error, data);
- return {
- id: packet.id,
- isResponse: packet.isResponse,
- error,
- };
- }
- return {
- id: packet.id,
- isResponse: packet.isResponse,
- connObj: packet.connObj,
- data: packet.data,
- };
- }
- send(...args) {
- // just ignore after close
- if (this._closeByUser) {
- return;
- }
- return super.send(...args);
- }
- formatKey(reg) {
- return '$$inner$$__' + this.options.formatKey(reg);
- }
- subscribe(reg, listener) {
- const key = this.formatKey(reg);
- this.on(key, listener);
- // no need duplicate subscribe
- if (!this._subInfo.has(key)) {
- debug('[Follower:%s] subscribe %j for first time', this.options.name, reg);
- const req = new Request({
- connObj: { type: 'subscribe', key, reg },
- timeout: this.options.responseTimeout,
- });
- // send subscription
- this.send({
- id: req.id,
- oneway: true,
- data: req.encode(),
- });
- this._subInfo.add(key);
- } else if (this._subData.has(key)) {
- debug('[Follower:%s] subscribe %j', this.options.name, reg);
- process.nextTick(() => {
- listener(this._subData.get(key));
- });
- }
- return this;
- }
- unSubscribe(reg, listener) {
- const key = this.formatKey(reg);
- if (listener) {
- this.removeListener(key, listener);
- } else {
- this.removeAllListeners(key);
- }
- if (this.listeners(key).length === 0) {
- debug('[Follower:%s] no more subscriber for %j, send unSubscribe req to leader', this.options.name, reg);
- this._subInfo.delete(key);
- const req = new Request({
- connObj: { type: 'unSubscribe', key, reg },
- timeout: this.options.responseTimeout,
- });
- // send subscription
- this.send({
- id: req.id,
- oneway: true,
- data: req.encode(),
- });
- }
- }
- publish(reg) {
- this.invoke(this._publishMethodName, [ reg ]);
- return this;
- }
- invoke(method, args, callback) {
- const oneway = !is.function(callback); // if no callback, means oneway
- const argLength = args.length;
- let data;
- // data:
- // +-----+---------------+-----+---------------+
- // | len | arg1 body | len | arg2 body | ...
- // +-----+---------------+-----+---------------+
- if (argLength > 0) {
- let argsBufLength = 0;
- const arr = [];
- for (const arg of args) {
- const argBuf = this._transcode.encode(arg);
- const len = argBuf.length;
- const buf = Buffer.alloc(4 + len);
- buf.writeInt32BE(len, 0);
- argBuf.copy(buf, 4, 0, len);
- arr.push(buf);
- argsBufLength += (len + 4);
- }
- data = Buffer.concat(arr, argsBufLength);
- }
- const req = new Request({
- connObj: {
- type: 'invoke',
- method,
- argLength,
- oneway,
- },
- data,
- timeout: this.options.responseTimeout,
- });
- // send invoke request
- this.send({
- id: req.id,
- oneway,
- data: req.encode(),
- }, callback);
- }
- _registerChannel() {
- const req = new Request({
- connObj: {
- type: 'register_channel',
- channelName: this.options.name,
- },
- timeout: this.options.responseTimeout,
- });
- // send invoke request
- this.send({
- id: req.id,
- oneway: false,
- data: req.encode(),
- }, (err, data) => {
- if (err) {
- // if socket alive, do retry
- if (this._socket) {
- err.message = `register to channel: ${this.options.name} failed, will retry after 3s, ${err.message}`;
- this.logger.warn(err);
- // if exception, retry after 3s
- setTimeout(() => this._registerChannel(), 3000);
- } else {
- this.ready(err);
- }
- return;
- }
- const res = this._transcode.decode(data);
- if (res.success) {
- debug('[Follower:%s] register to channel: %s success', this.options.name, this.options.name);
- this.ready(true);
- } else {
- const error = new Error(res.error.message);
- Object.assign(error, res.error);
- this.ready(error);
- }
- });
- }
- _findMethodName(type) {
- for (const method of this.options.descriptors.keys()) {
- const descriptor = this.options.descriptors.get(method);
- if (descriptor.type === 'delegate' && descriptor.to === type) {
- return method;
- }
- }
- return null;
- }
- _handleRequest(req) {
- debug('[Follower:%s] receive req: %j from leader', this.options.name, req);
- const connObj = req.connObj || {};
- if (connObj.type === 'subscribe_result') {
- const result = this._transcode.decode(req.data);
- this.emit(connObj.key, result);
- this._subData.set(connObj.key, result);
- // feedback
- const res = new Response({
- id: req.id,
- timeout: req.timeout,
- connObj: { type: 'subscribe_result_res' },
- });
- this.send({
- id: req.id,
- oneway: true,
- data: res.encode(),
- });
- }
- }
- _connect(done) {
- if (!done) {
- done = err => {
- if (err) {
- this.ready(err);
- } else {
- // register to proper channel, difference type of client into difference channel
- this._registerChannel();
- }
- };
- }
- super._connect(done);
- }
- }
- module.exports = Follower;
|