rewrite task execution as API instead of a script

This commit is contained in:
ansuz
2019-04-23 13:26:45 +02:00
parent ecd9647a2e
commit 7b55df5931
2 changed files with 218 additions and 109 deletions

View File

@@ -6,6 +6,11 @@ var nThen = require("nthen");
var Tasks = module.exports;
var tryParse = function (s) {
try { return JSON.parse(s); }
catch (e) { return null; }
};
var encode = function (time, command, args) {
if (typeof(time) !== 'number') { return null; }
if (typeof(command) !== 'string') { return null; }
@@ -73,25 +78,199 @@ var write = function (env, task, cb) {
});
};
// TODO implement a standard API for removing tasks
// currently they are deleted manually in 'expire-channels.js'
// var remove = function (env, id, cb) { };
var list = Tasks.list = function (env, cb) {
var rootDirs;
nThen(function (w) {
// read the root directory
Fs.readdir(env.root, w(function (e, list) {
if (e) {
env.log.error("TASK_ROOT_DIR", {
root: env.root,
error: e,
});
return void cb(e);
}
if (list.length === 0) {
w.abort();
return void cb(void 0, []);
}
rootDirs = list;
}));
}).nThen(function () {
// schedule the nested directories for exploration
// return a list of paths to tasks
var queue = nThen(function () {});
var allPaths = [];
// We prioritize a small footprint over speed, so we
// iterate over directories in serial rather than parallel
rootDirs.forEach(function (dir) {
queue.nThen(function (w) {
var subPath = Path.join(env.root, dir);
Fs.readdir(subPath, w(function (e, paths) {
if (e) {
env.log.error("TASKS_INVALID_SUBDIR", {
path: subPath,
error: e,
});
return;
}
// concat in place
Array.prototype.push.apply(allPaths, paths.map(function (p) {
return Path.join(subPath, p);
}));
}));
});
});
queue.nThen(function () {
cb(void 0, allPaths);
});
});
};
var remove = function (env, path, cb) {
Fs.unlink(path, cb);
};
var read = function (env, filePath, cb) {
Fs.readFile(filePath, 'utf8', function (e, str) {
if (e) { return void cb(e); }
var task = tryParse(str);
if (!Array.isArray(task) || task.length < 2) {
env.log("INVALID_TASK", {
path: filePath,
task: task,
});
return cb(new Error('INVALID_TASK'));
}
cb(void 0, task);
});
};
var run = Tasks.run = function (env, path, cb) {
var CURRENT = +new Date();
var Log = env.log;
var task, time, command, args;
nThen(function (w) {
read(env, path, w(function (err, _task) {
if (err) {
w.abort();
// there was a file but it wasn't valid?
return void cb(err);
}
task = _task;
time = task[0];
if (time > CURRENT) {
w.abort();
return cb();
}
command = task[1];
args = task.slice(2);
}));
}).nThen(function (w) {
switch (command) {
case 'EXPIRE':
Log.info('DELETION_SCHEDULED_EXPIRATION', {
task: task,
});
env.store.removeChannel(args[0], w());
break;
default:
Log.warn("TASKS_UNKNOWN_COMMAND", task);
}
}).nThen(function () {
// remove the task file...
remove(env, path, function (err) {
if (err) {
Log.error('TASKS_RECORD_REMOVAL', {
path: path,
err: err,
});
}
cb();
});
});
};
var runAll = function (env, cb) {
// check if already running and bail out if so
if (env.running) {
return void cb("TASK_CONCURRENCY");
}
// if not, set a flag to block concurrency and proceed
env.running = true;
var paths;
nThen(function (w) {
list(env, w(function (err, _paths) {
if (err) {
w.abort();
env.running = false;
return void cb(err);
}
paths = _paths;
}));
}).nThen(function (w) {
var done = w();
var nt = nThen(function () {});
paths.forEach(function (path) {
nt.nThen(function (w) {
run(env, path, w(function (err) {
if (err) {
// Any errors are already logged in 'run'
// the admin will need to review the logs and clean up
}
}));
});
});
nt.nThen(function () {
done();
});
}).nThen(function (/*w*/) {
env.running = false;
cb();
});
};
Tasks.create = function (config, cb) {
if (!config.store) { throw new Error("E_STORE_REQUIRED"); }
if (!config.log) { throw new Error("E_LOG_REQUIRED"); }
var env = {
root: config.taskPath || './tasks',
log: config.log,
store: config.store,
};
// make sure the path exists...
Fse.mkdirp(env.root, 0x1ff, function (err) {
if (err && err.code !== 'EEXIST') {
throw err;
}
if (err) { return void cb(err); }
cb(void 0, {
write: function (time, command, args, cb) {
var task = encode(time, command, args);
write(env, task, cb);
},
list: function (olderThan, cb) {
list(env, olderThan, cb);
},
remove: function (id, cb) {
remove(env, id, cb);
},
run: function (id, cb) {
run(env, id, cb);
},
runAll: function (cb) {
runAll(env, cb);
},
});
});
};