cluster.js 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. 'use strict';
  2. const debug = require('debug')('cluster-client');
  3. const Base = require('./base');
  4. const Leader = require('../leader');
  5. const Follower = require('../follower');
  6. const ClusterServer = require('../server');
  7. // Symbol
  8. const {
  9. init,
  10. logger,
  11. isReady,
  12. innerClient,
  13. createClient,
  14. closeHandler,
  15. } = require('../symbol');
  /**
   * Cluster-aware client built on top of `Base`.
   *
   * Decides, based on `options.isLeader` and on whether `ClusterServer.create`
   * yields a server, whether the inner client is a `Leader` or a `Follower`.
   */
  class ClusterClient extends Base {
    /**
     * @param {Object} options - forwarded unchanged to the `Base` constructor;
     *   this class reads `name`, `port`, `isLeader` and `maxWaitTime` from
     *   `this.options`.
     */
    constructor(options) {
      super(options);

      // Handler stored under the `closeHandler` key. When invoked it logs the
      // closure, flips the client back to "not ready" and runs `init` again.
      // NOTE(review): presumably wired to the inner client's close event by
      // Base — that wiring is not visible here.
      this[closeHandler] = () => {
        const log = this[logger];
        const clientName = this.options.name;
        const role = this[innerClient].isLeader ? 'leader' : 'follower';
        log.warn('[ClusterClient:%s] %s closed, and try to init it again', clientName, role);

        this[isReady] = false;
        this.ready(false);

        // A failed re-initialisation is reported through `ready(err)`.
        this[init]().catch(initError => {
          this.ready(initError);
        });
      };
    }

    /**
     * Build the inner client for this cluster client.
     *
     * - `isLeader === true`: a server must be created, otherwise an error is thrown.
     * - `isLeader === false`: waits on `ClusterServer.waitFor` and never creates a server.
     * - anything else: tries to create a server and falls back to follower mode
     *   when none is returned.
     *
     * @return {Promise<Leader|Follower>} a `Leader` when a server was obtained,
     *   otherwise a `Follower`.
     * @throws {Error} when `isLeader === true` and `ClusterServer.create`
     *   returns a falsy value.
     */
    async [createClient]() {
      const { name, port } = this.options;
      let leaderServer;

      switch (this.options.isLeader) {
        case true:
          leaderServer = await ClusterServer.create(name, port);
          if (!leaderServer) {
            throw new Error(`create "${name}" leader failed, the port:${port} is occupied by other`);
          }
          break;
        case false:
          // wait for leader active
          await ClusterServer.waitFor(port, this.options.maxWaitTime);
          break;
        default:
          debug('[ClusterClient:%s] init cluster client, try to seize the leader on port:%d', name, port);
          leaderServer = await ClusterServer.create(name, port);
          break;
      }

      // No server in hand: act as a follower.
      if (!leaderServer) {
        debug('[ClusterClient:%s] gives up seizing port %d, and serves as follower client.', name, port);
        return new Follower(this.options);
      }

      debug('[ClusterClient:%s] has seized port %d, and serves as leader client.', name, port);
      // `this.options` is applied after `{ server }`, exactly as in the original call.
      return new Leader(Object.assign({ server: leaderServer }, this.options));
    }
  }
  // Export the ClusterClient class as this module's value.
  50. module.exports = ClusterClient;