move more database reads into the database worker
This commit is contained in:
@@ -337,10 +337,8 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash) {
|
||||
* -1 if you didn't find it
|
||||
|
||||
*/
|
||||
const getHistoryOffset = (Env, channelName, lastKnownHash, _cb) => { // XXX child process
|
||||
const getHistoryOffset = (Env, channelName, lastKnownHash, _cb) => {
|
||||
const cb = Util.once(Util.mkAsync(_cb));
|
||||
const store = Env.store;
|
||||
const Log = Env.Log;
|
||||
|
||||
// lastKnownhash === -1 means we want the complete history
|
||||
if (lastKnownHash === -1) { return void cb(null, 0); }
|
||||
@@ -384,7 +382,7 @@ const getHistoryOffset = (Env, channelName, lastKnownHash, _cb) => { // XXX chil
|
||||
|
||||
offset = lkh;
|
||||
}));
|
||||
}).nThen((waitFor) => {
|
||||
}).nThen((w) => {
|
||||
// if offset is less than zero then presumably the channel has no messages
|
||||
// returning falls through to the next block and therefore returns -1
|
||||
if (offset !== -1) { return; }
|
||||
@@ -392,18 +390,12 @@ const getHistoryOffset = (Env, channelName, lastKnownHash, _cb) => { // XXX chil
|
||||
// do a lookup from the index
|
||||
// FIXME maybe we don't need this anymore?
|
||||
// otherwise we have a non-negative offset and we can start to read from there
|
||||
store.readMessagesBin(channelName, 0, (msgObj, readMore, abort) => {
|
||||
// tryParse return a parsed message or undefined
|
||||
const msg = tryParse(Env, msgObj.buff.toString('utf8'));
|
||||
// if it was undefined then go onto the next message
|
||||
if (typeof msg === "undefined") { return readMore(); }
|
||||
if (typeof(msg[4]) !== 'string' || lastKnownHash !== getHash(msg[4], Log)) {
|
||||
return void readMore();
|
||||
Env.getHashOffset(channelName, lastKnownHash, w(function (err, _offset) {
|
||||
if (err) {
|
||||
w.abort();
|
||||
return void cb(err);
|
||||
}
|
||||
offset = msgObj.offset;
|
||||
abort();
|
||||
}, waitFor(function (err) {
|
||||
if (err) { waitFor.abort(); return void cb(err); }
|
||||
offset = _offset;
|
||||
}));
|
||||
}).nThen(() => {
|
||||
cb(null, offset);
|
||||
@@ -897,6 +889,46 @@ HK.initializeIndexWorkers = function (Env, config, _cb) {
|
||||
});
|
||||
};
|
||||
|
||||
Env.getFileSize = function (channel, cb) {
|
||||
sendCommand({
|
||||
command: 'GET_FILE_SIZE',
|
||||
channel: channel,
|
||||
}, cb);
|
||||
};
|
||||
|
||||
Env.getDeletedPads = function (channels, cb) {
|
||||
sendCommand({
|
||||
command: "GET_DELETED_PADS",
|
||||
channels: channels,
|
||||
}, cb);
|
||||
};
|
||||
|
||||
Env.getTotalSize = function (channels, cb) {
|
||||
// we could take out locks for all of these channels,
|
||||
// but it's OK if the size is slightly off
|
||||
sendCommand({
|
||||
command: 'GET_TOTAL_SIZE',
|
||||
channels: channels,
|
||||
}, cb);
|
||||
};
|
||||
|
||||
Env.getMultipleFileSize = function (channels, cb) {
|
||||
sendCommand({
|
||||
command: "GET_MULTIPLE_FILE_SIZE",
|
||||
channels: channels,
|
||||
}, cb);
|
||||
};
|
||||
|
||||
Env.getHashOffset = function (channel, hash, cb) {
|
||||
Env.store.getWeakLock(channel, function (next) {
|
||||
sendCommand({
|
||||
command: 'GET_HASH_OFFSET',
|
||||
channel: channel,
|
||||
hash: hash,
|
||||
}, Util.both(next, cb));
|
||||
});
|
||||
};
|
||||
|
||||
//console.log("index workers ready");
|
||||
cb(void 0);
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user