local.js 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. 'use strict';
  2. const debug = require('debug')('egg:util:messenger:local');
  3. const is = require('is-type-of');
  4. const EventEmitter = require('events');
  5. /**
  6. * Communication between app worker and agent worker with EventEmitter
  7. */
  8. class Messenger extends EventEmitter {
  9. constructor(egg) {
  10. super();
  11. this.egg = egg;
  12. }
  13. /**
  14. * Send message to all agent and app
  15. * @param {String} action - message key
  16. * @param {Object} data - message value
  17. * @return {Messenger} this
  18. */
  19. broadcast(action, data) {
  20. debug('[%s] broadcast %s with %j', this.pid, action, data);
  21. this.send(action, data, 'both');
  22. return this;
  23. }
  24. /**
  25. * send message to the specified process
  26. * Notice: in single process mode, it only can send to self process,
  27. * and it will send to both agent and app's messengers.
  28. * @param {String} pid - the process id of the receiver
  29. * @param {String} action - message key
  30. * @param {Object} data - message value
  31. * @return {Messenger} this
  32. */
  33. sendTo(pid, action, data) {
  34. debug('[%s] send %s with %j to %s', this.pid, action, data, pid);
  35. if (pid !== process.pid) return this;
  36. this.send(action, data, 'both');
  37. return this;
  38. }
  39. /**
  40. * send message to one worker by random
  41. * Notice: in single process mode, we only start one agent worker and one app worker
  42. * - if it's running in agent, it will send to one of app workers
  43. * - if it's running in app, it will send to agent
  44. * @param {String} action - message key
  45. * @param {Object} data - message value
  46. * @return {Messenger} this
  47. */
  48. sendRandom(action, data) {
  49. debug('[%s] send %s with %j to opposite', this.pid, action, data);
  50. this.send(action, data, 'opposite');
  51. return this;
  52. }
  53. /**
  54. * send message to app
  55. * @param {String} action - message key
  56. * @param {Object} data - message value
  57. * @return {Messenger} this
  58. */
  59. sendToApp(action, data) {
  60. debug('[%s] send %s with %j to all app', this.pid, action, data);
  61. this.send(action, data, 'application');
  62. return this;
  63. }
  64. /**
  65. * send message to agent
  66. * @param {String} action - message key
  67. * @param {Object} data - message value
  68. * @return {Messenger} this
  69. */
  70. sendToAgent(action, data) {
  71. debug('[%s] send %s with %j to all agent', this.pid, action, data);
  72. this.send(action, data, 'agent');
  73. return this;
  74. }
  75. /**
  76. * @param {String} action - message key
  77. * @param {Object} data - message value
  78. * @param {String} to - let master know how to send message
  79. * @return {Messenger} this
  80. */
  81. send(action, data, to) {
  82. // use nextTick to keep it async as IPC messenger
  83. process.nextTick(() => {
  84. const { egg } = this;
  85. let application;
  86. let agent;
  87. let opposite;
  88. if (egg.type === 'application') {
  89. application = egg;
  90. agent = egg.agent;
  91. opposite = agent;
  92. } else {
  93. agent = egg;
  94. application = egg.application;
  95. opposite = application;
  96. }
  97. if (!to) to = egg.type === 'application' ? 'agent' : 'application';
  98. if (application && application.messenger && (to === 'application' || to === 'both')) {
  99. application.messenger._onMessage({ action, data });
  100. }
  101. if (agent && agent.messenger && (to === 'agent' || to === 'both')) {
  102. agent.messenger._onMessage({ action, data });
  103. }
  104. if (opposite && opposite.messenger && to === 'opposite') {
  105. opposite.messenger._onMessage({ action, data });
  106. }
  107. });
  108. return this;
  109. }
  110. _onMessage(message) {
  111. if (message && is.string(message.action)) {
  112. debug('[%s] got message %s with %j', this.pid, message.action, message.data);
  113. this.emit(message.action, message.data);
  114. }
  115. }
  116. close() {
  117. this.removeAllListeners();
  118. }
  119. /**
  120. * @method Messenger#on
  121. * @param {String} action - message key
  122. * @param {Object} data - message value
  123. */
  124. }
  125. module.exports = Messenger;