diff --git a/lib/workers/index.js b/lib/workers/index.js
index 3190741d3..d2944225c 100644
--- a/lib/workers/index.js
+++ b/lib/workers/index.js
@@ -124,16 +124,62 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
         return response.expected(id)? guid(): id;
     };
 
-    var workerIndex = 0;
-    var sendCommand = function (msg, _cb) {
-        var cb = Util.once(Util.mkAsync(_cb));
-
-        workerIndex = (workerIndex + 1) % workers.length;
-        if (!isWorker(workers[workerIndex])) {
-            return void cb("NO_WORKERS");
+    // maximum number of pending tasks a worker may hold before it is skipped
+    const MAX_JOBS = 32;
+    var workerOffset = -1;
+    // returns the index of a worker holding fewer than MAX_JOBS pending tasks,
+    // or -1 when there are no workers or every worker is at capacity
+    var getAvailableWorkerIndex = function () {
+        var L = workers.length;
+        if (L === 0) {
+            console.log("no workers available"); // XXX
+            return -1;
         }
 
-        var state = workers[workerIndex];
+        // cycle through the workers once
+        // start from a different offset each time
+        // return -1 if none are available
+
+        workerOffset = (workerOffset + 1) % L;
+
+        var temp;
+        for (let i = 0; i < L; i++) {
+            temp = (workerOffset + i) % L;
+/*  I'd like for this condition to be more efficient
+    (`Object.keys` is sub-optimal) but I found some bugs in my initial
+    implementation stemming from a task counter variable going out-of-sync
+    with reality when a worker crashed and its tasks were re-assigned to
+    its substitute. I'm sure it can be done correctly and efficiently,
+    but this is a relatively easy way to make sure it's always up to date.
+    We'll see how it performs in practice before optimizing.
+*/
+            // count the worker's pending tasks, not the keys of its state object
+            if (workers[temp] && Object.keys(workers[temp].tasks || {}).length < MAX_JOBS) {
+                return temp;
+            }
+        }
+        return -1;
+    };
+
+    // commands waiting for a worker with spare capacity, oldest first
+    var queue = [];
+
+    // dispatch `msg` to a worker with spare capacity, or queue it until one frees up
+    var sendCommand = function (msg, _cb) {
+        var index = getAvailableWorkerIndex();
+
+        var state = workers[index];
+        // if there is no worker available:
+        if (!isWorker(state)) {
+            // queue the message for when one becomes available
+            queue.push({
+                msg: msg,
+                cb: _cb,
+            });
+            return;
+        }
+
+        var cb = Util.once(Util.mkAsync(_cb));
 
         const txid = guid();
         msg.txid = txid;
@@ -141,14 +187,48 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
 
         // track which worker is doing which jobs
         state.tasks[txid] = msg;
         response.expect(txid, function (err, value) {
-            // clean up when you get a response
-            delete state[txid];
+            // clean up whenever the callback fires (presumably also on timeout),
+            // otherwise an unanswered job would occupy one of the worker's slots forever
+            delete state.tasks[txid];
             cb(err, value);
         }, 60000);
         state.worker.send(msg);
     };
 
+    // process a message sent by the worker whose bookkeeping object is `state`,
+    // then pass the oldest queued command (if any) back to `sendCommand`
+    var handleResponse = function (state, res) {
+        if (!res) { return; }
+        // handle log messages before checking if it was addressed to your PID
+        // it might still be useful to know what happened inside an orphaned worker
+        if (res.log) {
+            return void handleLog(res.log, res.label, res.info);
+        }
+        // but don't bother handling things addressed to other processes
+        // since it's basically guaranteed not to work
+        if (res.pid !== PID) {
+            return void Log.error("WRONG_PID", res);
+        }
+
+        if (!res.txid) { return; }
+        response.handle(res.txid, [res.error, res.value]);
+        delete state.tasks[res.txid];
+        if (!queue.length) { return; }
+
+        var nextMsg = queue.shift();
+/*  `nextMsg` was at the top of the queue.
+    We know that a job just finished and all of this code
+    is synchronous, so calling `sendCommand` should take the worker
+    which was just freed up. This is somewhat fragile though, so
+    be careful if you want to modify this block. The risk is that
+    we take something that was at the top of the queue and push it
+    to the back because the following msg took its place. OR, in an
+    even worse scenario, we cycle through the queue but don't run anything.
+*/
+        sendCommand(nextMsg.msg, nextMsg.cb);
+    };
+
     const initWorker = function (worker, cb) {
         const txid = guid();
 
@@ -170,19 +250,7 @@ Workers.initializeIndexWorkers = function (Env, config, _cb) {
         });
 
         worker.on('message', function (res) {
-            if (!res) { return; }
-            // handle log messages before checking if it was addressed to your PID
-            // it might still be useful to know what happened inside an orphaned worker
-            if (res.log) {
-                return void handleLog(res.log, res.label, res.info);
-            }
-            // but don't bother handling things addressed to other processes
-            // since it's basically guaranteed not to work
-            if (res.pid !== PID) {
-                return void Log.error("WRONG_PID", res);
-            }
-
-            response.handle(res.txid, [res.error, res.value]);
+            handleResponse(state, res);
         });
 
         var substituteWorker = Util.once(function () {