diff --git a/lib/monitor.js b/lib/monitor.js
index b4ff108..292f0e4 100644
--- a/lib/monitor.js
+++ b/lib/monitor.js
@@ -1,3 +1,6 @@
+'use strict'
+
+var ms = require('ms')
var fs = require('fs')
var path = require('path')
var _ = require('lodash')
@@ -7,11 +10,16 @@ var totalmem = require('os').totalmem()
var pidusage = require('pidusage')
var url = require('url')
var socketIOClient = require('socket.io-client')
+
var pm = require('./pm')
var stat = require('./stat')
var conf = require('./util/conf')
var Log = require('./util/log')
+
var defConf
+var ignoredErrorKeys = ['namespace', 'keepANSI']
+var msKeys = ['refresh', 'process_refresh']
+var allowedSysStatsKeys = ['cpus', 'arch', 'hostname', 'platform', 'release', 'uptime', 'memory']
module.exports = Monitor
@@ -30,7 +38,7 @@ function Monitor (options) {
this._init(options)
}
-Monitor.ACCEPT_KEYS = ['pm2', 'refresh', 'daemonize', 'readonly', 'max_restarts', 'port', 'log', 'agent', 'remotes', 'origins']
+Monitor.ACCEPT_KEYS = ['pm2', 'daemonize', 'readonly', 'max_restarts', 'port', 'log', 'agent', 'remotes', 'origins'].concat(msKeys)
Monitor.DEF_CONF_FILE = 'pm2-gui.ini'
Monitor.PM2_DAEMON_PROPS = ['DAEMON_RPC_PORT', 'DAEMON_PUB_PORT']
@@ -38,14 +46,7 @@ Monitor.PM2_DAEMON_PROPS = ['DAEMON_RPC_PORT', 'DAEMON_PUB_PORT']
* Run socket.io server.
*/
Monitor.prototype.run = function () {
- this._noClient = true
-
- this._tails = {}
- this._usages = {}
-
- // Observe PM2
this._observePM2()
-
this._listeningSocketIO()
}
@@ -54,510 +55,49 @@ Monitor.prototype.run = function () {
* @return {[type]} [description]
*/
Monitor.prototype.quit = function () {
- if (this.pm2Sock) {
+ // close pm2 subscriber if necessary.
+ if (this._cache.pm2Subscriber) {
console.debug('Closing pm2 pub emitter socket.')
- this.pm2Sock.close()
+ this._cache.pm2Subscriber.close()
+ this._cache.pm2Subscriber = null
}
- if (this._sockio) {
- console.debug('Closing socket.io server.')
- this._sockio.close()
- console.debug('Destroying tails.')
- this._killTailProcess()
+ // close log subscriber if necessary.
+ this._closeLogSubscribers()
+
+ // close pm2 sockio if necessary.
+ if (this._cache.sockio) {
+ console.debug('Closing socket.io server.')
+ this._cache.sockio.close()
+ this._cache.sockio = null
}
}
/**
* Connect to socket.io server.
- * @param {String} ns the namespace.
- * @param {Function} success
- * @param {Function} failure
+ * @param {Object} options.
+ * @param {Function} fn
*/
-Monitor.prototype.connect = function (options, success, failure) {
+Monitor.prototype.connect = function (options, fn) {
if (!options.port) {
throw new Error('Port is required!')
}
var serverUri = Monitor.toConnectionString(options)
-
- success = _.once(success)
- failure = _.once(failure)
-
console.info('Connecting to', serverUri)
+
+ fn = _.once(fn)
+
var socket = socketIOClient(serverUri)
- socket.on('connect', function () {
- success(socket)
+ socket.on(conf.SOCKET_EVENTS.CONNECT, function () {
+ fn(null, socket)
})
- socket.on('error', function (err) {
- !failure(err, socket)
+ socket.on(conf.SOCKET_EVENTS.ERROR, function (err) {
+ fn(err, socket)
})
- socket.on('connect_error', function (err) {
- !failure(err, socket)
- })
-}
-
-/**
- * Resolve home path.
- * @param {String} pm2Home
- * @returns {*}
- * @private
- */
-Monitor.prototype._resolveHome = function (pm2Home) {
- if (pm2Home && pm2Home.indexOf('~/') === 0) {
- // Get root directory of PM2.
- pm2Home = process.env.PM2_HOME || path.resolve(process.env.HOME || process.env.HOMEPATH, pm2Home.substr(2))
-
- // Make sure exist.
- if (!pm2Home || !fs.existsSync(pm2Home)) {
- throw new Error('PM2 root can not be located, try to initialize PM2 by executing `pm2 ls` or set environment variable vi `export PM2_HOME=[ROOT]`.')
- }
- }
- return pm2Home
-}
-
-/**
- * Initialize options and configurations.
- * @private
- */
-Monitor.prototype._init = function (options) {
- options = options || {}
-
- defConf = conf.File(options.confFile || path.resolve(__dirname, '..', Monitor.DEF_CONF_FILE)).loadSync().valueOf()
- defConf = _.pick.call(null, defConf, Monitor.ACCEPT_KEYS)
-
- options = _.pick.apply(options, Monitor.ACCEPT_KEYS).valueOf()
- options = _.defaults(options, defConf)
-
- options.pm2 = this._resolveHome(options.pm2)
- Log(options.log)
-
- // Load PM2 config.
- var pm2ConfPath = path.join(options.pm2, 'conf.js')
- var fbMsg = ''
- try {
- options.pm2Conf = require(pm2ConfPath)(options.pm2)
- if (!options.pm2Conf) {
- throw new Error(404)
- }
- } catch (err) {
- fbMsg = 'Can not load PM2 config, the file "' + pm2ConfPath + '" does not exist or empty, fallback to auto-load by pm2 home. '
- console.warn(fbMsg)
- options.pm2Conf = {
- DAEMON_RPC_PORT: path.resolve(options.pm2, 'rpc.sock'),
- DAEMON_PUB_PORT: path.resolve(options.pm2, 'pub.sock'),
- PM2_LOG_FILE_PATH: path.resolve(options.pm2, 'pm2.log')
- }
- }
-
- Monitor.PM2_DAEMON_PROPS.forEach(function (prop) {
- var val = options.pm2Conf[prop]
- if (!val || !fs.existsSync(val)) {
- throw new Error(fbMsg + 'Unfortunately ' + (val || prop) + ' can not found, please makesure that your pm2 is running and the home path is correct.')
- }
- })
-
- // Bind socket.io server to context.
- if (options.sockio) {
- this.sockio = options.sockio
- delete options.sockio
- }
-
- // Bind to context.
- this.options = options
- Object.freeze(this.options)
-}
-
-/**
- * Connection event of `sys` namespace.
- * @param {Socket} socket
- * @private
- */
-Monitor.prototype._connectSysSock = function (socket) {
- var self = this
- // Still has one client connects to server at least.
- self._noClient = false
-
- socket.on('disconnect', function () {
- // Check connecting client.
- self._noClient = self._sockio.of(conf.NSP.SYS).sockets.length === 0
- })
-
- // Trigger actions of process.
- socket.on('action', function (action, id) {
- var prefix = '[pm2:' + id + ']'
- console.debug(prefix, action, 'sending to pm2 daemon...')
- if (self.options.readonly) {
- console.warn(prefix, 'denied, readonly!')
- return socket.emit('action', id, 'Can not complete the action due to denied by server, it is readonly!')
- }
- pm.action(self.options.pm2Conf.DAEMON_RPC_PORT, action, id, function (err, forceRefresh) {
- if (err) {
- console.error(action, err.message)
- return socket.emit('action', id, 'Can not complete the action due to ' + err.message)
- }
- console.debug(prefix, action, 'completed!')
- forceRefresh && self._throttleRefresh()
- })
- })
- sendProcs()
- socket.on('procs', sendProcs)
- self._pm2Ver(socket)
- this._sysStat && this._broadcast('system_stat', this._sysStat)
-
- // Grep system states once and again.
- if (this._status !== 'R') {
- this._nextTick(this.options.refresh || 5000)
- }
-
- function sendProcs () {
- self._procs && socket.emit(typeof self._procs === 'string' ? 'info' : 'procs', self._procs)
- }
-}
-
-/**
- * Connection event of `log` namespace.
- * @param {socket.io} socket
- * @private
- */
-Monitor.prototype._connectLogSock = function (socket) {
- var self = this
-
- // Emit error.
- function emitError (err, pmId, keepANSI) {
- var data = {
- pm_id: pmId,
- msg: keepANSI ? chalk.red(err.message) : 'Error: ' + err.message + ''
- }
- self._broadcast.call(self, 'log', data, conf.NSP.LOG) // eslint-disable-line no-useless-call
- }
-
- function startTailProcess (pmId, keepANSI) {
- socket._pm_id = pmId
-
- if (self._tails[pmId]) {
- return
- }
-
- // Tail logs.
- pm.tail({
- sockPath: self.options.pm2Conf.DAEMON_RPC_PORT,
- logPath: self.options.pm2Conf.PM2_LOG_FILE_PATH,
- pm_id: pmId
- }, function (err, lines) {
- if (err) {
- return emitError(err, pmId, keepANSI)
- }
- // Emit logs to clients.
- var data = {
- pm_id: pmId,
- msg: lines.map(function (line) {
- if (!keepANSI) {
- line = line.replace(/\s/, ' ')
- return '' + ansiHTML(line) + ''
- } else {
- return line
- }
- }).join(keepANSI ? '\n' : '')
- }
- self._broadcast.call(self, 'log', data, conf.NSP.LOG) // eslint-disable-line no-useless-call
- }, function (err, tail) {
- if (err) {
- return emitError(err, pmId, keepANSI)
- }
- if (!tail) {
- return emitError(new Error('No log can be found.'), pmId, keepANSI)
- }
-
- console.info('[pm2:' + pmId + ']', 'tail starting...')
- self._tails[pmId] = tail
- })
- }
-
- socket.on('disconnect', self._killTailProcess.bind(self))
- socket.on('tail_kill', self._killTailProcess.bind(self))
- socket.on('tail', startTailProcess)
- console.info('Connected to ' + socket.nsp.name + '!')
-}
-
-/**
- * Connection event of `proc` namespace.
- * @param {socket.io} socket
- * @private
- */
-Monitor.prototype._connectProcSock = function (socket) {
- var self = this
- // Emit error.
- function emitError (err, pid) {
- var data = {
- pid: pid,
- msg: 'Error: ' + err.message + ''
- }
- self._broadcast.call(self, 'proc', data, conf.NSP.PROC) // eslint-disable-line no-useless-call
- }
-
- function killObserver () {
- var socks = self._sockio.of(conf.NSP.PROC).sockets
- var canNotBeDeleted = {}
-
- if (Array.isArray(socks) && socks.length > 0) {
- socks.forEach(function (sock) {
- if (sock._pid) {
- canNotBeDeleted[sock._pid.toString()] = 1
- }
- })
- }
-
- for (var pid in self._usages) {
- var timer
- if (!canNotBeDeleted[pid] && (timer = self._usages[pid])) {
- clearInterval(timer)
- delete self._usages[pid]
- console.debug('[pid:' + pid + ']', 'cpu and memory observer destroyed!')
- }
- }
- }
-
- function runObserver (pid) {
- socket._pid = pid
-
- var pidStr = pid.toString()
- if (self._usages[pidStr]) {
- return
- }
-
- console.debug('[pid:' + pidStr + ']', 'cpu and memory observer is running...')
-
- function runTimer () {
- pidusage.stat(pid, function (err, stat) {
- if (err) {
- clearInterval(self._usages[pidStr])
- delete self._usages[pidStr]
- return emitError.call(self, err, pid)
- }
- stat.memory = stat.memory * 100 / totalmem
-
- var data = {
- pid: pid,
- time: Date.now(),
- usage: stat
- }
- self._broadcast.call(self, 'proc', data, conf.NSP.PROC) // eslint-disable-line no-useless-call
- })
- }
-
- self._usages[pidStr] = setInterval(runTimer, 3000)
- runTimer(this)
- }
-
- socket.on('disconnect', killObserver)
- socket.on('proc', runObserver)
- console.info('Connected to ' + socket.nsp.name + '!')
-}
-
-/**
- * Grep system state loop
- * @param {Number} tick
- * @private
- */
-Monitor.prototype._nextTick = function (tick, continuously) {
- // Return it if worker is running.
- if (this._status === 'R' && !continuously) {
- return
- }
- // Running
- this._status = 'R'
- console.debug('monitor heartbeat per', tick + 'ms')
- // Grep system state
- this._systemStat(function () {
- // If there still has any client, grep again after `tick` ms.
- if (!this._noClient) {
- return setTimeout(this._nextTick.bind(this, tick, true), tick)
- }
- // Stop
- delete this._status
- console.debug('monitor heartbeat destroyed!')
- })
-}
-
-/**
- * Grep system states.
- * @param {Function} cb
- * @private
- */
-Monitor.prototype._systemStat = function (cb) {
- stat.cpuUsage(function (err, cpuUsage) {
- if (err) {
- // Log only.
- console.error('Can not load system/cpu/memory information: ', err.message)
- } else {
- // System states.
- this._sysStat = _.defaults(_(stat).pick('cpus', 'arch', 'hostname', 'platform', 'release', 'uptime', 'memory').clone(), {
- cpu: cpuUsage
- })
- this._broadcast.call(this, 'system_stat', this._sysStat) // eslint-disable-line no-useless-call
- }
- cb.call(this)
- }, this)
-}
-
-/**
- * Observe PM2
- * @private
- */
-Monitor.prototype._observePM2 = function () {
- var pm2Daemon = this.options.pm2Conf.DAEMON_PUB_PORT
- console.info('Connecting to pm2 daemon:', pm2Daemon)
- this.pm2Sock = pm.sub(pm2Daemon, function (data) {
- console.info(chalk.magenta(data.event), data.process.name + '-' + data.process.pm_id)
- this._throttleRefresh()
- }, this)
-
- // Enforce a refresh operation if RPC is not online.
- this._throttleRefresh()
-}
-
-/**
- * Throttle the refresh behavior to avoid refresh bomb
- * @private
- */
-Monitor.prototype._throttleRefresh = function () {
- if (this._throttle) {
- clearTimeout(this._throttle)
- }
- this._throttle = setTimeout(function (ctx) {
- ctx._throttle = null
- ctx._refreshProcs()
- }, 500, this)
-}
-
-/**
- * Refresh processes
- * @private
- */
-Monitor.prototype._refreshProcs = function () {
- pm.list(this.options.pm2Conf.DAEMON_RPC_PORT, function (err, procs) {
- if (err) {
- return this._broadcast('info', 'Can not connect to pm2 daemon, ' + err.message)
- }
- // Wrap processes and cache them.
- this._procs = procs.map(function (proc) {
- proc.pm2_env = proc.pm2_env || {
- USER: 'UNKNOWN'
- }
- var pm2Env = {
- user: proc.pm2_env.USER
- }
-
- for (var key in proc.pm2_env) {
- // Ignore useless fields.
- if (key.slice(0, 1) === '_' ||
- key.indexOf('axm_') === 0 || !!~['versioning', 'command'].indexOf(key) ||
- key.charCodeAt(0) <= 90) {
- continue
- }
- pm2Env[key] = proc.pm2_env[key]
- }
- proc.pm2_env = pm2Env
- return proc
- })
- // Emit to client.
- this._broadcast('procs', this._procs)
- }, this)
-}
-
-/**
- * Get PM2 version and return it to client.
- * @private
- */
-Monitor.prototype._pm2Ver = function (socket) {
- var pm2RPC = this.options.pm2Conf.DAEMON_RPC_PORT
- console.info('Fetching pm2 version:', pm2RPC)
- pm.version(pm2RPC, function (err, version) {
- socket.emit('pm2_ver', (err || !version) ? '0.0.0' : version)
- })
-}
-
-/**
- * Broadcast to all connected clients.
- * @param {String} event
- * @param {Object} data
- * @param {String} nsp
- * @private
- */
-Monitor.prototype._broadcast = function (event, data, nsp) {
- nsp = nsp || conf.NSP.SYS
-
- if (this._noClient) {
- return console.debug('No client is connecting, ignore broadcasting', event, 'to', nsp)
- }
- console.debug('Broadcasting', event, 'to', nsp)
- this._sockio.of(nsp).emit(event, data)
-}
-
-/**
- * Destroy tails.
- * @param {Number} pm_id
- * @return {[type]}
- */
-Monitor.prototype._killTailProcess = function (pmId) {
- var self = this
-
- function killTail (id) {
- var tail = self._tails[id]
- if (!tail) {
- return
- }
- try {
- tail.kill('SIGTERM')
- } catch (err) {}
-
- delete self._tails[id]
- console.info('[pm2:' + id + ']', 'tail destroyed!')
- }
- if (!isNaN(pmId)) {
- return killTail(pmId)
- }
-
- var socks = self._sockio.of(conf.NSP.LOG).sockets
- var canNotBeDeleted = {}
- if (socks && socks.length > 0) {
- socks.forEach(function (sock) {
- canNotBeDeleted[sock._pm_id] = 1
- })
- }
-
- for (var _id in self._tails) {
- if (!canNotBeDeleted[_id]) {
- killTail(_id)
- }
- }
-}
-
-/**
- * Listening all the nsp.
- */
-Monitor.prototype._listeningSocketIO = function () {
- if (!this._sockio || this._sockio._listening) {
- console.warn('Avoid duplicated listening!')
- return
- }
-
- this._sockio._listening = true
- for (var nsp in conf.NSP) {
- this._sockio.of(conf.NSP[nsp]).on('connection', this['_connect' + (nsp[0] + nsp.toLowerCase().slice(1)) + 'Sock'].bind(this))
- console.info('Listening connection event on', nsp.toLowerCase())
- }
-
- var auth
- if (!(this.options.agent && (auth = this.options.agent.authorization))) {
- return
- }
- this._sockio.use(function (socket, next) {
- if (auth !== socket.handshake.query.auth) {
- return next(new Error('unauthorized'))
- }
- next()
+ socket.on(conf.SOCKET_EVENTS.CONNECT_ERROR, function (err) {
+ fn(err, socket)
})
}
@@ -654,7 +194,7 @@ Monitor.parseConnectionString = function (connectionString) {
if (connectionString) {
connectionString = url.parse(connectionString)
connection.hostname = connectionString.hostname
- connection.port = connectionString.port
+ connection.port = !isNaN(connectionString.port) ? parseFloat(connectionString.port) : connectionString.port
connection.path = (connectionString.path || '').replace(/^\/+/, '')
connection.protocol = connectionString.protocol
}
@@ -663,13 +203,559 @@ Monitor.parseConnectionString = function (connectionString) {
Object.defineProperty(Monitor.prototype, 'sockio', {
set: function (io) {
- if (this._sockio) {
- this._sockio.close()
+ if (this._cache.sockio) {
+ this._cache.sockio.close()
}
- this._sockio = io
+ this._cache.sockio = io
this._listeningSocketIO()
},
get: function () {
- return this._sockio
+ return this._cache.sockio
}
})
+
+/**
+ * Resolve home path of PM2.
+ * @param {String} pm2Home
+ * @returns {*}
+ * @private
+ */
+Monitor.prototype._resolveHome = function (pm2Home) {
+ if (pm2Home && pm2Home.indexOf('~/') === 0) {
+ // Get root directory of PM2.
+ pm2Home = process.env.PM2_HOME || path.resolve(process.env.HOME || process.env.HOMEPATH, pm2Home.substr(2))
+
+ // Make sure exist.
+ if (!pm2Home || !fs.existsSync(pm2Home)) {
+ throw new Error('PM2 root can not be located, try to initialize PM2 by executing `pm2 ls` or set environment variable vi `export PM2_HOME=[ROOT]`.')
+ }
+ }
+ return pm2Home
+}
+
+/**
+ * Initialize options and configurations.
+ * @param {Object} options
+ * @return {N/A}
+ */
+Monitor.prototype._init = function (options) {
+ // mixing options & default settings.
+ options = options || {}
+
+ defConf = conf.File(options.confFile || path.resolve(__dirname, '..', Monitor.DEF_CONF_FILE)).loadSync().valueOf()
+ defConf = _.pick.call(null, defConf, Monitor.ACCEPT_KEYS)
+
+ options = _.pick.apply(options, Monitor.ACCEPT_KEYS).valueOf()
+ options = _.defaults(options, defConf)
+ // converts various time formats to milliseconds
+ msKeys.forEach(function (timeKey) {
+ var time
+ if (_.isString(time = options[timeKey])) {
+ options[timeKey] = ms(time)
+ }
+ })
+ options.pm2 = this._resolveHome(options.pm2)
+ // init logger.
+ Log(options.log)
+
+ // load PM2 config.
+ var pm2ConfPath = path.join(options.pm2, 'conf.js')
+ var fbMsg = ''
+ try {
+ options.pm2Conf = require(pm2ConfPath)(options.pm2)
+ if (!options.pm2Conf) {
+ throw new Error(404)
+ }
+ } catch (err) {
+ fbMsg = 'Can not load PM2 config, the file "' + pm2ConfPath + '" does not exist or empty, fallback to auto-load by pm2 home. '
+ console.warn(fbMsg)
+ options.pm2Conf = {
+ DAEMON_RPC_PORT: path.resolve(options.pm2, 'rpc.sock'),
+ DAEMON_PUB_PORT: path.resolve(options.pm2, 'pub.sock')
+ }
+ }
+
+ Monitor.PM2_DAEMON_PROPS.forEach(function (prop) {
+ var val = options.pm2Conf[prop]
+ if (!val || !fs.existsSync(val)) {
+ throw new Error(fbMsg + 'Unfortunately ' + (val || prop) + ' can not found, please makesure that your pm2 is running and the home path is correct.')
+ }
+ })
+
+ // Bind socket.io server to context.
+ if (options.sockio) {
+ this._cache.sockio = options.sockio
+ delete options.sockio
+ }
+ // Bind to context.
+ this.options = options
+ Object.freeze(this.options)
+
+ this._cache = {
+ noClient: true,
+ usages: {},
+ logSubscribers: {},
+ pm2Subscriber: null,
+ sockio: null,
+ processes: null,
+ sysStat: null,
+ throttle: null,
+ awake: false
+ }
+}
+
+/**
+ * Emit error to clients.
+ * @param {Error} err
+ * @param {Object} options
+ * @return {N/A}
+ */
+Monitor.prototype._emitError = function (err, options) {
+ var data = _.extend({
+ error: options.keepANSI ? chalk.red(err.message) : 'Error: ' + err.message + ''
+ }, _.omit(options, ignoredErrorKeys))
+ this._broadcast(conf.SOCKET_EVENTS.ERROR, data, conf.NSP[options.namespace])
+}
+
+/**
+ * Connection event of `sys` namespace.
+ * @param {Socket} socket
+ * @private
+ */
+Monitor.prototype._connectSysSocket = function (socket) {
+ var self = this
+
+ self._cache.noClient = false
+ console.info('Connected to ' + socket.nsp.name + '!')
+
+ socket.on(conf.SOCKET_EVENTS.DISCONNECT, disconnect)
+ socket.on(conf.SOCKET_EVENTS.PULL_ACTION, actions)
+
+ // pm2 version
+ self._pm2Ver(socket)
+ // prefetch system status
+ this._cache.sysStat && this._broadcast(conf.SOCKET_EVENTS.DATA_SYSTEM_STATS, this._cache.sysStat, conf.NSP.SYS)
+
+ // Grep system states once and again.
+ if (this._status !== 'R') {
+ this._nextTick(this.options.refresh || 5000)
+ }
+
+ function disconnect () {
+ // Check connecting client.
+ self._cache.noClient = self._cache.sockio.of(conf.NSP.SYS).sockets.length === 0
+ }
+
+ function actions (action, id) {
+ var prefix = '[pm2:' + id + ']'
+ console.debug(prefix, action, 'sending to pm2 daemon...')
+ if (self.options.readonly) {
+ console.warn(prefix, 'denied, readonly!!!')
+ return socket.emit(conf.SOCKET_EVENTS.DATA_ACTION, {
+ id: id,
+ error: 'Fatal to execute the <' + action + '> operation due to denied by server, it is readonly!'
+ })
+ }
+ pm.action({
+ socketPath: self.options.pm2Conf.DAEMON_RPC_PORT,
+ action: action,
+ id: id
+ }, function (err, forceRefresh) {
+ if (err) {
+ console.error(action, err.message)
+ return socket.emit(conf.SOCKET_EVENTS.DATA_ACTION, {
+ id: id,
+ error: 'Fatal to execute the <' + action + '> operation due to ' + err.message
+ })
+ }
+ console.debug(prefix, action, 'completed(' + (forceRefresh ? 'force refresh' : 'holding') + ')')
+ forceRefresh && self._throttleRefresh()
+ })
+ }
+}
+
+/**
+ * Connection event of `log` namespace.
+ * @param {socket.io} socket
+ * @private
+ */
+Monitor.prototype._connectLogSocket = function (socket) {
+ var self = this
+
+ socket.on(conf.SOCKET_EVENTS.DISCONNECT, self._closeLogSubscribers.bind(self))
+ socket.on(conf.SOCKET_EVENTS.PULL_LOGS_END, self._closeLogSubscribers.bind(self))
+ socket.on(conf.SOCKET_EVENTS.PULL_LOGS, subscribeLog)
+ console.info('Connected to ' + socket.nsp.name + '!')
+
+ function subscribeLog (pmId, keepANSI) {
+ console.info('[pm2:' + pmId + ']', 'subscribing...')
+ socket._pm_id = pmId
+ socket._ansi = !!keepANSI
+
+ if (self._cache.logSubscribers[pmId]) {
+ console.warn('[pm2:' + pmId + ']', 'subscribed!!!')
+ return
+ }
+ self._cache.logSubscribers[pmId] = socket
+ self._logSubscriberChanged()
+ socket.emit(conf.SOCKET_EVENTS.DATA, {
+ id: pmId,
+ text: '[' + (new Date()).toLocaleString() + '] waiting for logs...'
+ })
+ console.info('[pm2:' + pmId + ']', 'subscribed!!!')
+ }
+}
+
+/**
+ * Connection event of `proc` namespace.
+ * @param {socket.io} socket
+ * @private
+ */
+Monitor.prototype._connectProcessSocket = function (socket) {
+ var self = this
+
+ socket.on(conf.SOCKET_EVENTS.DISCONNECT, stopMonitorUsage)
+ socket.on(conf.SOCKET_EVENTS.PULL_USAGE, monitorUsage)
+ socket.on(conf.SOCKET_EVENTS.PULL_PROCESSES, sendProcs)
+ console.info('Connected to ' + socket.nsp.name + '!')
+
+ // send prefetch processes to client.
+ sendProcs()
+
+ function sendProcs () {
+ self._cache.processes && socket.emit(conf.SOCKET_EVENTS.DATA_PROCESSES, self._cache.processes, conf.NSP.PROCESS)
+ }
+
+ function monitorUsage (pid) {
+ socket._pid = pid
+
+ var pidStr = String(pid)
+ if (self._cache.usages[pidStr]) {
+ console.debug('[pid:' + pidStr + ']', 'observed!!!')
+ return
+ }
+
+ console.debug('[pid:' + pidStr + ']', 'cpu and memory observer is running...')
+
+ function runTimer () {
+ pidusage.stat(pid, function (err, stat) {
+ if (err) {
+ clearInterval(self._cache.usages[pidStr])
+ delete self._cache.usages[pidStr]
+ return self._emitError(err, {
+ id: pid,
+ namespace: conf.NSP.PROCESS
+ })
+ }
+ stat.memory = stat.memory * 100 / totalmem
+
+ var data = {
+ pid: pid,
+ time: Date.now(),
+ usage: stat
+ }
+ self._broadcast.call(self, conf.SOCKET_EVENTS.DATA_USAGE, data, conf.NSP.PROCESS) // eslint-disable-line no-useless-call
+ })
+ }
+
+ self._cache.usages[pidStr] = setInterval(runTimer, self.options.process_refresh)
+ runTimer()
+ }
+
+ function stopMonitorUsage () {
+ var socks = self._cache.sockio.of(conf.NSP.PROCESS).sockets
+ var canNotBeDeleted = {}
+
+ // delete usage observer in a safe & heavy way.
+ if (Array.isArray(socks) && socks.length > 0) {
+ socks.forEach(function (sock) {
+ if (sock._pid) {
+ canNotBeDeleted[sock._pid.toString()] = 1
+ }
+ })
+ }
+
+ for (var pid in self._cache.usages) {
+ var timer
+ if (!canNotBeDeleted[pid] && (timer = self._cache.usages[pid])) {
+ clearInterval(timer)
+ delete self._cache.usages[pid]
+ console.debug('[pid:' + pid + ']', 'cpu and memory observer has been destroyed!')
+ }
+ }
+ }
+}
+
+/**
+ * Grep system state loop
+ * @param {Number} tick
+ * @private
+ */
+Monitor.prototype._nextTick = function (tick, continuously) {
+ // Return it if worker is running.
+ if (this._status === 'R' && !continuously) {
+ return
+ }
+ // Running
+ this._status = 'R'
+ console.debug('monitor heartbeat per', tick + 'ms')
+ // Grep system state
+ this._systemStat(function () {
+ // If there still has any client, grep again after `tick` ms.
+ if (!this._cache.noClient) {
+ return setTimeout(function (that) {
+ that._nextTick(tick, true)
+ }, tick, this)
+ }
+ // Stop
+ delete this._status
+ console.debug('monitor heartbeat destroyed!')
+ })
+}
+
+/**
+ * Grep system states.
+ * @param {Function} fn
+ * @private
+ */
+Monitor.prototype._systemStat = function (fn) {
+ stat.cpuUsage(function (err, cpuUsage) {
+ if (err) {
+ // Log only.
+ console.error('Can not load system/cpu/memory informations: ', err.message)
+ } else {
+ // System states.
+ this._cache.sysStat = _.defaults(_(stat).pick(allowedSysStatsKeys).clone(), {
+ cpu: cpuUsage
+ })
+ this._broadcast(conf.SOCKET_EVENTS.DATA_SYSTEM_STATS, this._cache.sysStat, conf.NSP.SYS)
+ }
+ fn.call(this)
+ }, this)
+}
+
+/**
+ * Observe PM2
+ * @private
+ */
+Monitor.prototype._observePM2 = function () {
+ var pm2Daemon = this.options.pm2Conf.DAEMON_PUB_PORT
+ console.info('Connecting to pm2 daemon:', pm2Daemon)
+
+ this._cache.pm2Subscriber = pm.sub({
+ socketPath: pm2Daemon,
+ context: this
+ }, function (data) {
+ this._cache.awake = true
+ console.info(chalk.magenta(data.event), data.process.name + '-' + data.process.pm_id)
+ this._throttleRefresh()
+ })
+ // awake from log
+ this._logSubscriberChanged()
+ // Enforce a refresh operation if RPC is not online.
+ this._throttleRefresh()
+}
+
+/**
+ * Throttle the refresh behavior to avoid refresh bomb
+ * @private
+ */
+Monitor.prototype._throttleRefresh = function () {
+ if (this._cache.throttle) {
+ clearTimeout(this._cache.throttle)
+ }
+ this._cache.throttle = setTimeout(function (that) {
+ that._cache.throttle = null
+ that._refreshProcs()
+ }, 500, this)
+}
+
+/**
+ * Refresh processes
+ * @private
+ */
+Monitor.prototype._refreshProcs = function () {
+ pm.list({
+ socketPath: this.options.pm2Conf.DAEMON_RPC_PORT,
+ context: this
+ }, function (err, procs) {
+ if (err) {
+ err = new Error('Fatal to connect to pm2 daemon due to ' + err.message)
+ return this._emitError(err, {
+ namespace: conf.NSP.PROCESS
+ })
+ }
+ // Wrap processes and cache them.
+ this._cache.processes = procs.map(function (proc) {
+ proc.pm2_env = proc.pm2_env || {
+ USER: 'UNKNOWN'
+ }
+ var pm2Env = {
+ user: proc.pm2_env.USER
+ }
+
+ for (var key in proc.pm2_env) {
+ // Ignore useless fields.
+ if (/^(_|axm_)+/.test(key) || /versioning|command/i.test(key) || key.charCodeAt(0) <= 90) {
+ continue
+ }
+ pm2Env[key] = proc.pm2_env[key]
+ }
+ proc.pm2_env = pm2Env
+ return proc
+ })
+ // Emit to client.
+ this._broadcast(conf.SOCKET_EVENTS.DATA_PROCESSES, this._cache.processes, conf.NSP.PROCESS)
+ })
+}
+
+/**
+ * Get PM2 version and return it to client.
+ * @private
+ */
+Monitor.prototype._pm2Ver = function (socket) {
+ var pm2RPC = this.options.pm2Conf.DAEMON_RPC_PORT
+ console.info('Fetching pm2 version:', pm2RPC)
+ pm.version(pm2RPC, function (err, version) {
+ socket.emit(conf.SOCKET_EVENTS.DATA_PM2_VERSION, (err || !version) ? '0.0.0' : version, conf.NSP.SYS)
+ })
+}
+
+/**
+ * Broadcast to all connected clients.
+ * @param {String} event
+ * @param {Object} data
+ * @param {String} nsp
+ * @private
+ */
+Monitor.prototype._broadcast = function (event, data, nsp) {
+ if (this._cache.noClient) {
+ return console.debug('No client is connecting, ignore broadcasting', event, 'to', nsp)
+ }
+
+ console.debug('Broadcasting', event, 'to', nsp)
+ this._cache.sockio.of(nsp).emit(event, data)
+}
+
+/**
+ * Destroy tails.
+ * @param {Number} pm_id
+ * @return {[type]}
+ */
+Monitor.prototype._closeLogSubscribers = function (pmId) {
+ var self = this
+ // close as specific log subscriber
+ if (!isNaN(pmId)) {
+ self._logSubscriberChanged()
+ return unsubscribe(String(pmId))
+ }
+ if (_.keys(self._cache.logSubscribers).length === 0) {
+ return
+ }
+
+ // unbsusbribe all in a safe & heavy way.
+ var socks = self._cache.sockio.of(conf.NSP.LOG).sockets
+ var canNotBeDeleted = {}
+ if (socks && socks.length > 0) {
+ socks.forEach(function (sock) {
+ canNotBeDeleted[String(sock._pm_id)] = 1
+ })
+ }
+
+ var changed = false
+ for (var subId in self._cache.logSubscribers) {
+ subId = String(subId)
+ if (!canNotBeDeleted[subId]) {
+ changed = true
+ unsubscribe(subId)
+ }
+ }
+ if (changed) {
+ self._logSubscriberChanged()
+ }
+
+ function unsubscribe (id) {
+ console.info('[pm2:' + id + ']', 'unsubscribed!!!')
+ delete self._cache.logSubscribers[id]
+ }
+}
+
+/**
+ * Toggle listening on log:* events
+ * @return {N/A}
+ */
+Monitor.prototype._logSubscriberChanged = function () {
+ if (!this._cache.pm2Subscriber) {
+ console.warn('PM2 subscriber does not exist')
+ return
+ }
+ if (!this._cache.awake || _.keys(this._cache.logSubscribers).length > 0) {
+ var self = this
+ var sock
+ if (self._cache.pm2SubscriberIsLogging) {
+ console.warn('[pm2:log] activated')
+ return
+ }
+ self._cache.pm2SubscriberIsLogging = true
+ self._cache.pm2Subscriber.on('log:*', function (e, d) {
+ var pmId = d.process.pm_id
+ if (!self._cache.awake) {
+ self._cache.awake = true
+ console.info(chalk.magenta('awake:1st'), d.process.name + '-' + pmId)
+ self._throttleRefresh()
+ if (_.keys(self._cache.logSubscribers).length === 0) {
+ self._cache.pm2SubscriberIsLogging = false
+ self._cache.pm2Subscriber.off('log:*')
+ console.info('[pm2:log]', chalk.red('deactivate'), '\'cause no subscriber')
+ }
+ } else if ((sock = self._cache.logSubscribers[pmId])) {
+ var text = d.data
+ if (text) {
+ console.info('[pm2:' + pmId + '] sent log')
+ text = text.replace(/[\r\n\t]+$/, '')
+ sock.emit(conf.SOCKET_EVENTS.DATA, {
+ id: pmId,
+ text: '[' + e + ']' + (sock._ansi ? text : ansiHTML(text))
+ })
+ }
+ }
+ })
+ console.info('[pm2:log]', chalk.green('Activate'))
+ } else if (this._cache.pm2SubscriberIsLogging) {
+ this._cache.pm2SubscriberIsLogging = false
+ this._cache.pm2Subscriber.off('log:*')
+ console.info('[pm2:log]', chalk.red('deactivate'))
+ } else {
+ console.warn('[pm2:log]', 'deactivated')
+ }
+}
+
+/**
+ * Listening all the nsp.
+ */
+Monitor.prototype._listeningSocketIO = function () {
+ if (!this._cache.sockio || this._cache.sockio._listening) {
+ console.warn('Avoid duplicated listening!')
+ return
+ }
+
+ this._cache.sockio._listening = true
+ for (var nsp in conf.NSP) {
+ var fnName = '_connect' + (nsp[0] + nsp.slice(1).toLowerCase()) + 'Socket'
+ console.info('Listening connection event on', nsp.toLowerCase(), 'by func:' + fnName)
+ this._cache.sockio.of(conf.NSP[nsp]).on(conf.SOCKET_EVENTS.CONNECTION, this[fnName].bind(this))
+ }
+
+ var auth
+ if (!(this.options.agent && (auth = this.options.agent.authorization))) {
+ console.debug('* No passwd *')
+ return
+ }
+ console.debug('* socket.io handshake *')
+ this._cache.sockio.use(function (socket, next) {
+ if (auth !== socket.handshake.query.auth) {
+ return next(new Error('unauthorized'))
+ }
+ next()
+ })
+}
diff --git a/lib/pm.js b/lib/pm.js
index ce65bd0..85ac913 100644
--- a/lib/pm.js
+++ b/lib/pm.js
@@ -1,4 +1,5 @@
-var spawn = require('child_process').spawn
+'use strict'
+
var fs = require('fs')
var _ = require('lodash')
var async = require('async')
@@ -6,77 +7,89 @@ var rpc = require('pm2-axon-rpc')
var axon = require('pm2-axon')
/**
- * Forever lib.
- * @type {{}}
+ * Process management lib.
+ * @type {Object}
*/
var pm = module.exports = {}
-var re_blank = /^[\s\r\t]*$/
var allowedEvents = ['start', 'restart', 'exit', 'online']
/**
- * Subscribe event BUS.
- * @param {String} sockPath
- * @param {Function} cb
- * @param {Object} context
+ * Make options more safe.
+ * @param {Object} options
+ * @param {Mixed} context
+ * @return {Object}
*/
-pm.sub = function (sockPath, cb, context) {
+function safeOptions (options, context) {
+ if (_.isString(options)) {
+ options = {
+ socketPath: options
+ }
+ }
+ if (!options.context && context) {
+ options.context = context
+ }
+ return options
+}
+/**
+ * Subscribe event BUS.
+ * @param {Object} options:
+ * {String} socketPath
+ * {Object} context
+ * @param {Function} fn
+ */
+pm.sub = function (options, fn) {
var sub = axon.socket('sub-emitter')
- // Once awake from sleeping.
- sub.on('log:*', function (e, d) {
- // Do not subscribe it.
- sub.off('log:*')
- d.event = 'awake'
- cb.call(context, d)
- })
-
+ options = safeOptions(options, this)
// Process events.
sub.on('process:*', function (e, d) {
if (d && !!~allowedEvents.indexOf(d.event)) {
- cb.call(context, d)
+ fn.call(options.context, d)
}
})
- sub.connect(sockPath)
+ sub.connect(options.socketPath)
return sub
}
/**
* Get PM2 version.
- * @param {String} sockPath
- * @param {Function} cb
+ * @param {String} socketPath
+ * @param {Function} fn
*/
-pm.version = function (sockPath, cb) {
+pm.version = function (socketPath, fn) {
pm._rpc({
- sockPath: sockPath,
+ socketPath: socketPath,
events: [
- ['getVersion', {}, cb]
+ ['getVersion', {}, fn]
]
})
}
/**
* List available processes.
- * @param {String} sockPath
- * @param {Function} cb
- * @param {Object} context
+ * @param {Object} options:
+ * {String} socketPath
+ * {Object} context
+ * @param {Function} fn
*/
-pm.list = function (sockPath, cb, context) {
- if (!fs.existsSync(sockPath)) {
- return cb.call(context, [])
+pm.list = function (options, fn) {
+ options = safeOptions(options, this)
+ if (!fs.existsSync(options.socketPath)) {
+ return fn.call(options.context, [])
}
pm._rpc({
- sockPath: sockPath,
+ socketPath: options.socketPath,
events: [
- ['getMonitorData', {}, cb]
+ ['getMonitorData', {}, fn]
],
- context: context || this
+ context: options.context
})
}
/**
* Execute remote RPC events.
* @param {Object} opts including:
- * {String} sockPath
+ * {String} socketPath
* {Object} context
* {Object} args
* {Object} events
@@ -84,25 +97,27 @@ pm.list = function (sockPath, cb, context) {
* value: callback function
* @private
*/
-pm._rpc = function (opts) {
+pm._rpc = function (options) {
+ options = safeOptions(options)
var req = axon.socket('req')
- var rpcSock = req.connect(opts.sockPath)
+ var rpcSock = req.connect(options.socketPath)
var rpcClient = new rpc.Client(req)
// Connect RPC server.
rpcSock.on('connect', function () {
// Execute request.
- var waterfalls = opts.events.map(function (event) {
+ var waterfalls = options.events.map(function (event) {
return function (next) {
- var cb = typeof event[event.length - 1] === 'function' ? event.pop() : null
+ var cb = _.isFunction(event[event.length - 1]) ? event.pop() : null
if (cb) {
event.push(function () {
// Wrap arguments, no [].slice (avoid leak)!!!
- var args = new Array(arguments.length)
- for (var i = 0; i < args; i++) {
+ var argsLen = arguments.length
+ var args = new Array(argsLen)
+ for (var i = 0; i < argsLen; i++) {
args[i] = arguments[i]
}
- cb.apply(opts.context, arguments)
+ cb.apply(options.context, arguments)
next()
})
}
@@ -116,92 +131,123 @@ pm._rpc = function (opts) {
rpcSock.close()
})
})
+ // ignore error
+ rpcSock.on('error', function (err) {
+ console.error('rpc error:', err.message)
+ try {
+ rpcSock.close()
+ } catch (err) {}
+ })
}
/**
* Find process by pm_id.
- * @param {String} sockPath
- * @param {String} id
- * @param {Function} cb
+ * @param {Object} options:
+ * {String} socketPath
+ * {String} id
+ * @param {Function} fn
* @private
*/
-pm._findById = function (sockPath, id, cb) {
- pm.list(sockPath, function (err, procs) {
+pm._findById = function (options, fn) {
+ options = safeOptions(options)
+ pm.list(options, function (err, procs) {
if (err) {
- return cb(err)
+ return fn(err)
}
if (!procs || procs.length === 0) {
- return cb(new Error('No PM2 process running, the sockPath is "' + sockPath + '", please make sure it is existing!'))
+ return fn(new Error('No PM2 process running, the socketPath is "' + options.socketPath + '", please make sure it is existing!'))
}
var proc = _.find(procs, function (p) {
- return p && p.pm_id === id
+ return p && p.pm_id === options.id
})
if (!proc) {
- return cb(new Error('Cannot find pm process by pm_id: ' + id))
+ return fn(new Error('Cannot find pm process by pm_id: ' + options.id))
}
- cb(null, proc)
- }, true)
+ fn(null, proc)
+ })
}
/**
* Trigger actions of process by pm_id.
- * @param {String} sockPath
- * @param {String} id
- * @param {Function} cb
+ * @param {Object} options:
+ * {String} socketPath
+ * {String} action
+ * {String} id
+ * @param {Function} fn
*/
-pm.action = function (sockPath, action, id, cb) {
- if (id === 'all') {
- pm.list(sockPath, function (err, procs) {
+pm.action = function (options, fn) {
+ options = safeOptions(options)
+ if (options.id === 'all') {
+ return pm.list(options, function (err, procs) {
if (err) {
- return cb(err)
- }
- if (!procs || procs.length === 0) {
- return cb(new Error('No PM2 process running, the sockPath is "' + sockPath + '", please make sure it is existing!'))
+ return fn(err)
}
- async.map(procs, function (proc, next) {
- pm._actionByPMId(sockPath, proc, action, next.bind(null, null))
- }, cb)
- })
- } else {
- pm._findById(sockPath, id, function (err, proc) {
- if (err) {
- return cb(err)
+ if (!procs || procs.length === 0) {
+ return fn(new Error('No PM2 process is running!'))
}
- pm._actionByPMId(sockPath, proc, action, cb)
+
+ // Do the jobs without catching errors.
+ async.map(procs, function (proc, next) {
+ pm._actionByPMId({
+ socketPath: options.socketPath,
+ process: proc,
+ action: options.action
+ }, next.bind(null, null))
+ }, fn)
})
}
+ pm._findById(options, function (err, proc) {
+ if (err) {
+ return fn(err)
+ }
+ pm._actionByPMId({
+ socketPath: options.socketPath,
+ process: proc,
+ action: options.action
+ }, fn)
+ })
}
/**
* Trigger actions of process by pm_id.
- * @param {String} sockPath
- * @param {Object} proc
- * @param {String} action
- * @param {Function} cb
+ * @param {Object} options:
+ * {String} socketPath
+ * {Object} process
+ * {String} action
+ * @param {Function} fn
* @private
*/
-pm._actionByPMId = function (sockPath, proc, action, cb) {
- var noBusEvent = action === 'delete' && proc.pm2_env.status !== 'online'
- var pmId = proc.pm_id
-
+pm._actionByPMId = function (options, fn) {
+ var noBusEvent = action === 'delete' && options.process.pm2_env.status !== 'online'
+ var pmId = options.process.pm_id
+ var action = options.action
+ //
+ // event keys:
+ // restartProcessId
+ // deleteProcessId
+ // stopProcessId
+ // saveProcessId
+ // stopWatch
+ // restartWatch
+ //
action += 'ProcessId'
+ // watch event
var watchEvent = ['stopWatch', action, {
id: pmId
}, function () {}]
-
if (!!~['restart'].indexOf(action)) { // eslint-disable-line no-extra-boolean-cast
watchEvent.splice(0, 1, 'restartWatch')
watchEvent.pop()
}
-
+ // wrap action event
var actionEvent = [action, pmId, function (err, sock) {
- cb(err, noBusEvent)
+ fn(err, noBusEvent)
}]
-
+ console.debug('[pm:' + pmId + ']', action)
if (action === 'restartProcessId') {
actionEvent.splice(1, 1, {
id: pmId
@@ -209,94 +255,10 @@ pm._actionByPMId = function (sockPath, proc, action, cb) {
}
pm._rpc({
- sockPath: sockPath,
+ socketPath: options.socketPath,
events: [
watchEvent,
actionEvent
]
})
}
-
-/**
- * Tail logs.
- * @param {Object} opts
- * @param {Function} each Iterator
- * @param {Function} cb
- * @returns {*}
- */
-pm.tail = function (opts, each, cb) {
- // Fetch the proccess that we need.
- pm._findById(opts.sockPath, opts.pm_id, function (err, proc) {
- if (err) {
- return cb(err)
- }
- proc.pm2_log = opts.logPath
- // Tail logs.
- cb(null, pm._tailLogs(proc, each))
- })
-}
-/**
- * Use linux `tail` command to grep logs.
- * @param {Object} proc
- * @param {Function} cb
- * @returns {*}
- * @private
- */
-pm._tailLogs = function (proc, cb) {
- var logs = {
- 'pm2': proc.pm2_log
- }
- if (proc.pm_log_path) {
- logs.entire = proc.pm2_env.pm_log_path
- } else {
- if (proc.pm2_env.pm_out_log_path) {
- logs.out = proc.pm2_env.pm_out_log_path
- }
- if (proc.pm2_env.pm_err_log_path) {
- logs.err = proc.pm2_env.pm_err_log_path
- }
- }
-
- var logFiles = []
- for (var key in logs) {
- var file = logs[key]
- if (fs.existsSync(file)) {
- logFiles.push(file)
- }
- }
- if (logFiles.length === 0) {
- return null
- }
- var tail = spawn('tail', ['-n', 20, '-f'].concat(logFiles), {
- killSignal: 'SIGTERM',
- detached: true,
- stdio: ['ignore', 'pipe', 'pipe']
- })
-
- // Use utf8 encoding.
- tail.stdio.forEach(function (stdio) {
- stdio && stdio.setEncoding('utf8')
- })
-
- // stdout.
- tail.stdout.on('data', function (data) {
- var lines = []
- data.split(/\n/).forEach(function (line) {
- if (!re_blank.test(line)) {
- lines.push(line)
- }
- })
- if (lines.length > 0) {
- cb(null, lines)
- }
- })
-
- // handle error.
- tail.stderr.on('data', function (data) {
- console.error(data.toString())
- tail.disconnect()
- cb(new Error(data.toString().replace(/\n/, '')))
- })
- tail.unref()
- return tail
-}
diff --git a/lib/stat.js b/lib/stat.js
index cecc446..8de3c4c 100644
--- a/lib/stat.js
+++ b/lib/stat.js
@@ -1,3 +1,5 @@
+'use strict'
+
var os = require('os')
/**
@@ -54,6 +56,7 @@ var stat = module.exports = {
* @param context
*/
stat.cpuUsage = function (fn, context) {
+ // check 1s changes
setTimeout(function (ctx, stat1) {
var stat2 = ctx.cpuInfo()
var perc = 100 * (1 - (stat2.idle - stat1.idle) / (stat2.total - stat1.total))
diff --git a/lib/util/conf.js b/lib/util/conf.js
index 9625f67..d93ae88 100644
--- a/lib/util/conf.js
+++ b/lib/util/conf.js
@@ -1,19 +1,45 @@
+'use strict'
+
var fs = require('fs')
var _ = require('lodash')
-var re_comment = /^\s*;/
-var re_setion = /^\s*\[([^\]]+)\]\s*$/
-var re_kv = /^([^=]+)=(.*)$/
-var re_boolean = /^(true|false)$/i
+var regComment = /^\s*;/
+var regSetion = /^\s*\[([^\]]+)\]\s*$/
+var regKV = /^([^=]+)=(.*)$/
+var regBoolean = /^(true|false|yes|no|y|n)$/i
+var regBooleanTrue = /^(true|yes|y)$/i
/**
* Namespaces of socket.io
* @type {{SYS: string, LOG: string, PROC: string}}
*/
exports.NSP = {
- SYS: '/sys',
+ SYS: '/system',
LOG: '/log',
- PROC: '/proc'
+ PROCESS: '/proccess'
+}
+
+/**
+ * All events of socket.io
+ * @type {Object}
+ */
+exports.SOCKET_EVENTS = {
+ ERROR: 'error',
+ CONNECTION: 'connection',
+ CONNECT: 'connect',
+ CONNECT_ERROR: 'connect_error',
+ DISCONNECT: 'disconnect',
+ DATA: 'data',
+ DATA_ACTION: 'data.action',
+ DATA_USAGE: 'data.usage',
+ DATA_PROCESSES: 'data.processes',
+ DATA_SYSTEM_STATS: 'data.sysstat',
+ DATA_PM2_VERSION: 'data.pm2version',
+ PULL_LOGS: 'pull.log',
+ PULL_USAGE: 'pull.usage',
+ PULL_LOGS_END: 'pull.log_end',
+ PULL_PROCESSES: 'pull.processes',
+ PULL_ACTION: 'pull.action'
}
/**
@@ -32,7 +58,7 @@ function File (options) {
return new File(options)
}
- if (typeof options === 'string') {
+ if (_.isString(options)) {
options = {
file: options
}
@@ -65,25 +91,25 @@ File.prototype.loadSync = function () {
return
}
// Remove comments.
- if (re_comment.test(line)) {
+ if (regComment.test(line)) {
return
}
var ms
// Sections.
- if ((ms = line.match(re_setion)) && ms.length === 2) {
+ if ((ms = line.match(regSetion)) && ms.length === 2) {
json[sec = ms[1].trim()] = {}
return
}
// Key-value pairs.
- if ((ms = line.match(re_kv)) && ms.length === 3) {
+ if ((ms = line.match(regKV)) && ms.length === 3) {
var key = ms[1].trim()
var value = ms[2].trim()
// Parse boolean and number.
if (!isNaN(value)) {
value = parseFloat(value)
- } else if (re_boolean.test(value)) {
- value = value.toLowerCase() === 'true'
+ } else if (regBoolean.test(value)) {
+ value = regBooleanTrue.test(value)
}
if (sec) {
json[sec][key] = value
@@ -102,21 +128,18 @@ File.prototype.loadSync = function () {
* @returns {File}
*/
File.prototype.saveSync = function () {
- function wrapValue (key, value) {
- return key + ' = ' + (typeof value === 'string' ? value : JSON.stringify(value)) + '\n'
- }
var ini = ''
for (var key in this._data) {
var value = this._data[key]
// TODO: Array type.
- if (typeof value === 'object') {
+ if (_.isObject(value)) {
ini += '[ ' + key + ' ]\n'
for (var subKey in value) {
- ini += wrapValue(subKey, value[subKey])
+ ini += _wrapValue(subKey, value[subKey])
}
ini += '\n'
}
- ini += wrapValue(key, value)
+ ini += _wrapValue(key, value)
}
fs.writeFileSync(this.options.file, ini)
return this
@@ -144,14 +167,15 @@ File.prototype.val = function (key, value, def) {
// Load config from File.
this.loadSync()
- if (typeof value === 'undefined') {
+ if (_.isUndefined(value)) {
// Get config.
return this._data[key]
- } else if (value == null) {
+ }
+ if (value == null) {
// Clear config.
delete this._data[key]
// Reset to default if necessary.
- if (typeof def !== 'undefined') {
+ if (!_.isUndefined(def)) {
this._data[key] = def
}
return this.saveSync()
@@ -163,3 +187,7 @@ File.prototype.val = function (key, value, def) {
this.saveSync()
return this
}
+
+function _wrapValue (key, value) {
+ return key + ' = ' + (_.isString(value) ? value : JSON.stringify(value)) + '\n'
+}
diff --git a/lib/util/log.js b/lib/util/log.js
index c26ef43..d77fb4d 100644
--- a/lib/util/log.js
+++ b/lib/util/log.js
@@ -1,9 +1,18 @@
+'use strict'
+
var chalk = require('chalk')
+var hacks = ['debug', 'log', 'info', 'warn', 'error']
+var colors = ['grey', '', 'green', 'yellow', 'red']
+
+/**
+ * Hacked stdout
+ * @param {Object} options
+ * @return {N/A}
+ */
module.exports = function (options) {
options = options || {}
var lev = options.level
- var hacks = ['debug', 'log', 'info', 'warn', 'error']
if ((typeof lev === 'string' && typeof (lev = hacks.indexOf(lev)) === 'undefined') || (isFinite(lev) && (lev < 0 || lev > hacks.length))) {
options.level = 0
@@ -16,7 +25,6 @@ module.exports = function (options) {
return
}
- var colors = ['grey', '', 'green', 'yellow', 'red']
var consoled = {}
hacks.forEach(function (method) {