compute metadata in the same child process that builds indexes
This commit is contained in:
parent
479b76f848
commit
471e374533
@ -18,15 +18,7 @@ Data.getMetadataRaw = function (Env, channel /* channelName */, _cb) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Env.batchMetadata(channel, cb, function (done) {
|
Env.batchMetadata(channel, cb, function (done) {
|
||||||
var ref = {};
|
Env.computeMetadata(channel, done);
|
||||||
var lineHandler = Meta.createLineHandler(ref, Env.Log.error);
|
|
||||||
return void Env.msgStore.readChannelMetadata(channel, lineHandler, function (err) {
|
|
||||||
if (err) {
|
|
||||||
// stream errors?
|
|
||||||
return void done(err);
|
|
||||||
}
|
|
||||||
done(void 0, ref.meta);
|
|
||||||
});
|
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@ -247,11 +247,10 @@ module.exports.create = function (config, cb) {
|
|||||||
channelExpirationMs: config.channelExpirationMs,
|
channelExpirationMs: config.channelExpirationMs,
|
||||||
verbose: config.verbose,
|
verbose: config.verbose,
|
||||||
openFileLimit: config.openFileLimit,
|
openFileLimit: config.openFileLimit,
|
||||||
}, w(function (err, computeIndex) {
|
}, w(function (err) {
|
||||||
if (err) {
|
if (err) {
|
||||||
throw new Error(err);
|
throw new Error(err);
|
||||||
}
|
}
|
||||||
Env.computeIndex = computeIndex;
|
|
||||||
}));
|
}));
|
||||||
}).nThen(function (w) {
|
}).nThen(function (w) {
|
||||||
// create a task store
|
// create a task store
|
||||||
|
|||||||
@ -337,7 +337,7 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash) {
|
|||||||
* -1 if you didn't find it
|
* -1 if you didn't find it
|
||||||
|
|
||||||
*/
|
*/
|
||||||
const getHistoryOffset = (Env, channelName, lastKnownHash, _cb) => {
|
const getHistoryOffset = (Env, channelName, lastKnownHash, _cb) => { // XXX child process
|
||||||
const cb = Util.once(Util.mkAsync(_cb));
|
const cb = Util.once(Util.mkAsync(_cb));
|
||||||
const store = Env.store;
|
const store = Env.store;
|
||||||
const Log = Env.Log;
|
const Log = Env.Log;
|
||||||
@ -454,7 +454,7 @@ const getHistoryAsync = (Env, channelName, lastKnownHash, beforeHash, handler, c
|
|||||||
Used by:
|
Used by:
|
||||||
* GET_HISTORY_RANGE
|
* GET_HISTORY_RANGE
|
||||||
*/
|
*/
|
||||||
const getOlderHistory = function (Env, channelName, oldestKnownHash, cb) {
|
const getOlderHistory = function (Env, channelName, oldestKnownHash, cb) { // XXX child process
|
||||||
const store = Env.store;
|
const store = Env.store;
|
||||||
const Log = Env.Log;
|
const Log = Env.Log;
|
||||||
var messageBuffer = [];
|
var messageBuffer = [];
|
||||||
@ -833,7 +833,14 @@ HK.initializeIndexWorkers = function (Env, config, _cb) {
|
|||||||
});
|
});
|
||||||
|
|
||||||
worker.on('message', function (res) {
|
worker.on('message', function (res) {
|
||||||
if (!res || !res.txid) { return; }
|
if (!res) { return; }
|
||||||
|
if (!res.txid) {
|
||||||
|
// !report errors...
|
||||||
|
if (res.error) {
|
||||||
|
Env.Log.error(res.error, res.value);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
//console.log(res);
|
//console.log(res);
|
||||||
try {
|
try {
|
||||||
response.handle(res.txid, [res.error, res.value]);
|
response.handle(res.txid, [res.error, res.value]);
|
||||||
@ -860,20 +867,18 @@ HK.initializeIndexWorkers = function (Env, config, _cb) {
|
|||||||
};
|
};
|
||||||
|
|
||||||
var workerIndex = 0;
|
var workerIndex = 0;
|
||||||
var sendCommand = function (Env, channel, cb) {
|
var sendCommand = function (msg, _cb) {
|
||||||
|
var cb = Util.once(Util.mkAsync(_cb));
|
||||||
|
|
||||||
workerIndex = (workerIndex + 1) % workers.length;
|
workerIndex = (workerIndex + 1) % workers.length;
|
||||||
if (workers.length === 0 ||
|
if (workers.length === 0 ||
|
||||||
typeof(workers[workerIndex].send) !== 'function') {
|
typeof(workers[workerIndex].send) !== 'function') {
|
||||||
return void cb("NO_WORKERS");
|
return void cb("NO_WORKERS");
|
||||||
}
|
}
|
||||||
Env.store.getWeakLock(channel, function (next) {
|
|
||||||
const txid = Util.uid();
|
const txid = Util.uid();
|
||||||
response.expect(txid, Util.both(next, cb), 45000);
|
msg.txid = txid;
|
||||||
workers[workerIndex].send({
|
response.expect(txid, cb, 45000);
|
||||||
txid: txid,
|
workers[workerIndex].send(msg);
|
||||||
args: channel,
|
|
||||||
});
|
|
||||||
});
|
|
||||||
};
|
};
|
||||||
|
|
||||||
nThen(function (w) {
|
nThen(function (w) {
|
||||||
@ -885,6 +890,30 @@ HK.initializeIndexWorkers = function (Env, config, _cb) {
|
|||||||
}));
|
}));
|
||||||
});
|
});
|
||||||
}).nThen(function () {
|
}).nThen(function () {
|
||||||
|
Env.computeIndex = function (Env, channel, cb) {
|
||||||
|
Env.store.getWeakLock(channel, function (next) {
|
||||||
|
sendCommand({
|
||||||
|
channel: channel,
|
||||||
|
command: 'COMPUTE_INDEX',
|
||||||
|
}, function (err, index) {
|
||||||
|
next();
|
||||||
|
cb(err, index);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
Env.computeMetadata = function (channel, cb) {
|
||||||
|
Env.store.getWeakLock(channel, function (next) {
|
||||||
|
sendCommand({
|
||||||
|
channel: channel,
|
||||||
|
command: 'COMPUTE_METADATA',
|
||||||
|
}, function (err, metadata) {
|
||||||
|
next();
|
||||||
|
cb(err, metadata);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
//console.log("index workers ready");
|
//console.log("index workers ready");
|
||||||
cb(void 0, sendCommand);
|
cb(void 0, sendCommand);
|
||||||
});
|
});
|
||||||
@ -925,14 +954,13 @@ HK.initializeValidationWorkers = function (Env) {
|
|||||||
|
|
||||||
var nextWorker = 0;
|
var nextWorker = 0;
|
||||||
const send = function (msg, _cb) {
|
const send = function (msg, _cb) {
|
||||||
|
var cb = Util.once(Util.mkAsync(_cb));
|
||||||
// let's be paranoid about asynchrony and only calling back once..
|
// let's be paranoid about asynchrony and only calling back once..
|
||||||
nextWorker = (nextWorker + 1) % workers.length;
|
nextWorker = (nextWorker + 1) % workers.length;
|
||||||
if (workers.length === 0 || typeof(workers[nextWorker].send) !== 'function') {
|
if (workers.length === 0 || typeof(workers[nextWorker].send) !== 'function') {
|
||||||
console.error(workers);
|
return void cb("INVALID_WORKERS");
|
||||||
throw new Error("INVALID_WORKERS");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var cb = Util.once(Util.mkAsync(_cb));
|
|
||||||
var txid = msg.txid = Util.uid();
|
var txid = msg.txid = Util.uid();
|
||||||
|
|
||||||
// expect a response within 15s
|
// expect a response within 15s
|
||||||
|
|||||||
@ -5,6 +5,7 @@ const HK = require("../hk-util");
|
|||||||
const Store = require("../storage/file");
|
const Store = require("../storage/file");
|
||||||
const Util = require("../common-util");
|
const Util = require("../common-util");
|
||||||
const nThen = require("nthen");
|
const nThen = require("nthen");
|
||||||
|
const Meta = require("../metadata");
|
||||||
|
|
||||||
const Env = {};
|
const Env = {};
|
||||||
|
|
||||||
@ -46,7 +47,13 @@ const tryParse = function (Env, str) {
|
|||||||
* including the initial metadata line, if it exists
|
* including the initial metadata line, if it exists
|
||||||
|
|
||||||
*/
|
*/
|
||||||
const computeIndex = function (channelName, cb) {
|
const computeIndex = function (data, cb) {
|
||||||
|
if (!data || !data.channel) {
|
||||||
|
return void cb('E_NO_CHANNEL');
|
||||||
|
}
|
||||||
|
|
||||||
|
const channelName = data.channel;
|
||||||
|
|
||||||
const cpIndex = [];
|
const cpIndex = [];
|
||||||
let messageBuf = [];
|
let messageBuf = [];
|
||||||
let i = 0;
|
let i = 0;
|
||||||
@ -125,45 +132,60 @@ const computeIndex = function (channelName, cb) {
|
|||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
const computeMetadata = function (data, cb, errorHandler) {
|
||||||
|
const ref = {};
|
||||||
|
const lineHandler = Meta.createLineHandler(ref, errorHandler);
|
||||||
|
return void store.readChannelMetadata(data.channel, lineHandler, function (err) {
|
||||||
|
if (err) {
|
||||||
|
// stream errors?
|
||||||
|
return void cb(err);
|
||||||
|
}
|
||||||
|
cb(void 0, ref.meta);
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
const COMMANDS = {
|
||||||
|
COMPUTE_INDEX: computeIndex,
|
||||||
|
COMPUTE_METADATA: computeMetadata,
|
||||||
|
};
|
||||||
|
|
||||||
process.on('message', function (data) {
|
process.on('message', function (data) {
|
||||||
if (!data || !data.txid) {
|
if (!data || !data.txid) {
|
||||||
return void process.send({
|
return void process.send({
|
||||||
error:'E_INVAL'
|
error:'E_INVAL'
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
const txid = data.txid;
|
|
||||||
|
const cb = function (err, value) {
|
||||||
|
if (err) {
|
||||||
|
return void process.send({
|
||||||
|
txid: data.txid,
|
||||||
|
error: err,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
process.send({
|
||||||
|
txid: data.txid,
|
||||||
|
value: value,
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
if (!ready) {
|
if (!ready) {
|
||||||
return void init(data.config, function (err) {
|
return void init(data.config, function (err) {
|
||||||
if (err) {
|
if (err) { return void cb(err); }
|
||||||
return void process.send({
|
|
||||||
txid: txid,
|
|
||||||
error: err,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
ready = true;
|
ready = true;
|
||||||
process.send({txid: txid,});
|
cb();
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
const channel = data.args;
|
const command = COMMANDS[data.command];
|
||||||
if (!channel) {
|
if (typeof(command) !== 'function') {
|
||||||
return void process.send({
|
return void cb("E_BAD_COMMAND");
|
||||||
error: 'E_NO_CHANNEL',
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
command(data, cb, function (label, info) {
|
||||||
// computeIndex
|
// for streaming errors
|
||||||
computeIndex(channel, function (err, index) {
|
process.send({
|
||||||
if (err) {
|
error: label,
|
||||||
return void process.send({
|
value: info,
|
||||||
txid: txid,
|
|
||||||
error: err,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
return void process.send({
|
|
||||||
txid: txid,
|
|
||||||
value: index,
|
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user