finally get around to reorganizing the messiest part of history keeper

This commit is contained in:
ansuz 2020-01-16 14:25:00 -05:00
parent 4418f6a113
commit 8c5c643a25

View File

@ -730,227 +730,204 @@ module.exports.create = function (cfg) {
} }
}; };
/* onDirectMessage const handleGetHistory = function (ctx, seq, user, parsed) {
* exported for use by the netflux-server // parsed[1] is the channel id
* parses and handles all direct messages directed to the history keeper // parsed[2] is a validation key or an object containing metadata (optionnal)
* check if it's expired and execute all the associated side-effects // parsed[3] is the last known hash (optionnal)
* routes queries to the appropriate handlers sendMsg(ctx, user, [seq, 'ACK']);
* GET_HISTORY var channelName = parsed[1];
* GET_HISTORY_RANGE var config = parsed[2];
* GET_FULL_HISTORY var metadata = {};
* RPC var lastKnownHash;
* if the rpc has special hooks that the history keeper needs to be aware of...
* execute them here...
*/ // clients can optionally pass a map of attributes
const onDirectMessage = function (ctx, seq, user, json) { // if the channel already exists this map will be ignored
let parsed; // otherwise it will be stored as the initial metadata state for the channel
let channelName; if (config && typeof config === "object" && !Array.isArray(parsed[2])) {
lastKnownHash = config.lastKnownHash;
metadata = config.metadata || {};
if (metadata.expire) {
metadata.expire = +metadata.expire * 1000 + (+new Date());
}
}
metadata.channel = channelName;
metadata.created = +new Date();
Log.silly('HK_MESSAGE', json); // if the user sends us an invalid key, we won't be able to validate their messages
// so they'll never get written to the log anyway. Let's just drop their message
try { // on the floor instead of doing a bunch of extra work
parsed = JSON.parse(json[2]); // TODO send them an error message so they know something is wrong
} catch (err) { if (metadata.validateKey && !isValidValidateKeyString(metadata.validateKey)) {
Log.error("HK_PARSE_CLIENT_MESSAGE", json); return void Log.error('HK_INVALID_KEY', metadata.validateKey);
return;
} }
// If the requested history is for an expired channel, abort nThen(function (waitFor) {
// Note the if we don't have the keys for that channel in metadata_cache, we'll var w = waitFor();
// have to abort later (once we know the expiration time)
if (checkExpired(ctx, parsed[1])) { return; }
if (parsed[0] === 'GET_HISTORY') { /* unless this is a young channel, we will serve all messages from an offset
// parsed[1] is the channel id this will not include the channel metadata, so we need to explicitly fetch that.
// parsed[2] is a validation key or an object containing metadata (optionnal) unfortunately, we can't just serve it blindly, since then young channels will
// parsed[3] is the last known hash (optionnal) send the metadata twice, so let's do a quick check of what we're going to serve...
sendMsg(ctx, user, [seq, 'ACK']); */
channelName = parsed[1]; getIndex(ctx, channelName, waitFor((err, index) => {
var config = parsed[2]; /* if there's an error here, it should be encountered
var metadata = {}; and handled by the next nThen block.
var lastKnownHash; so, let's just fall through...
// clients can optionally pass a map of attributes
// if the channel already exists this map will be ignored
// otherwise it will be stored as the initial metadata state for the channel
if (config && typeof config === "object" && !Array.isArray(parsed[2])) {
lastKnownHash = config.lastKnownHash;
metadata = config.metadata || {};
if (metadata.expire) {
metadata.expire = +metadata.expire * 1000 + (+new Date());
}
}
metadata.channel = channelName;
metadata.created = +new Date();
// if the user sends us an invalid key, we won't be able to validate their messages
// so they'll never get written to the log anyway. Let's just drop their message
// on the floor instead of doing a bunch of extra work
// TODO send them an error message so they know something is wrong
if (metadata.validateKey && !isValidValidateKeyString(metadata.validateKey)) {
return void Log.error('HK_INVALID_KEY', metadata.validateKey);
}
nThen(function (waitFor) {
var w = waitFor();
/* unless this is a young channel, we will serve all messages from an offset
this will not include the channel metadata, so we need to explicitly fetch that.
unfortunately, we can't just serve it blindly, since then young channels will
send the metadata twice, so let's do a quick check of what we're going to serve...
*/ */
getIndex(ctx, channelName, waitFor((err, index) => { if (err) { return w(); }
/* if there's an error here, it should be encountered
and handled by the next nThen block.
so, let's just fall through...
*/
if (err) { return w(); }
// it's possible that the channel doesn't have metadata // it's possible that the channel doesn't have metadata
// but in that case there's no point in checking if the channel expired // but in that case there's no point in checking if the channel expired
// or in trying to send metadata, so just skip this block // or in trying to send metadata, so just skip this block
if (!index || !index.metadata) { return void w(); } if (!index || !index.metadata) { return void w(); }
// And then check if the channel is expired. If it is, send the error and abort // And then check if the channel is expired. If it is, send the error and abort
// FIXME this is hard to read because 'checkExpired' has side effects // FIXME this is hard to read because 'checkExpired' has side effects
if (checkExpired(ctx, channelName)) { return void waitFor.abort(); } if (checkExpired(ctx, channelName)) { return void waitFor.abort(); }
// always send metadata with GET_HISTORY requests // always send metadata with GET_HISTORY requests
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(index.metadata)], w); sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(index.metadata)], w);
})); }));
}).nThen(() => { }).nThen(() => {
let msgCount = 0; let msgCount = 0;
// TODO compute lastKnownHash in a manner such that it will always skip past the metadata line? // TODO compute lastKnownHash in a manner such that it will always skip past the metadata line?
getHistoryAsync(ctx, channelName, lastKnownHash, false, (msg, readMore) => { getHistoryAsync(ctx, channelName, lastKnownHash, false, (msg, readMore) => {
if (!msg) { return; } if (!msg) { return; }
msgCount++; msgCount++;
// avoid sending the metadata message a second time // avoid sending the metadata message a second time
if (isMetadataMessage(msg) && metadata_cache[channelName]) { return readMore(); } if (isMetadataMessage(msg) && metadata_cache[channelName]) { return readMore(); }
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)], readMore); sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)], readMore);
}, (err) => { }, (err) => {
if (err && err.code !== 'ENOENT') { if (err && err.code !== 'ENOENT') {
if (err.message !== 'EINVAL') { Log.error("HK_GET_HISTORY", err); } if (err.message !== 'EINVAL') { Log.error("HK_GET_HISTORY", err); }
const parsedMsg = {error:err.message, channel: channelName}; const parsedMsg = {error:err.message, channel: channelName};
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]); sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]);
return; return;
} }
const chan = ctx.channels[channelName]; const chan = ctx.channels[channelName];
if (msgCount === 0 && !metadata_cache[channelName] && chan && chan.indexOf(user) > -1) { if (msgCount === 0 && !metadata_cache[channelName] && chan && chan.indexOf(user) > -1) {
metadata_cache[channelName] = metadata; metadata_cache[channelName] = metadata;
// the index will have already been constructed and cached at this point // the index will have already been constructed and cached at this point
// but it will not have detected any metadata because it hasn't been written yet // but it will not have detected any metadata because it hasn't been written yet
// this means that the cache starts off as invalid, so we have to correct it // this means that the cache starts off as invalid, so we have to correct it
if (chan && chan.index) { chan.index.metadata = metadata; } if (chan && chan.index) { chan.index.metadata = metadata; }
// new channels will always have their metadata written to a dedicated metadata log // new channels will always have their metadata written to a dedicated metadata log
// but any lines after the first which are not amendments in a particular format will be ignored. // but any lines after the first which are not amendments in a particular format will be ignored.
// Thus we should be safe from race conditions here if just write metadata to the log as below... // Thus we should be safe from race conditions here if just write metadata to the log as below...
// TODO validate this logic // TODO validate this logic
// otherwise maybe we need to check that the metadata log is empty as well // otherwise maybe we need to check that the metadata log is empty as well
store.writeMetadata(channelName, JSON.stringify(metadata), function (err) { store.writeMetadata(channelName, JSON.stringify(metadata), function (err) {
if (err) { if (err) {
// FIXME tell the user that there was a channel error? // FIXME tell the user that there was a channel error?
return void Log.error('HK_WRITE_METADATA', { return void Log.error('HK_WRITE_METADATA', {
channel: channelName, channel: channelName,
error: err, error: err,
});
}
});
// write tasks
if(tasks && metadata.expire && typeof(metadata.expire) === 'number') {
// the fun part...
// the user has said they want this pad to expire at some point
tasks.write(metadata.expire, "EXPIRE", [ channelName ], function (err) {
if (err) {
// if there is an error, we don't want to crash the whole server...
// just log it, and if there's a problem you'll be able to fix it
// at a later date with the provided information
Log.error('HK_CREATE_EXPIRE_TASK', err);
Log.info('HK_INVALID_EXPIRE_TASK', JSON.stringify([metadata.expire, 'EXPIRE', channelName]));
}
}); });
} }
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(metadata)]); });
}
// write tasks
// End of history message: if(tasks && metadata.expire && typeof(metadata.expire) === 'number') {
let parsedMsg = {state: 1, channel: channelName}; // the fun part...
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]); // the user has said they want this pad to expire at some point
}); tasks.write(metadata.expire, "EXPIRE", [ channelName ], function (err) {
}); if (err) {
} else if (parsed[0] === 'GET_HISTORY_RANGE') { // if there is an error, we don't want to crash the whole server...
channelName = parsed[1]; // just log it, and if there's a problem you'll be able to fix it
var map = parsed[2]; // at a later date with the provided information
if (!(map && typeof(map) === 'object')) { Log.error('HK_CREATE_EXPIRE_TASK', err);
return void sendMsg(ctx, user, [seq, 'ERROR', 'INVALID_ARGS', HISTORY_KEEPER_ID]); Log.info('HK_INVALID_EXPIRE_TASK', JSON.stringify([metadata.expire, 'EXPIRE', channelName]));
} }
});
var oldestKnownHash = map.from;
var desiredMessages = map.count;
var desiredCheckpoint = map.cpCount;
var txid = map.txid;
if (typeof(desiredMessages) !== 'number' && typeof(desiredCheckpoint) !== 'number') {
return void sendMsg(ctx, user, [seq, 'ERROR', 'UNSPECIFIED_COUNT', HISTORY_KEEPER_ID]);
}
if (!txid) {
return void sendMsg(ctx, user, [seq, 'ERROR', 'NO_TXID', HISTORY_KEEPER_ID]);
}
sendMsg(ctx, user, [seq, 'ACK']);
return void getOlderHistory(channelName, oldestKnownHash, function (messages) {
var toSend = [];
if (typeof (desiredMessages) === "number") {
toSend = messages.slice(-desiredMessages);
} else {
let cpCount = 0;
for (var i = messages.length - 1; i >= 0; i--) {
if (/^cp\|/.test(messages[i][4]) && i !== (messages.length - 1)) {
cpCount++;
}
toSend.unshift(messages[i]);
if (cpCount >= desiredCheckpoint) { break; }
} }
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(metadata)]);
} }
toSend.forEach(function (msg) {
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id,
JSON.stringify(['HISTORY_RANGE', txid, msg])]);
});
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, // End of history message:
JSON.stringify(['HISTORY_RANGE_END', txid, channelName]) let parsedMsg = {state: 1, channel: channelName};
]);
});
} else if (parsed[0] === 'GET_FULL_HISTORY') {
// parsed[1] is the channel id
// parsed[2] is a validation key (optionnal)
// parsed[3] is the last known hash (optionnal)
sendMsg(ctx, user, [seq, 'ACK']);
// FIXME should we send metadata here too?
// none of the clientside code which uses this API needs metadata, but it won't hurt to send it (2019-08-22)
getHistoryAsync(ctx, parsed[1], -1, false, (msg, readMore) => {
if (!msg) { return; }
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(['FULL_HISTORY', msg])], readMore);
}, (err) => {
let parsedMsg = ['FULL_HISTORY_END', parsed[1]];
if (err) {
Log.error('HK_GET_FULL_HISTORY', err.stack);
parsedMsg = ['ERROR', parsed[1], err.message];
}
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]); sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]);
}); });
} else if (rpc) { });
/* RPC Calls... */ };
var rpc_call = parsed.slice(1);
sendMsg(ctx, user, [seq, 'ACK']); const handleGetHistoryRange = function (ctx, seq, user, parsed) {
try { var channelName = parsed[1];
var map = parsed[2];
if (!(map && typeof(map) === 'object')) {
return void sendMsg(ctx, user, [seq, 'ERROR', 'INVALID_ARGS', HISTORY_KEEPER_ID]);
}
var oldestKnownHash = map.from;
var desiredMessages = map.count;
var desiredCheckpoint = map.cpCount;
var txid = map.txid;
if (typeof(desiredMessages) !== 'number' && typeof(desiredCheckpoint) !== 'number') {
return void sendMsg(ctx, user, [seq, 'ERROR', 'UNSPECIFIED_COUNT', HISTORY_KEEPER_ID]);
}
if (!txid) {
return void sendMsg(ctx, user, [seq, 'ERROR', 'NO_TXID', HISTORY_KEEPER_ID]);
}
sendMsg(ctx, user, [seq, 'ACK']);
return void getOlderHistory(channelName, oldestKnownHash, function (messages) {
var toSend = [];
if (typeof (desiredMessages) === "number") {
toSend = messages.slice(-desiredMessages);
} else {
let cpCount = 0;
for (var i = messages.length - 1; i >= 0; i--) {
if (/^cp\|/.test(messages[i][4]) && i !== (messages.length - 1)) {
cpCount++;
}
toSend.unshift(messages[i]);
if (cpCount >= desiredCheckpoint) { break; }
}
}
toSend.forEach(function (msg) {
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id,
JSON.stringify(['HISTORY_RANGE', txid, msg])]);
});
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id,
JSON.stringify(['HISTORY_RANGE_END', txid, channelName])
]);
});
};
const handleGetFullHistory = function (ctx, seq, user, parsed) {
// parsed[1] is the channel id
// parsed[2] is a validation key (optionnal)
// parsed[3] is the last known hash (optionnal)
sendMsg(ctx, user, [seq, 'ACK']);
// FIXME should we send metadata here too?
// none of the clientside code which uses this API needs metadata, but it won't hurt to send it (2019-08-22)
return void getHistoryAsync(ctx, parsed[1], -1, false, (msg, readMore) => {
if (!msg) { return; }
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(['FULL_HISTORY', msg])], readMore);
}, (err) => {
let parsedMsg = ['FULL_HISTORY_END', parsed[1]];
if (err) {
Log.error('HK_GET_FULL_HISTORY', err.stack);
parsedMsg = ['ERROR', parsed[1], err.message];
}
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(parsedMsg)]);
});
};
const handleRPC = function (ctx, seq, user, parsed) {
if (typeof(rpc) !== 'function') { return; }
/* RPC Calls... */
var rpc_call = parsed.slice(1);
sendMsg(ctx, user, [seq, 'ACK']);
try {
// slice off the sequence number and pass in the rest of the message // slice off the sequence number and pass in the rest of the message
rpc(ctx, rpc_call, function (err, output) { rpc(ctx, rpc_call, function (err, output) {
if (err) { if (err) {
@ -992,12 +969,45 @@ module.exports.create = function (cfg) {
// finally, send a response to the client that sent the RPC // finally, send a response to the client that sent the RPC
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0]].concat(output))]); sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0]].concat(output))]);
}); });
} catch (e) { } catch (e) {
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0], 'ERROR', 'SERVER_ERROR'])]); sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify([parsed[0], 'ERROR', 'SERVER_ERROR'])]);
}
} }
}; };
/* onDirectMessage
* exported for use by the netflux-server
* parses and handles all direct messages directed to the history keeper
* check if it's expired and execute all the associated side-effects
* routes queries to the appropriate handlers
*/
const onDirectMessage = function (ctx, seq, user, json) {
Log.silly('HK_MESSAGE', json);
let parsed;
try {
parsed = JSON.parse(json[2]);
} catch (err) {
Log.error("HK_PARSE_CLIENT_MESSAGE", json);
return;
}
// If the requested history is for an expired channel, abort
// Note the if we don't have the keys for that channel in metadata_cache, we'll
// have to abort later (once we know the expiration time)
if (checkExpired(ctx, parsed[1])) { return; }
if (parsed[0] === 'GET_HISTORY') {
return void handleGetHistory(ctx, seq, user, parsed);
}
if (parsed[0] === 'GET_HISTORY_RANGE') {
return void handleGetHistoryRange(ctx, seq, user, parsed);
}
if (parsed[0] === 'GET_FULL_HISTORY') {
return void handleGetFullHistory(ctx, seq, user, parsed);
}
return void handleRPC(ctx, seq, user, parsed);
};
return { return {
id: HISTORY_KEEPER_ID, id: HISTORY_KEEPER_ID,
setConfig: setConfig, setConfig: setConfig,