feature: shares TAIL logs and watching PM2 by sub-emitter

This commit is contained in:
Tjatse 2014-12-16 17:40:06 +08:00
parent fc33345699
commit bfc2d6507e
2 changed files with 87 additions and 63 deletions

View File

@ -3,7 +3,6 @@ var fs = require('fs'),
nconf = require('nconf'),
Debug = require('./util/debug'),
stat = require('./stat'),
chokidar = require('chokidar'),
_ = require('lodash'),
chalk = require('chalk'),
ansiHTML = require('ansi-html'),
@ -141,8 +140,8 @@ Monitor.prototype.run = function(){
this._beats = {};
// Watch `pids` directory
this._watchPids();
// Watching PM2
this._startWatching();
// Listen connection event.
this._sockio.on('connection', this._connectSock.bind(this));
@ -163,23 +162,8 @@ Monitor.prototype._connectSock = function(socket){
}.bind(this));
// Tail logs
socket.on('tail_beat', function(pm_id){
this._tailLogs(pm_id, socket);
}.bind(this));
socket.on('tail_destroy', function(pm_id){
this._beatTimer && clearTimeout(this._beatTimer);
var beat = this._beats[pm_id];
beat.tails && beat.tails.forEach(function(tail){
tail.kill('SIGTERM');
});
delete this._beats[pm_id];
this._log.d(chalk.magenta('tail'), chalk.red('destroy'), pm_id);
// Recheck
this._checkTailBeat();
}.bind(this));
socket.on('tail_beat', this._tailLogs.bind(this, socket));
socket.on('tail_destroy', this._checkTailBeat.bind(this, socket.id))
// Trigger actions of process.
socket.on('action', function(action, id){
@ -209,20 +193,39 @@ Monitor.prototype._connectSock = function(socket){
* @param {String} pm_id
* @private
*/
Monitor.prototype._tailLogs = function(pm_id, socket){
if (this._beats[pm_id]) {
this._beats[pm_id].tick = Date.now();
Monitor.prototype._tailLogs = function(socket, pm_id){
var beat;
if ((beat = this._beats[pm_id])) {
(!beat.sockets[socket.id]) && (beat.sockets[socket.id] = socket);
beat.tick = Date.now();
this._beats[pm_id] = beat;
return;
}
this._log.d(chalk.magenta('tail'), pm_id);
this._log.i('tail', pm_id);
this._beats[pm_id] = {
tick: Date.now()
tick : Date.now(),
sockets: {}
};
this._beats[pm_id].sockets[socket.id] = socket;
function broadcast(data){
var beat = this._beats[pm_id];
if (!beat) {
this._log.e('beat does not exist.');
return;
}
for (var key in beat.sockets) {
beat.sockets[key].emit('tail', data)
}
}
function emitError(err){
socket.emit('tail', {pm_id: pm_id, msg: '<span style="color: #ff0000">Error: ' + err.message + '</span>'});
broadcast.call(this, {
pm_id: pm_id,
msg: '<span style="color: #ff0000">Error: ' + err.message + '</span>'
});
}
pm.tail({
@ -231,19 +234,19 @@ Monitor.prototype._tailLogs = function(pm_id, socket){
pm_id : pm_id
}, function(err, lines){
if (err) {
return emitError(err);
return emitError.call(this, err);
}
// Emit tail to client
socket.emit('tail', {
// Emit tail to clients.
broadcast.call(this, {
pm_id: pm_id,
msg : lines.map(function(line){
msg: lines.map(function(line){
line = line.replace(/\s/, '&nbsp;');
return '<span>' + ansiHTML(line) + '</span>';
}).join('')
});
}, function(err, tails){
}.bind(this), function(err, tails){
if (err) {
return emitError(err);
return emitError.call(this, err);
}
this._log.d(chalk.magenta('tail'), 'tailing...');
@ -257,17 +260,33 @@ Monitor.prototype._tailLogs = function(pm_id, socket){
* @returns {number}
* @private
*/
Monitor.prototype._checkTailBeat = function(){
Monitor.prototype._checkTailBeat = function(socketId, uid){
this._beatTimer && clearTimeout(this._beatTimer);
for (var key in this._beats) {
var beat = this._beats[key];
// Kill timeout beats.
if (Date.now() - beat.tick > 4000) {
beat.tails && beat.tails.forEach(function(tail){
tail.kill('SIGTERM');
});
this._log.d(chalk.magenta('tail'), chalk.red('destroy'), key);
delete this._beats[key];
function destroyTail(beat, key){
beat.tails && beat.tails.forEach(function(tail){
tail.kill('SIGTERM');
});
this._log.d(chalk.magenta('tail'), chalk.red('destroy'), key);
delete this._beats[key];
}
if (socketId && uid) {
this._log.i('tail', chalk.red('destroy'), uid, socketId);
var beat = this._beats[uid];
if (beat && beat.sockets) {
delete beat.sockets[socketId];
}
if (Object.keys(beat.sockets).length == 0) {
destroyTail.call(this, beat, uid);
}
} else {
for (var key in this._beats) {
var beat = this._beats[key];
// Kill timeout beats.
if (Date.now() - beat.tick > 4000) {
destroyTail.call(this, beat, key);
}
}
}
@ -325,28 +344,11 @@ Monitor.prototype._systemStat = function(cb){
}
/**
* Watch `pids` directory.
* Watching PM2
* @private
*/
Monitor.prototype._watchPids = function(){
var pidPath = this.options.pm2Conf.DEFAULT_PID_PATH;
// Verify directory exist or not.
if (!fs.existsSync(pidPath)) {
this._procs = 'The `pids` directory does not exist, it is due to locate PM2 root failed, try to set it by `$ pm2-gui set pm2 [ROOT]`';
return this._broadcast('info', this._procs);
}
this._log.i('chokidar', 'watching', pidPath);
// Chokidar doesn't watch the `0 byte size` file at all, so we try to watch `pids` directory.
// And if there has any changes, try to get sockets from `sock`.
chokidar.watch(pidPath, {
ignored : false,
persistent: true
}).on('all', function(e, p){
this._log.i('chokidar', e, p);
Monitor.prototype._startWatching = function(){
pm.sub(this.options.pm2Conf.DAEMON_PUB_PORT, function(){
// Avoid refresh bomb.
if (this._throttle) {
clearTimeout(this._throttle);
@ -356,6 +358,11 @@ Monitor.prototype._watchPids = function(){
ctx._refreshProcs();
}, 500, this);
}.bind(this));
this._throttle = setTimeout(function(ctx){
ctx._throttle = null;
ctx._refreshProcs();
}, 500, this);
};
/**

View File

@ -14,7 +14,24 @@ var spawn = require('child_process').spawn,
*/
var pm = module.exports = {};
var re_blank = /^[\s\r\t]*$/;
var re_blank = /^[\s\r\t]*$/,
allowedEvents = ['start', 'restart', 'exit']
/**
* Subscribe event BUS.
* @param {String} sockPath
* @param {Function} cb
*/
pm.sub = function(sockPath, cb){
var sub = axon.socket('sub-emitter'),
sub_sock = sub.connect(sockPath);
sub.on('process:*', function(event, data){
if(data && !!~allowedEvents.indexOf(data.event)){
cb();
}
});
};
/**
* Get PM2 version.