Merge branch 'soon' into staging

This commit is contained in:
ansuz
2020-03-30 18:30:34 -04:00
6 changed files with 288 additions and 16 deletions

View File

@@ -235,6 +235,54 @@ const getIndex = (Env, channelName, cb) => {
});
};
/* checkOffsetMap
Sorry for the weird function --ansuz
This should be almost equivalent to `Object.keys(map).length` except
that is will use less memory by not allocating space for the temporary array.
Beyond that, it returns length * -1 if any of the members of the map
are not in ascending order. The function for removing older members of the map
loops over elements in order and deletes them, so ordering is important!
*/
var checkOffsetMap = function (map) {
var prev = 0;
var cur;
var ooo = 0; // out of order
var count = 0;
for (let k in map) {
count++;
cur = map[k];
if (!ooo && prev > cur) { ooo = true; }
prev = cur;
}
return ooo ? count * -1: count;
};
/* Pass the map and the number of elements it contains */
var trimOffsetByOrder = function (map, n) {
var toRemove = Math.max(n - 50, 0);
var i = 0;
for (let k in map) {
if (i >= toRemove) { return; }
i++;
delete map[k];
}
};
/* Remove from the map any byte offsets which are below
the lowest offset you'd like to preserve
(probably the oldest checkpoint */
var trimMapByOffset = function (map, offset) {
if (!offset) { return; }
for (let k in map) {
if (map[k] < offset) {
delete map[k];
}
}
};
/* storeMessage
* channel id
* the message to store
@@ -286,17 +334,28 @@ const storeMessage = function (Env, channel, msg, isCp, optionalMessageHash) {
if (typeof (index.line) === "number") { index.line++; }
if (isCp) {
index.cpIndex = sliceCpIndex(index.cpIndex, index.line || 0);
for (let k in index.offsetByHash) {
if (index.offsetByHash[k] < index.cpIndex[0]) {
delete index.offsetByHash[k];
}
}
trimMapByOffset(index.offsetByHash, index.cpIndex[0]);
index.cpIndex.push({
offset: index.size,
line: ((index.line || 0) + 1)
});
}
if (optionalMessageHash) { index.offsetByHash[optionalMessageHash] = index.size; }
if (optionalMessageHash) {
index.offsetByHash[optionalMessageHash] = index.size;
index.offsets++;
}
if (index.offsets >= 100 && !index.cpIndex.length) {
let offsetCount = checkOffsetMap(index.offsetByHash);
if (offsetCount < 0) {
Log.warn('OFFSET_TRIM_OOO', {
channel: id,
map: index.OffsetByHash
});
} else if (offsetCount > 0) {
trimOffsetByOrder(index.offsetByHash, index.offsets);
index.offsets = checkOffsetMap(index.offsetByHash);
}
}
index.size += msgBin.length;
// handle the next element in the queue