123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137 |
- 'use strict';
- const debug = require('debug')('egg:util:messenger:ipc');
- const is = require('is-type-of');
- const sendmessage = require('sendmessage');
- const EventEmitter = require('events');
/**
 * Communication between app worker and agent worker by IPC channel.
 *
 * Incoming process messages whose `action` is a string are re-emitted as
 * events named after that action; outgoing messages are handed to
 * `sendmessage` together with the current `process`.
 */
class Messenger extends EventEmitter {
  /**
   * Record the current pid, start tracking the pids announced through the
   * `egg-pids` event and subscribe to the process `message` event.
   */
  constructor() {
    super();
    this.pid = String(process.pid);
    // pids of agent or app managed by master
    // - retrieve app worker pids when it's an agent worker
    // - retrieve agent worker pids when it's an app worker
    this.opids = [];
    this.on('egg-pids', pids => {
      // Keep `opids` an array even when the payload is missing or malformed;
      // otherwise `sendRandom` would throw while reading `.length`.
      this.opids = Array.isArray(pids) ? pids : [];
    });
    // Bind once so `close()` can remove exactly the function registered here.
    this._onMessage = this._onMessage.bind(this);
    process.on('message', this._onMessage);
  }

  /**
   * Send message to all agent and app
   * @param {String} action - message key
   * @param {Object} data - message value
   * @return {Messenger} this
   */
  broadcast(action, data) {
    debug('[%s] broadcast %s with %j', this.pid, action, data);
    this.send(action, data, 'app');
    this.send(action, data, 'agent');
    return this;
  }

  /**
   * send message to the specified process
   * @param {String} pid - the process id of the receiver
   * @param {String} action - message key
   * @param {Object} data - message value
   * @return {Messenger} this
   */
  sendTo(pid, action, data) {
    debug('[%s] send %s with %j to %s', this.pid, action, data, pid);
    sendmessage(process, {
      action,
      data,
      // Always a string, whatever type the caller passed in.
      receiverPid: String(pid),
    });
    return this;
  }

  /**
   * send message to one app worker by random
   * - if it's running in agent, it will send to one of app workers
   * - if it's running in app, it will send to agent
   *
   * Does nothing when no pid has been received yet.
   * @param {String} action - message key
   * @param {Object} data - message value
   * @return {Messenger} this
   */
  sendRandom(action, data) {
    /* istanbul ignore if */
    if (!this.opids.length) return this;
    const pid = random(this.opids);
    this.sendTo(String(pid), action, data);
    return this;
  }

  /**
   * send message to app
   * @param {String} action - message key
   * @param {Object} data - message value
   * @return {Messenger} this
   */
  sendToApp(action, data) {
    debug('[%s] send %s with %j to all app', this.pid, action, data);
    this.send(action, data, 'app');
    return this;
  }

  /**
   * send message to agent
   * @param {String} action - message key
   * @param {Object} data - message value
   * @return {Messenger} this
   */
  sendToAgent(action, data) {
    debug('[%s] send %s with %j to all agent', this.pid, action, data);
    this.send(action, data, 'agent');
    return this;
  }

  /**
   * Hand a message to `sendmessage` with a `to` routing hint.
   * @param {String} action - message key
   * @param {Object} data - message value
   * @param {String} to - let master know how to send message
   * @return {Messenger} this
   */
  send(action, data, to) {
    sendmessage(process, {
      action,
      data,
      to,
    });
    return this;
  }

  /**
   * Handler for the process `message` event: re-emit the message as an event
   * named `message.action` carrying `message.data`. Messages that are falsy
   * or whose `action` is not a string are ignored.
   * @param {Object} message - raw message received by the process
   * @private
   */
  _onMessage(message) {
    if (message && is.string(message.action)) {
      debug('[%s] got message %s with %j, receiverPid: %s',
        this.pid, message.action, message.data, message.receiverPid);
      this.emit(message.action, message.data);
    }
  }

  /**
   * Stop listening to the process `message` event and drop every listener
   * registered on this messenger (including the internal `egg-pids` one).
   */
  close() {
    process.removeListener('message', this._onMessage);
    this.removeAllListeners();
  }

  /**
   * @method Messenger#on
   * @param {String} action - message key
   * @param {Object} data - message value
   */
}
- module.exports = Messenger;
/**
 * Pick one element of `arr` at a pseudo-random position.
 * @param {Array} arr - candidates to choose from
 * @return {*} the element at the chosen index (`undefined` for an empty array)
 */
function random(arr) {
  const scaled = Math.random() * arr.length;
  return arr[Math.floor(scaled)];
}
|