refactor: NSPs+

This commit is contained in:
Tjatse 2016-12-08 17:05:14 +08:00
parent 5c1c18d479
commit 7ee75faef1
5 changed files with 776 additions and 689 deletions

File diff suppressed because it is too large Load Diff

298
lib/pm.js
View File

@ -1,4 +1,5 @@
var spawn = require('child_process').spawn
'use strict'
var fs = require('fs')
var _ = require('lodash')
var async = require('async')
@ -6,77 +7,89 @@ var rpc = require('pm2-axon-rpc')
var axon = require('pm2-axon')
/**
* Forever lib.
* @type {{}}
* Process management lib.
* @type {Object}
*/
var pm = module.exports = {}
var re_blank = /^[\s\r\t]*$/
var allowedEvents = ['start', 'restart', 'exit', 'online']
/**
* Subscribe event BUS.
* @param {String} sockPath
* @param {Function} cb
* @param {Object} context
* Make options more safe.
* @param {Object} options
* @param {Mixed} context
* @return {Object}
*/
pm.sub = function (sockPath, cb, context) {
function safeOptions (options, context) {
if (_.isString(options)) {
options = {
socketPath: options
}
}
if (!options.context && context) {
options.context = context
}
return options
}
/**
* Subscribe event BUS.
* @param {Object} options:
* {String} socketPath
* {Object} context
* @param {Function} fn
*/
pm.sub = function (options, fn) {
var sub = axon.socket('sub-emitter')
// Once awake from sleeping.
sub.on('log:*', function (e, d) {
// Do not subscribe it.
sub.off('log:*')
d.event = 'awake'
cb.call(context, d)
})
options = safeOptions(options, this)
// Process events.
sub.on('process:*', function (e, d) {
if (d && !!~allowedEvents.indexOf(d.event)) {
cb.call(context, d)
fn.call(options.context, d)
}
})
sub.connect(sockPath)
sub.connect(options.socketPath)
return sub
}
/**
* Get PM2 version.
* @param {String} sockPath
* @param {Function} cb
* @param {String} socketPath
* @param {Function} fn
*/
pm.version = function (sockPath, cb) {
pm.version = function (socketPath, fn) {
pm._rpc({
sockPath: sockPath,
socketPath: socketPath,
events: [
['getVersion', {}, cb]
['getVersion', {}, fn]
]
})
}
/**
* List available processes.
* @param {String} sockPath
* @param {Function} cb
* @param {Object} context
* @param {Object} options:
* {String} socketPath
* {Object} context
* @param {Function} fn
*/
pm.list = function (sockPath, cb, context) {
if (!fs.existsSync(sockPath)) {
return cb.call(context, [])
pm.list = function (options, fn) {
options = safeOptions(options, this)
if (!fs.existsSync(options.socketPath)) {
return fn.call(options.context, [])
}
pm._rpc({
sockPath: sockPath,
socketPath: options.socketPath,
events: [
['getMonitorData', {}, cb]
['getMonitorData', {}, fn]
],
context: context || this
context: options.context
})
}
/**
* Execute remote RPC events.
* @param {Object} opts including:
* {String} sockPath
* {String} socketPath
* {Object} context
* {Object} args
* {Object} events
@ -84,25 +97,27 @@ pm.list = function (sockPath, cb, context) {
* value: callback function
* @private
*/
pm._rpc = function (opts) {
pm._rpc = function (options) {
options = safeOptions(options)
var req = axon.socket('req')
var rpcSock = req.connect(opts.sockPath)
var rpcSock = req.connect(options.socketPath)
var rpcClient = new rpc.Client(req)
// Connect RPC server.
rpcSock.on('connect', function () {
// Execute request.
var waterfalls = opts.events.map(function (event) {
var waterfalls = options.events.map(function (event) {
return function (next) {
var cb = typeof event[event.length - 1] === 'function' ? event.pop() : null
var cb = _.isFunction(event[event.length - 1]) ? event.pop() : null
if (cb) {
event.push(function () {
// Wrap arguments, no [].slice (avoid leak)!!!
var args = new Array(arguments.length)
for (var i = 0; i < args; i++) {
var argsLen = arguments.length
var args = new Array(argsLen)
for (var i = 0; i < argsLen; i++) {
args[i] = arguments[i]
}
cb.apply(opts.context, arguments)
cb.apply(options.context, arguments)
next()
})
}
@ -116,92 +131,123 @@ pm._rpc = function (opts) {
rpcSock.close()
})
})
// ignore error
rpcSock.on('error', function (err) {
console.error('rpc error:', err.message)
try {
rpcSock.close()
} catch (err) {}
})
}
/**
* Find process by pm_id.
* @param {String} sockPath
* @param {String} id
* @param {Function} cb
* @param {Object} options:
* {String} socketPath
* {String} id
* @param {Function} fn
* @private
*/
pm._findById = function (sockPath, id, cb) {
pm.list(sockPath, function (err, procs) {
pm._findById = function (options, fn) {
options = safeOptions(options)
pm.list(options, function (err, procs) {
if (err) {
return cb(err)
return fn(err)
}
if (!procs || procs.length === 0) {
return cb(new Error('No PM2 process running, the sockPath is "' + sockPath + '", please make sure it is existing!'))
return fn(new Error('No PM2 process running, the socketPath is "' + options.socketPath + '", please make sure it is existing!'))
}
var proc = _.find(procs, function (p) {
return p && p.pm_id === id
return p && p.pm_id === options.id
})
if (!proc) {
return cb(new Error('Cannot find pm process by pm_id: ' + id))
return fn(new Error('Cannot find pm process by pm_id: ' + options.id))
}
cb(null, proc)
}, true)
fn(null, proc)
})
}
/**
* Trigger actions of process by pm_id.
* @param {String} sockPath
* @param {String} id
* @param {Function} cb
* @param {Object} options:
* {String} socketPath
* {String} action
* {String} id
* @param {Function} fn
*/
pm.action = function (sockPath, action, id, cb) {
if (id === 'all') {
pm.list(sockPath, function (err, procs) {
pm.action = function (options, fn) {
options = safeOptions(options)
if (options.id === 'all') {
return pm.list(options, function (err, procs) {
if (err) {
return cb(err)
}
if (!procs || procs.length === 0) {
return cb(new Error('No PM2 process running, the sockPath is "' + sockPath + '", please make sure it is existing!'))
return fn(err)
}
async.map(procs, function (proc, next) {
pm._actionByPMId(sockPath, proc, action, next.bind(null, null))
}, cb)
})
} else {
pm._findById(sockPath, id, function (err, proc) {
if (err) {
return cb(err)
if (!procs || procs.length === 0) {
return fn(new Error('No PM2 process is running!'))
}
pm._actionByPMId(sockPath, proc, action, cb)
// Do the jobs without catching errors.
async.map(procs, function (proc, next) {
pm._actionByPMId({
socketPath: options.socketPath,
process: proc,
action: options.action
}, next.bind(null, null))
}, fn)
})
}
pm._findById(options, function (err, proc) {
if (err) {
return fn(err)
}
pm._actionByPMId({
socketPath: options.socketPath,
process: proc,
action: options.action
}, fn)
})
}
/**
* Trigger actions of process by pm_id.
* @param {String} sockPath
* @param {Object} proc
* @param {String} action
* @param {Function} cb
* @param {Object} options:
* {String} socketPath
* {Object} process
* {String} action
* @param {Function} fn
* @private
*/
pm._actionByPMId = function (sockPath, proc, action, cb) {
var noBusEvent = action === 'delete' && proc.pm2_env.status !== 'online'
var pmId = proc.pm_id
pm._actionByPMId = function (options, fn) {
var noBusEvent = action === 'delete' && options.process.pm2_env.status !== 'online'
var pmId = options.process.pm_id
var action = options.action
//
// event keys:
// restartProcessId
// deleteProcessId
// stopProcessId
// saveProcessId
// stopWatch
// restartWatch
//
action += 'ProcessId'
// watch event
var watchEvent = ['stopWatch', action, {
id: pmId
}, function () {}]
if (!!~['restart'].indexOf(action)) { // eslint-disable-line no-extra-boolean-cast
watchEvent.splice(0, 1, 'restartWatch')
watchEvent.pop()
}
// wrap action event
var actionEvent = [action, pmId, function (err, sock) {
cb(err, noBusEvent)
fn(err, noBusEvent)
}]
console.debug('[pm:' + pmId + ']', action)
if (action === 'restartProcessId') {
actionEvent.splice(1, 1, {
id: pmId
@ -209,94 +255,10 @@ pm._actionByPMId = function (sockPath, proc, action, cb) {
}
pm._rpc({
sockPath: sockPath,
socketPath: options.socketPath,
events: [
watchEvent,
actionEvent
]
})
}
/**
* Tail logs.
* @param {Object} opts
* @param {Function} each Iterator
* @param {Function} cb
* @returns {*}
*/
pm.tail = function (opts, each, cb) {
// Fetch the proccess that we need.
pm._findById(opts.sockPath, opts.pm_id, function (err, proc) {
if (err) {
return cb(err)
}
proc.pm2_log = opts.logPath
// Tail logs.
cb(null, pm._tailLogs(proc, each))
})
}
/**
* Use linux `tail` command to grep logs.
* @param {Object} proc
* @param {Function} cb
* @returns {*}
* @private
*/
pm._tailLogs = function (proc, cb) {
var logs = {
'pm2': proc.pm2_log
}
if (proc.pm_log_path) {
logs.entire = proc.pm2_env.pm_log_path
} else {
if (proc.pm2_env.pm_out_log_path) {
logs.out = proc.pm2_env.pm_out_log_path
}
if (proc.pm2_env.pm_err_log_path) {
logs.err = proc.pm2_env.pm_err_log_path
}
}
var logFiles = []
for (var key in logs) {
var file = logs[key]
if (fs.existsSync(file)) {
logFiles.push(file)
}
}
if (logFiles.length === 0) {
return null
}
var tail = spawn('tail', ['-n', 20, '-f'].concat(logFiles), {
killSignal: 'SIGTERM',
detached: true,
stdio: ['ignore', 'pipe', 'pipe']
})
// Use utf8 encoding.
tail.stdio.forEach(function (stdio) {
stdio && stdio.setEncoding('utf8')
})
// stdout.
tail.stdout.on('data', function (data) {
var lines = []
data.split(/\n/).forEach(function (line) {
if (!re_blank.test(line)) {
lines.push(line)
}
})
if (lines.length > 0) {
cb(null, lines)
}
})
// handle error.
tail.stderr.on('data', function (data) {
console.error(data.toString())
tail.disconnect()
cb(new Error(data.toString().replace(/\n/, '')))
})
tail.unref()
return tail
}

View File

@ -1,3 +1,5 @@
'use strict'
var os = require('os')
/**
@ -54,6 +56,7 @@ var stat = module.exports = {
* @param context
*/
stat.cpuUsage = function (fn, context) {
// check 1s changes
setTimeout(function (ctx, stat1) {
var stat2 = ctx.cpuInfo()
var perc = 100 * (1 - (stat2.idle - stat1.idle) / (stat2.total - stat1.total))

View File

@ -1,19 +1,45 @@
'use strict'
var fs = require('fs')
var _ = require('lodash')
var re_comment = /^\s*;/
var re_setion = /^\s*\[([^\]]+)\]\s*$/
var re_kv = /^([^=]+)=(.*)$/
var re_boolean = /^(true|false)$/i
var regComment = /^\s*;/
var regSetion = /^\s*\[([^\]]+)\]\s*$/
var regKV = /^([^=]+)=(.*)$/
var regBoolean = /^(true|false|yes|no|y|n)$/i
var regBooleanTrue = /^(true|yes|y)$/i
/**
* Namespaces of socket.io
* @type {{SYS: string, LOG: string, PROC: string}}
*/
exports.NSP = {
SYS: '/sys',
SYS: '/system',
LOG: '/log',
PROC: '/proc'
PROCESS: '/proccess'
}
/**
* All events of socket.io
* @type {Object}
*/
exports.SOCKET_EVENTS = {
ERROR: 'error',
CONNECTION: 'connection',
CONNECT: 'connect',
CONNECT_ERROR: 'connect_error',
DISCONNECT: 'disconnect',
DATA: 'data',
DATA_ACTION: 'data.action',
DATA_USAGE: 'data.usage',
DATA_PROCESSES: 'data.processes',
DATA_SYSTEM_STATS: 'data.sysstat',
DATA_PM2_VERSION: 'data.pm2version',
PULL_LOGS: 'pull.log',
PULL_USAGE: 'pull.usage',
PULL_LOGS_END: 'pull.log_end',
PULL_PROCESSES: 'pull.processes',
PULL_ACTION: 'pull.action'
}
/**
@ -32,7 +58,7 @@ function File (options) {
return new File(options)
}
if (typeof options === 'string') {
if (_.isString(options)) {
options = {
file: options
}
@ -65,25 +91,25 @@ File.prototype.loadSync = function () {
return
}
// Remove comments.
if (re_comment.test(line)) {
if (regComment.test(line)) {
return
}
var ms
// Sections.
if ((ms = line.match(re_setion)) && ms.length === 2) {
if ((ms = line.match(regSetion)) && ms.length === 2) {
json[sec = ms[1].trim()] = {}
return
}
// Key-value pairs.
if ((ms = line.match(re_kv)) && ms.length === 3) {
if ((ms = line.match(regKV)) && ms.length === 3) {
var key = ms[1].trim()
var value = ms[2].trim()
// Parse boolean and number.
if (!isNaN(value)) {
value = parseFloat(value)
} else if (re_boolean.test(value)) {
value = value.toLowerCase() === 'true'
} else if (regBoolean.test(value)) {
value = regBooleanTrue.test(value)
}
if (sec) {
json[sec][key] = value
@ -102,21 +128,18 @@ File.prototype.loadSync = function () {
* @returns {File}
*/
File.prototype.saveSync = function () {
function wrapValue (key, value) {
return key + ' = ' + (typeof value === 'string' ? value : JSON.stringify(value)) + '\n'
}
var ini = ''
for (var key in this._data) {
var value = this._data[key]
// TODO: Array type.
if (typeof value === 'object') {
if (_.isObject(value)) {
ini += '[ ' + key + ' ]\n'
for (var subKey in value) {
ini += wrapValue(subKey, value[subKey])
ini += _wrapValue(subKey, value[subKey])
}
ini += '\n'
}
ini += wrapValue(key, value)
ini += _wrapValue(key, value)
}
fs.writeFileSync(this.options.file, ini)
return this
@ -144,14 +167,15 @@ File.prototype.val = function (key, value, def) {
// Load config from File.
this.loadSync()
if (typeof value === 'undefined') {
if (_.isUndefined(value)) {
// Get config.
return this._data[key]
} else if (value == null) {
}
if (value == null) {
// Clear config.
delete this._data[key]
// Reset to default if necessary.
if (typeof def !== 'undefined') {
if (!_.isUndefined(def)) {
this._data[key] = def
}
return this.saveSync()
@ -163,3 +187,7 @@ File.prototype.val = function (key, value, def) {
this.saveSync()
return this
}
function _wrapValue (key, value) {
return key + ' = ' + (_.isString(value) ? value : JSON.stringify(value)) + '\n'
}

View File

@ -1,9 +1,18 @@
'use strict'
var chalk = require('chalk')
var hacks = ['debug', 'log', 'info', 'warn', 'error']
var colors = ['grey', '', 'green', 'yellow', 'red']
/**
* Hacked stdout
* @param {Object} options
* @return {N/A}
*/
module.exports = function (options) {
options = options || {}
var lev = options.level
var hacks = ['debug', 'log', 'info', 'warn', 'error']
if ((typeof lev === 'string' && typeof (lev = hacks.indexOf(lev)) === 'undefined') || (isFinite(lev) && (lev < 0 || lev > hacks.length))) {
options.level = 0
@ -16,7 +25,6 @@ module.exports = function (options) {
return
}
var colors = ['grey', '', 'green', 'yellow', 'red']
var consoled = {}
hacks.forEach(function (method) {