implement per-channel fifo queues for metadata and channel writes

ansuz
2019-09-04 11:51:33 +02:00
parent 41c86bb6bc
commit 4902554a61
3 changed files with 107 additions and 96 deletions

@@ -7,6 +7,7 @@ const Nacl = require('tweetnacl');
const Crypto = require('crypto');
const Once = require("./lib/once");
const Meta = require("./lib/metadata");
const WriteQueue = require("./lib/write-queue");
let Log;
const now = function () { return (new Date()).getTime(); };
@@ -302,88 +303,59 @@ module.exports.create = function (cfg) {
     * the fix is to use callbacks and implement queueing for writes
     * to guarantee that offset computation is always atomic with writes
     */
    const storageQueues = {};
    const storeQueuedMessage = function (ctx, queue, id) {
        if (queue.length === 0) {
            delete storageQueues[id];
            return;
        }
        const first = queue.shift();
        const msgBin = first.msg;
        const optionalMessageHash = first.hash;
        const isCp = first.isCp;
        // Store the message first, and update the index only once it's stored.
        // store.messageBin can be async so updating the index first may
        // result in a wrong cpIndex
        nThen((waitFor) => {
            store.messageBin(id, msgBin, waitFor(function (err) {
                if (err) {
                    waitFor.abort();
                    Log.error("HK_STORE_MESSAGE_ERROR", err.message);
                    // this error is critical, but there's not much we can do at the moment
                    // proceed with more messages, but they'll probably fail too
                    // at least you won't have a memory leak
                    // TODO make it possible to respond to clients with errors so they know
                    // their message wasn't stored
                    storeQueuedMessage(ctx, queue, id);
                    return;
                }
            }));
        }).nThen((waitFor) => {
            getIndex(ctx, id, waitFor((err, index) => {
                if (err) {
                    Log.warn("HK_STORE_MESSAGE_INDEX", err.stack);
                    // non-critical, we'll be able to get the channel index later
                    // proceed to the next message in the queue
                    storeQueuedMessage(ctx, queue, id);
                    return;
                }
                if (typeof (index.line) === "number") { index.line++; }
                if (isCp) {
                    index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0);
                    for (let k in index.offsetByHash) {
                        if (index.offsetByHash[k] < index.cpIndex[0]) {
                            delete index.offsetByHash[k];
                        }
                    }
                    index.cpIndex.push(({
                        offset: index.size,
                        line: ((index.line || 0) + 1)
                    } /*:cp_index_item*/));
                }
                if (optionalMessageHash) { index.offsetByHash[optionalMessageHash] = index.size; }
                index.size += msgBin.length;
                // handle the next element in the queue
                storeQueuedMessage(ctx, queue, id);
            }));
        });
    };
    const queueStorage = WriteQueue();
    const storeMessage = function (ctx, channel, msg, isCp, optionalMessageHash) {
        const id = channel.id;
        const msgBin = new Buffer(msg + '\n', 'utf8');
        if (Array.isArray(storageQueues[id])) {
            return void storageQueues[id].push({
                msg: msgBin,
                hash: optionalMessageHash,
                isCp: isCp,
            });
        }
        const queue = storageQueues[id] = (storageQueues[id] || [{
            msg: msgBin,
            hash: optionalMessageHash,
        }]);
        storeQueuedMessage(ctx, queue, id);
        queueStorage(id, function (next) {
            // Store the message first, and update the index only once it's stored.
            // store.messageBin can be async so updating the index first may
            // result in a wrong cpIndex
            nThen((waitFor) => {
                store.messageBin(id, msgBin, waitFor(function (err) {
                    if (err) {
                        waitFor.abort();
                        Log.error("HK_STORE_MESSAGE_ERROR", err.message);
                        // this error is critical, but there's not much we can do at the moment
                        // proceed with more messages, but they'll probably fail too
                        // at least you won't have a memory leak
                        // TODO make it possible to respond to clients with errors so they know
                        // their message wasn't stored
                        return void next();
                    }
                }));
            }).nThen((waitFor) => {
                getIndex(ctx, id, waitFor((err, index) => {
                    if (err) {
                        Log.warn("HK_STORE_MESSAGE_INDEX", err.stack);
                        // non-critical, we'll be able to get the channel index later
                        return void next();
                    }
                    if (typeof (index.line) === "number") { index.line++; }
                    if (isCp) {
                        index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0);
                        for (let k in index.offsetByHash) {
                            if (index.offsetByHash[k] < index.cpIndex[0]) {
                                delete index.offsetByHash[k];
                            }
                        }
                        index.cpIndex.push(({
                            offset: index.size,
                            line: ((index.line || 0) + 1)
                        } /*:cp_index_item*/));
                    }
                    if (optionalMessageHash) { index.offsetByHash[optionalMessageHash] = index.size; }
                    index.size += msgBin.length;
                    // handle the next element in the queue
                    next();
                }));
            });
        });
    };
var CHECKPOINT_PATTERN = /^cp\|(([A-Za-z0-9+\/=]+)\|)?/;
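
Note: the queueing behaviour above comes from the WriteQueue module required at the top of the diff (./lib/write-queue), which is not shown in this excerpt. As a rough sketch only, assuming the calling convention visible above (WriteQueue() returns a function taking a channel id and a task, and the task receives a callback that releases the channel), a minimal per-channel FIFO queue could look like this; the real lib/write-queue.js may differ.

// sketch of a per-channel FIFO task queue -- NOT the actual lib/write-queue.js
const WriteQueue = function () {
    // channel id => array of tasks waiting their turn
    const map = {};

    // run the next task queued for a channel, or forget the channel if none remain
    const run = function (id) {
        const queue = map[id];
        if (!Array.isArray(queue) || queue.length === 0) {
            delete map[id];
            return;
        }
        const task = queue.shift();
        // the task signals completion by calling the callback it is given,
        // which schedules the next task for the same channel
        task(function () { run(id); });
    };

    // queue(id, task): run the task immediately if the channel is idle,
    // otherwise append it so tasks for one channel never interleave
    return function (id, task) {
        if (Array.isArray(map[id])) {
            map[id].push(task);
            return;
        }
        map[id] = [];
        task(function () { run(id); });
    };
};
module.exports = WriteQueue;

With a queue of this shape, storeMessage can only have one write-and-reindex sequence in flight per channel: the next queued write starts only after the previous one calls next(), which is what makes the offset computation atomic with the write.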