// ipc.js (3.5 KB)
  1. 'use strict';
  2. const debug = require('debug')('egg:util:messenger:ipc');
  3. const is = require('is-type-of');
  4. const sendmessage = require('sendmessage');
  5. const EventEmitter = require('events');
  /**
   * Communication between app worker and agent worker by IPC channel.
   *
   * Outgoing messages are handed to `sendmessage(process, envelope)`; incoming
   * messages arrive through the `'message'` event of `process` and are
   * re-emitted on this instance under `message.action`.
   */
  class Messenger extends EventEmitter {
    constructor() {
      super();
      this.pid = String(process.pid);

      // pids of agent or app managed by master
      // - retrieve app worker pids when it's an agent worker
      // - retrieve agent worker pids when it's an app worker
      this.opids = [];
      this.on('egg-pids', pids => {
        this.opids = pids;
      });

      // Keep the bound handler on the instance so close() can detach the
      // exact same function reference from `process`.
      this._onMessage = this._onMessage.bind(this);
      process.on('message', this._onMessage);
    }

    /**
     * Send message to all agent and app
     * @param {String} action - message key
     * @param {Object} data - message value
     * @return {Messenger} this
     */
    broadcast(action, data) {
      debug('[%s] broadcast %s with %j', this.pid, action, data);
      this.send(action, data, 'app');
      this.send(action, data, 'agent');
      return this;
    }

    /**
     * send message to the specified process
     * @param {String} pid - the process id of the receiver (coerced with String())
     * @param {String} action - message key
     * @param {Object} data - message value
     * @return {Messenger} this
     */
    sendTo(pid, action, data) {
      debug('[%s] send %s with %j to %s', this.pid, action, data, pid);
      sendmessage(process, {
        action,
        data,
        receiverPid: String(pid),
      });
      return this;
    }

    /**
     * send message to one randomly chosen pid out of `this.opids`
     * - if it's running in agent, it will send to one of app workers
     * - if it's running in app, it will send to agent
     * Nothing is sent while `this.opids` is still empty.
     * @param {String} action - message key
     * @param {Object} data - message value
     * @return {Messenger} this
     */
    sendRandom(action, data) {
      /* istanbul ignore if */
      if (!this.opids.length) return this;
      const pid = random(this.opids);
      this.sendTo(String(pid), action, data);
      return this;
    }

    /**
     * send message to app
     * @param {String} action - message key
     * @param {Object} data - message value
     * @return {Messenger} this
     */
    sendToApp(action, data) {
      debug('[%s] send %s with %j to all app', this.pid, action, data);
      this.send(action, data, 'app');
      return this;
    }

    /**
     * send message to agent
     * @param {String} action - message key
     * @param {Object} data - message value
     * @return {Messenger} this
     */
    sendToAgent(action, data) {
      debug('[%s] send %s with %j to all agent', this.pid, action, data);
      this.send(action, data, 'agent');
      return this;
    }

    /**
     * Low-level send: wraps the payload in an `{ action, data, to }` envelope.
     * @param {String} action - message key
     * @param {Object} data - message value
     * @param {String} to - let master know how to send message
     * @return {Messenger} this
     */
    send(action, data, to) {
      sendmessage(process, {
        action,
        data,
        to,
      });
      return this;
    }

    /**
     * Handler for the `'message'` event of `process`.
     * Messages that are falsy or whose `action` is not a string are ignored;
     * anything else is re-emitted as `emit(message.action, message.data)`.
     * @param {Object} message - incoming envelope
     */
    _onMessage(message) {
      if (message && is.string(message.action)) {
        debug('[%s] got message %s with %j, receiverPid: %s',
          this.pid, message.action, message.data, message.receiverPid);
        this.emit(message.action, message.data);
      }
    }

    /**
     * Detach from `process` and drop every listener registered on this
     * instance (this includes the internal 'egg-pids' listener).
     */
    close() {
      process.removeListener('message', this._onMessage);
      this.removeAllListeners();
    }

    /**
     * Inherited from EventEmitter; listeners receive the message value.
     * @method Messenger#on
     * @param {String} action - message key
     * @param {Object} data - message value
     */
  }
  119. module.exports = Messenger;
  /**
   * Return one element of `arr`, chosen by a pseudo-random index derived
   * from Math.random().
   * @param {Array} arr - candidate values
   * @return {*} an element of `arr`; `undefined` when `arr` is empty
   */
  function random(arr) {
    const index = Math.floor(Math.random() * arr.length);
    return arr[index];
  }