recover from worker faults and unify worker types

This commit is contained in:
ansuz
2020-04-16 13:02:07 -04:00
parent e0a6852b79
commit 04ab7f538a
3 changed files with 122 additions and 212 deletions

View File

@@ -3,103 +3,14 @@
const Util = require("../common-util");
const nThen = require('nthen');
const OS = require("os");
const numCPUs = OS.cpus().length;
const { fork } = require('child_process');
const Workers = module.exports;
const PID = process.pid;
const CRYPTO_PATH = 'lib/workers/crypto-worker';
const DB_PATH = 'lib/workers/db-worker';
const MAX_JOBS = 16;
Workers.initializeValidationWorkers = function (Env) {
if (typeof(Env.validateMessage) !== 'undefined') {
return void console.error("validation workers are already initialized");
}
// Create our workers
const workers = [];
for (let i = 0; i < numCPUs; i++) {
workers.push(fork(CRYPTO_PATH));
}
const response = Util.response(function (errLabel, info) {
Env.Log.error('HK_VALIDATE_WORKER__' + errLabel, info);
});
var initWorker = function (worker) {
worker.on('message', function (res) {
if (!res || !res.txid) { return; }
response.handle(res.txid, [res.error, res.value]);
});
var substituteWorker = Util.once( function () {
Env.Log.info("SUBSTITUTE_VALIDATION_WORKER", '');
var idx = workers.indexOf(worker);
if (idx !== -1) {
workers.splice(idx, 1);
}
// Spawn a new one
var w = fork(CRYPTO_PATH);
workers.push(w);
initWorker(w);
});
// Spawn a new process in one ends
worker.on('exit', substituteWorker);
worker.on('close', substituteWorker);
worker.on('error', function (err) {
substituteWorker();
Env.Log.error('VALIDATION_WORKER_ERROR', {
error: err,
});
});
};
workers.forEach(initWorker);
var nextWorker = 0;
const send = function (msg, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
// let's be paranoid about asynchrony and only calling back once..
nextWorker = (nextWorker + 1) % workers.length;
if (workers.length === 0 || typeof(workers[nextWorker].send) !== 'function') {
return void cb("INVALID_WORKERS");
}
var txid = msg.txid = Util.uid();
// expect a response within 45s
response.expect(txid, cb, 60000);
// Send the request
workers[nextWorker].send(msg);
};
Env.validateMessage = function (signedMsg, key, cb) {
send({
msg: signedMsg,
key: key,
command: 'INLINE',
}, cb);
};
Env.checkSignature = function (signedMsg, signature, publicKey, cb) {
send({
command: 'DETACHED',
sig: signature,
msg: signedMsg,
key: publicKey,
}, cb);
};
Env.hashChannelList = function (channels, cb) {
send({
command: 'HASH_CHANNEL_LIST',
channels: channels,
}, cb);
};
};
Workers.initializeIndexWorkers = function (Env, config, _cb) {
Workers.initialize = function (Env, config, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
const workers = [];
@@ -124,12 +35,18 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
return response.expected(id)? guid(): id;
};
const MAX_JOBS = 32;
var workerOffset = -1;
var queue = [];
var getAvailableWorkerIndex = function () {
// If there is already a backlog of tasks you can avoid some work
// by going to the end of the line
if (queue.length) { return -1; }
var L = workers.length;
if (L === 0) {
console.log("no workers available"); // XXX
Log.error('NO_WORKERS_AVAILABLE', {
queue: queue.length,
});
return -1;
}
@@ -157,8 +74,6 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
return -1;
};
var queue = [];
var sendCommand = function (msg, _cb) {
var index = getAvailableWorkerIndex();
@@ -383,11 +298,33 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
}, cb);
};
// Synchronous crypto functions
Env.validateMessage = function (signedMsg, key, cb) {
sendCommand({
msg: signedMsg,
key: key,
command: 'INLINE',
}, cb);
};
Env.checkSignature = function (signedMsg, signature, publicKey, cb) {
sendCommand({
command: 'DETACHED',
sig: signature,
msg: signedMsg,
key: publicKey,
}, cb);
};
Env.hashChannelList = function (channels, cb) {
sendCommand({
command: 'HASH_CHANNEL_LIST',
channels: channels,
}, cb);
};
cb(void 0);
});
};
Workers.initialize = function (Env, config, cb) {
Workers.initializeValidationWorkers(Env);
Workers.initializeIndexWorkers(Env, config, cb);
};