123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- 'use strict';
- const debug = require('debug')('egg:util:messenger:local');
- const is = require('is-type-of');
- const EventEmitter = require('events');
/**
 * Communication between app worker and agent worker with EventEmitter.
 *
 * Messages are not sent over IPC: they are delivered by calling
 * `_onMessage` directly on the messenger(s) reachable from the `egg`
 * instance given to the constructor.
 */
class Messenger extends EventEmitter {
  /**
   * @param {Object} egg - the owning instance; `send` reads `egg.type`,
   *   plus `egg.agent` (when type is 'application') or `egg.application`
   *   (otherwise) to locate the peer messenger
   */
  constructor(egg) {
    super();
    this.egg = egg;
    // Every debug line prints `this.pid`; without this assignment it
    // would always be logged as `undefined`.
    this.pid = String(process.pid);
  }

  /**
   * Send message to all agent and app
   * @param {String} action - message key
   * @param {Object} data - message value
   * @return {Messenger} this
   */
  broadcast(action, data) {
    debug('[%s] broadcast %s with %j', this.pid, action, data);
    this.send(action, data, 'both');
    return this;
  }

  /**
   * send message to the specified process
   * Notice: in single process mode, it only can send to self process,
   * and it will send to both agent and app's messengers.
   * @param {String|Number} pid - the process id of the receiver
   * @param {String} action - message key
   * @param {Object} data - message value
   * @return {Messenger} this
   */
  sendTo(pid, action, data) {
    debug('[%s] send %s with %j to %s', this.pid, action, data, pid);
    // `process.pid` is a number while callers may pass the pid as a string,
    // so compare string forms; a strict compare would drop the message.
    if (String(pid) !== String(process.pid)) return this;
    this.send(action, data, 'both');
    return this;
  }

  /**
   * send message to one worker by random
   * Notice: in single process mode, we only start one agent worker and one app worker
   * - if it's running in agent, it will send to one of app workers
   * - if it's running in app, it will send to agent
   * @param {String} action - message key
   * @param {Object} data - message value
   * @return {Messenger} this
   */
  sendRandom(action, data) {
    debug('[%s] send %s with %j to opposite', this.pid, action, data);
    this.send(action, data, 'opposite');
    return this;
  }

  /**
   * send message to app
   * @param {String} action - message key
   * @param {Object} data - message value
   * @return {Messenger} this
   */
  sendToApp(action, data) {
    debug('[%s] send %s with %j to all app', this.pid, action, data);
    this.send(action, data, 'application');
    return this;
  }

  /**
   * send message to agent
   * @param {String} action - message key
   * @param {Object} data - message value
   * @return {Messenger} this
   */
  sendToAgent(action, data) {
    debug('[%s] send %s with %j to all agent', this.pid, action, data);
    this.send(action, data, 'agent');
    return this;
  }

  /**
   * Deliver a message to the messenger(s) selected by `to`.
   * Delivery happens on the next tick, never synchronously.
   * @param {String} action - message key
   * @param {Object} data - message value
   * @param {String} to - 'application', 'agent', 'both' or 'opposite';
   *   when omitted, the side opposite to `egg.type` is used
   * @return {Messenger} this
   */
  send(action, data, to) {
    // use nextTick to keep it async as IPC messenger
    process.nextTick(() => {
      const { egg } = this;
      const isApplication = egg.type === 'application';
      const application = isApplication ? egg : egg.application;
      const agent = isApplication ? egg.agent : egg;
      const opposite = isApplication ? agent : application;

      // Resolve the default into a local instead of reassigning the parameter.
      const target = to || (isApplication ? 'agent' : 'application');
      const message = { action, data };

      if (application && application.messenger && (target === 'application' || target === 'both')) {
        application.messenger._onMessage(message);
      }
      if (agent && agent.messenger && (target === 'agent' || target === 'both')) {
        agent.messenger._onMessage(message);
      }
      if (opposite && opposite.messenger && target === 'opposite') {
        opposite.messenger._onMessage(message);
      }
    });
    return this;
  }

  /**
   * Emit `message.action` with `message.data` on this messenger.
   * Messages without a string `action` are ignored.
   * @param {Object} message - `{ action, data }`
   */
  _onMessage(message) {
    if (message && is.string(message.action)) {
      debug('[%s] got message %s with %j', this.pid, message.action, message.data);
      this.emit(message.action, message.data);
    }
  }

  /**
   * Remove every listener registered on this messenger.
   */
  close() {
    this.removeAllListeners();
  }

  /**
   * Listeners registered for an action are called with the message value.
   * @method Messenger#on
   * @param {String} action - message key
   * @param {Object} data - message value
   */
}
- module.exports = Messenger;
|