semi-functional allow-list implementation in historyKeeper

This commit is contained in:
ansuz
2020-02-19 14:22:12 -05:00
parent 521db379a0
commit f579c9b059
6 changed files with 147 additions and 66 deletions

View File

@@ -4,7 +4,7 @@ var HK = module.exports;
const nThen = require('nthen');
const Util = require("./common-util");
const Meta = require("./metadata");
const MetaRPC = require("./commands/metadata");
const Nacl = require('tweetnacl/nacl-fast');
const now = function () { return (new Date()).getTime(); };
@@ -80,6 +80,23 @@ const isChannelRestricted = function (metadata) { // XXX RESTRICT
return false;
};
HK.listAllowedUsers = function (metadata) {
return (metadata.owners || []).concat((metadata.allowed || []));
};
HK.getNetfluxSession = function (Env, netfluxId) {
return Env.netfluxUsers[netfluxId];
};
HK.authenticateNetfluxSession = function (Env, netfluxId, unsafeKey) {
var user = Env.netfluxUsers[netfluxId] = Env.netfluxUsers[netfluxId] || {};
user[unsafeKey] = +new Date();
};
HK.closeNetfluxSession = function (Env, netfluxId) {
delete Env.netfluxUsers[netfluxId];
};
const isUserAllowed = function (metadata, userId) { // XXX RESTRICT
/*
@@ -174,6 +191,23 @@ const checkExpired = function (Env, Server, channel) {
return true;
};
const getMetadata = HK.getMetadata = function (Env, channelName, cb) {
var metadata = Env.metadata_cache[channelName];
if (metadata && typeof(metadata) === 'object') {
return void Util.mkAsync(cb)(undefined, metadata);
}
MetaRPC.getMetadata(Env, channelName, function (err, metadata) {
if (err) {
console.error(err);
return void cb(err);
}
// cache it
Env.metadata_cache[channelName] = metadata;
cb(undefined, metadata);
});
};
/* computeIndex
can call back with an error or a computed index which includes:
* cpIndex:
@@ -203,13 +237,18 @@ const computeIndex = function (Env, channelName, cb) {
let metadata;
let i = 0;
const ref = {};
const CB = Util.once(cb);
const offsetByHash = {};
let size = 0;
nThen(function (w) {
getMetadata(Env, channelName, w(function (err, _metadata) {
if (err) {
throw new Error(err);
}
metadata = _metadata;
}));
}).nThen(function (w) {
// iterate over all messages in the channel log
// old channels can contain metadata as the first message of the log
// remember metadata the first time you encounter it
@@ -218,14 +257,15 @@ const computeIndex = function (Env, channelName, cb) {
let msg;
// keep an eye out for the metadata line if you haven't already seen it
// but only check for metadata on the first line
if (!i && !metadata && msgObj.buff.indexOf('{') === 0) {
if (!i && msgObj.buff.indexOf('{') === 0) { // XXX RESTRICT metadata...
i++; // always increment the message counter
msg = tryParse(Env, msgObj.buff.toString('utf8'));
if (typeof msg === "undefined") { return readMore(); }
// validate that the current line really is metadata before storing it as such
if (isMetadataMessage(msg)) {
metadata = msg;
if (isMetadataMessage(msg)) { // XXX RESTRICT
//metadata = msg; // XXX RESTRICT
// skip this, as you already have metadata...
return readMore();
}
}
@@ -268,26 +308,8 @@ const computeIndex = function (Env, channelName, cb) {
size = msgObj.offset + msgObj.buff.length + 1;
});
}));
}).nThen(function (w) {
// create a function which will iterate over amendments to the metadata
const handler = Meta.createLineHandler(ref, Log.error);
// initialize the accumulator in case there was a foundational metadata line in the log content
if (metadata) { handler(void 0, metadata); }
// iterate over the dedicated metadata log (if it exists)
// proceed even in the event of a stream error on the metadata log
store.readDedicatedMetadata(channelName, handler, w(function (err) {
if (err) {
return void Log.error("DEDICATED_METADATA_ERROR", err);
}
}));
}).nThen(function () {
// when all is done, cache the metadata in memory
if (ref.index) { // but don't bother if no metadata was found...
metadata = Env.metadata_cache[channelName] = ref.meta;
}
// and return the computed index
// return the computed index
CB(null, {
// Only keep the checkpoints included in the last 100 messages
cpIndex: sliceCpIndex(cpIndex, i),
@@ -316,9 +338,7 @@ const getIndex = (Env, channelName, cb) => {
// if there is a channel in memory and it has an index cached, return it
if (chan && chan.index) {
// enforce async behaviour
return void setTimeout(function () {
cb(undefined, chan.index);
});
return void Util.mkAsync(cb)(undefined, chan.index);
}
Env.batchIndexReads(channelName, cb, function (done) {
@@ -592,7 +612,7 @@ const handleRPC = function (Env, Server, seq, userId, parsed) {
Server.send(userId, [seq, 'ACK']);
try {
// slice off the sequence number and pass in the rest of the message
Env.rpc(Server, rpc_call, function (err, output) {
Env.rpc(Server, userId, rpc_call, function (err, output) {
if (err) {
Server.send(userId, [0, HISTORY_KEEPER_ID, 'MSG', userId, JSON.stringify([parsed[0], 'ERROR', err])]);
return;