move the 'getOlderHistory' call into the database worker
This commit is contained in:
@@ -445,48 +445,6 @@ const getHistoryAsync = (Env, channelName, lastKnownHash, beforeHash, handler, c
|
|||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
/* getOlderHistory
|
|
||||||
* allows clients to query for all messages until a known hash is read
|
|
||||||
* stores all messages in history as they are read
|
|
||||||
* can therefore be very expensive for memory
|
|
||||||
* should probably be converted to a streaming interface
|
|
||||||
|
|
||||||
Used by:
|
|
||||||
* GET_HISTORY_RANGE
|
|
||||||
*/
|
|
||||||
const getOlderHistory = function (Env, channelName, oldestKnownHash, cb) { // XXX child process
|
|
||||||
const store = Env.store;
|
|
||||||
const Log = Env.Log;
|
|
||||||
var messageBuffer = [];
|
|
||||||
var found = false;
|
|
||||||
store.getMessages(channelName, function (msgStr) {
|
|
||||||
if (found) { return; }
|
|
||||||
|
|
||||||
let parsed = tryParse(Env, msgStr);
|
|
||||||
if (typeof parsed === "undefined") { return; }
|
|
||||||
|
|
||||||
// identify classic metadata messages by their inclusion of a channel.
|
|
||||||
// and don't send metadata, since:
|
|
||||||
// 1. the user won't be interested in it
|
|
||||||
// 2. this metadata is potentially incomplete/incorrect
|
|
||||||
if (isMetadataMessage(parsed)) { return; }
|
|
||||||
|
|
||||||
var content = parsed[4];
|
|
||||||
if (typeof(content) !== 'string') { return; }
|
|
||||||
|
|
||||||
var hash = getHash(content, Log);
|
|
||||||
if (hash === oldestKnownHash) {
|
|
||||||
found = true;
|
|
||||||
}
|
|
||||||
messageBuffer.push(parsed);
|
|
||||||
}, function (err) {
|
|
||||||
if (err && err.code !== 'ENOENT') {
|
|
||||||
Log.error("HK_GET_OLDER_HISTORY", err);
|
|
||||||
}
|
|
||||||
cb(messageBuffer);
|
|
||||||
});
|
|
||||||
};
|
|
||||||
|
|
||||||
const handleRPC = function (Env, Server, seq, userId, parsed) {
|
const handleRPC = function (Env, Server, seq, userId, parsed) {
|
||||||
const HISTORY_KEEPER_ID = Env.id;
|
const HISTORY_KEEPER_ID = Env.id;
|
||||||
|
|
||||||
@@ -662,7 +620,11 @@ const handleGetHistoryRange = function (Env, Server, seq, userId, parsed) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
Server.send(userId, [seq, 'ACK']);
|
Server.send(userId, [seq, 'ACK']);
|
||||||
return void getOlderHistory(Env, channelName, oldestKnownHash, function (messages) {
|
Env.getOlderHistory(channelName, oldestKnownHash, function (err, messages) {
|
||||||
|
if (err && err.code !== 'ENOENT') {
|
||||||
|
Env.Log.error("HK_GET_OLDER_HISTORY", err);
|
||||||
|
}
|
||||||
|
|
||||||
var toSend = [];
|
var toSend = [];
|
||||||
if (typeof (desiredMessages) === "number") {
|
if (typeof (desiredMessages) === "number") {
|
||||||
toSend = messages.slice(-desiredMessages);
|
toSend = messages.slice(-desiredMessages);
|
||||||
@@ -914,8 +876,18 @@ HK.initializeIndexWorkers = function (Env, config, _cb) {
|
|||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Env.getOlderHistory = function (channel, oldestKnownHash, cb) {
|
||||||
|
Env.store.getWeakLock(channel, function (next) {
|
||||||
|
sendCommand({
|
||||||
|
channel: channel,
|
||||||
|
command: "GET_OLDER_HISTORY",
|
||||||
|
hash: oldestKnownHash,
|
||||||
|
}, Util.both(next, cb));
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
//console.log("index workers ready");
|
//console.log("index workers ready");
|
||||||
cb(void 0, sendCommand);
|
cb(void 0);
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -144,9 +144,53 @@ const computeMetadata = function (data, cb, errorHandler) {
|
|||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/* getOlderHistory
|
||||||
|
* allows clients to query for all messages until a known hash is read
|
||||||
|
* stores all messages in history as they are read
|
||||||
|
* can therefore be very expensive for memory
|
||||||
|
* should probably be converted to a streaming interface
|
||||||
|
|
||||||
|
Used by:
|
||||||
|
* GET_HISTORY_RANGE
|
||||||
|
*/
|
||||||
|
|
||||||
|
const getOlderHistory = function (data, cb) {
|
||||||
|
const oldestKnownHash = data.hash;
|
||||||
|
const channelName = data.channel;
|
||||||
|
|
||||||
|
//const store = Env.store;
|
||||||
|
//const Log = Env.Log;
|
||||||
|
var messageBuffer = [];
|
||||||
|
var found = false;
|
||||||
|
store.getMessages(channelName, function (msgStr) {
|
||||||
|
if (found) { return; }
|
||||||
|
|
||||||
|
let parsed = tryParse(Env, msgStr);
|
||||||
|
if (typeof parsed === "undefined") { return; }
|
||||||
|
|
||||||
|
// identify classic metadata messages by their inclusion of a channel.
|
||||||
|
// and don't send metadata, since:
|
||||||
|
// 1. the user won't be interested in it
|
||||||
|
// 2. this metadata is potentially incomplete/incorrect
|
||||||
|
if (HK.isMetadataMessage(parsed)) { return; }
|
||||||
|
|
||||||
|
var content = parsed[4];
|
||||||
|
if (typeof(content) !== 'string') { return; }
|
||||||
|
|
||||||
|
var hash = HK.getHash(content);
|
||||||
|
if (hash === oldestKnownHash) {
|
||||||
|
found = true;
|
||||||
|
}
|
||||||
|
messageBuffer.push(parsed);
|
||||||
|
}, function (err) {
|
||||||
|
cb(err, messageBuffer);
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
const COMMANDS = {
|
const COMMANDS = {
|
||||||
COMPUTE_INDEX: computeIndex,
|
COMPUTE_INDEX: computeIndex,
|
||||||
COMPUTE_METADATA: computeMetadata,
|
COMPUTE_METADATA: computeMetadata,
|
||||||
|
GET_OLDER_HISTORY: getOlderHistory,
|
||||||
};
|
};
|
||||||
|
|
||||||
process.on('message', function (data) {
|
process.on('message', function (data) {
|
||||||
|
|||||||
Reference in New Issue
Block a user