diff --git a/lib/mon.js b/lib/mon.js index 6f2526d..91420ca 100644 --- a/lib/mon.js +++ b/lib/mon.js @@ -3,7 +3,6 @@ var fs = require('fs'), nconf = require('nconf'), Debug = require('./util/debug'), stat = require('./stat'), - chokidar = require('chokidar'), _ = require('lodash'), chalk = require('chalk'), ansiHTML = require('ansi-html'), @@ -141,8 +140,8 @@ Monitor.prototype.run = function(){ this._beats = {}; - // Watch `pids` directory - this._watchPids(); + // Watching PM2 + this._startWatching(); // Listen connection event. this._sockio.on('connection', this._connectSock.bind(this)); @@ -163,23 +162,8 @@ Monitor.prototype._connectSock = function(socket){ }.bind(this)); // Tail logs - socket.on('tail_beat', function(pm_id){ - this._tailLogs(pm_id, socket); - }.bind(this)); - - socket.on('tail_destroy', function(pm_id){ - this._beatTimer && clearTimeout(this._beatTimer); - var beat = this._beats[pm_id]; - beat.tails && beat.tails.forEach(function(tail){ - tail.kill('SIGTERM'); - }); - delete this._beats[pm_id]; - - this._log.d(chalk.magenta('tail'), chalk.red('destroy'), pm_id); - - // Recheck - this._checkTailBeat(); - }.bind(this)); + socket.on('tail_beat', this._tailLogs.bind(this, socket)); + socket.on('tail_destroy', this._checkTailBeat.bind(this, socket.id)) // Trigger actions of process. socket.on('action', function(action, id){ @@ -209,20 +193,39 @@ Monitor.prototype._connectSock = function(socket){ * @param {String} pm_id * @private */ -Monitor.prototype._tailLogs = function(pm_id, socket){ - if (this._beats[pm_id]) { - this._beats[pm_id].tick = Date.now(); +Monitor.prototype._tailLogs = function(socket, pm_id){ + var beat; + if ((beat = this._beats[pm_id])) { + (!beat.sockets[socket.id]) && (beat.sockets[socket.id] = socket); + beat.tick = Date.now(); + this._beats[pm_id] = beat; return; } - this._log.d(chalk.magenta('tail'), pm_id); + this._log.i('tail', pm_id); this._beats[pm_id] = { - tick: Date.now() + tick : Date.now(), + sockets: {} }; + this._beats[pm_id].sockets[socket.id] = socket; + + function broadcast(data){ + var beat = this._beats[pm_id]; + if (!beat) { + this._log.e('beat does not exist.'); + return; + } + for (var key in beat.sockets) { + beat.sockets[key].emit('tail', data) + } + } function emitError(err){ - socket.emit('tail', {pm_id: pm_id, msg: 'Error: ' + err.message + ''}); + broadcast.call(this, { + pm_id: pm_id, + msg: 'Error: ' + err.message + '' + }); } pm.tail({ @@ -231,19 +234,19 @@ Monitor.prototype._tailLogs = function(pm_id, socket){ pm_id : pm_id }, function(err, lines){ if (err) { - return emitError(err); + return emitError.call(this, err); } - // Emit tail to client - socket.emit('tail', { + // Emit tail to clients. + broadcast.call(this, { pm_id: pm_id, - msg : lines.map(function(line){ + msg: lines.map(function(line){ line = line.replace(/\s/, ' '); return '' + ansiHTML(line) + ''; }).join('') }); - }, function(err, tails){ + }.bind(this), function(err, tails){ if (err) { - return emitError(err); + return emitError.call(this, err); } this._log.d(chalk.magenta('tail'), 'tailing...'); @@ -257,17 +260,33 @@ Monitor.prototype._tailLogs = function(pm_id, socket){ * @returns {number} * @private */ -Monitor.prototype._checkTailBeat = function(){ +Monitor.prototype._checkTailBeat = function(socketId, uid){ this._beatTimer && clearTimeout(this._beatTimer); - for (var key in this._beats) { - var beat = this._beats[key]; - // Kill timeout beats. - if (Date.now() - beat.tick > 4000) { - beat.tails && beat.tails.forEach(function(tail){ - tail.kill('SIGTERM'); - }); - this._log.d(chalk.magenta('tail'), chalk.red('destroy'), key); - delete this._beats[key]; + + function destroyTail(beat, key){ + beat.tails && beat.tails.forEach(function(tail){ + tail.kill('SIGTERM'); + }); + this._log.d(chalk.magenta('tail'), chalk.red('destroy'), key); + delete this._beats[key]; + } + + if (socketId && uid) { + this._log.i('tail', chalk.red('destroy'), uid, socketId); + var beat = this._beats[uid]; + if (beat && beat.sockets) { + delete beat.sockets[socketId]; + } + if (Object.keys(beat.sockets).length == 0) { + destroyTail.call(this, beat, uid); + } + } else { + for (var key in this._beats) { + var beat = this._beats[key]; + // Kill timeout beats. + if (Date.now() - beat.tick > 4000) { + destroyTail.call(this, beat, key); + } } } @@ -325,28 +344,11 @@ Monitor.prototype._systemStat = function(cb){ } /** - * Watch `pids` directory. + * Watching PM2 * @private */ -Monitor.prototype._watchPids = function(){ - var pidPath = this.options.pm2Conf.DEFAULT_PID_PATH; - - // Verify directory exist or not. - if (!fs.existsSync(pidPath)) { - this._procs = 'The `pids` directory does not exist, it is due to locate PM2 root failed, try to set it by `$ pm2-gui set pm2 [ROOT]`'; - return this._broadcast('info', this._procs); - } - - this._log.i('chokidar', 'watching', pidPath); - - // Chokidar doesn't watch the `0 byte size` file at all, so we try to watch `pids` directory. - // And if there has any changes, try to get sockets from `sock`. - chokidar.watch(pidPath, { - ignored : false, - persistent: true - }).on('all', function(e, p){ - this._log.i('chokidar', e, p); - +Monitor.prototype._startWatching = function(){ + pm.sub(this.options.pm2Conf.DAEMON_PUB_PORT, function(){ // Avoid refresh bomb. if (this._throttle) { clearTimeout(this._throttle); @@ -356,6 +358,11 @@ Monitor.prototype._watchPids = function(){ ctx._refreshProcs(); }, 500, this); }.bind(this)); + + this._throttle = setTimeout(function(ctx){ + ctx._throttle = null; + ctx._refreshProcs(); + }, 500, this); }; /** diff --git a/lib/pm.js b/lib/pm.js index 8cd18e1..4c1404f 100644 --- a/lib/pm.js +++ b/lib/pm.js @@ -14,7 +14,24 @@ var spawn = require('child_process').spawn, */ var pm = module.exports = {}; -var re_blank = /^[\s\r\t]*$/; +var re_blank = /^[\s\r\t]*$/, + allowedEvents = ['start', 'restart', 'exit'] + +/** + * Subscribe event BUS. + * @param {String} sockPath + * @param {Function} cb + */ +pm.sub = function(sockPath, cb){ + var sub = axon.socket('sub-emitter'), + sub_sock = sub.connect(sockPath); + + sub.on('process:*', function(event, data){ + if(data && !!~allowedEvents.indexOf(data.event)){ + cb(); + } + }); +}; /** * Get PM2 version.