resolve silly conflict and merge staging

This commit is contained in:
ansuz
2020-04-09 16:35:24 -04:00
29 changed files with 273 additions and 357 deletions

View File

@@ -54,16 +54,8 @@ Channel.clearOwnedChannel = function (Env, safeKey, channelId, cb, Server) {
});
};
Channel.removeOwnedChannel = function (Env, safeKey, channelId, cb, Server) {
if (typeof(channelId) !== 'string' || !Core.isValidId(channelId)) {
return cb('INVALID_ARGUMENTS');
}
var archiveOwnedChannel = function (Env, safeKey, channelId, cb, Server) {
var unsafeKey = Util.unescapeKeyCharacters(safeKey);
if (Env.blobStore.isFileId(channelId)) {
return void Env.removeOwnedBlob(channelId, safeKey, cb);
}
Metadata.getMetadata(Env, channelId, function (err, metadata) {
if (err) { return void cb(err); }
if (!Core.hasOwners(metadata)) { return void cb('E_NO_OWNERS'); }
@@ -124,6 +116,24 @@ Channel.removeOwnedChannel = function (Env, safeKey, channelId, cb, Server) {
});
};
Channel.removeOwnedChannel = function (Env, safeKey, channelId, __cb, Server) {
var _cb = Util.once(Util.mkAsync(__cb));
if (typeof(channelId) !== 'string' || !Core.isValidId(channelId)) {
return _cb('INVALID_ARGUMENTS');
}
// archiving large channels or files can be expensive, so do it one at a time
// for any given user to ensure that nobody can use too much of the server's resources
Env.queueDeletes(safeKey, function (next) {
var cb = Util.both(_cb, next);
if (Env.blobStore.isFileId(channelId)) {
return void Env.removeOwnedBlob(channelId, safeKey, cb);
}
archiveOwnedChannel(Env, safeKey, channelId, cb, Server);
});
};
Channel.trimHistory = function (Env, safeKey, data, cb) {
if (!(data && typeof(data.channel) === 'string' && typeof(data.hash) === 'string' && data.hash.length === 64)) {
return void cb('INVALID_ARGS');

View File

@@ -49,16 +49,19 @@ var loadUserPins = function (Env, safeKey, cb) {
// only put this into the cache if it completes
session.channels = value;
}
session.channels = value;
done(value);
});
});
};
var truthyKeys = function (O) {
return Object.keys(O).filter(function (k) {
return O[k];
});
try {
return Object.keys(O).filter(function (k) {
return O[k];
});
} catch (err) {
return [];
}
};
var getChannelList = Pinning.getChannelList = function (Env, safeKey, _cb) {

View File

@@ -38,6 +38,7 @@ module.exports.create = function (config, cb) {
metadata_cache: {},
channel_cache: {},
queueStorage: WriteQueue(),
queueDeletes: WriteQueue(),
batchIndexReads: BatchRead("HK_GET_INDEX"),
batchMetadata: BatchRead('GET_METADATA'),

View File

@@ -7,6 +7,9 @@ const Path = require("path");
const Util = require("./common-util");
const Plan = require("./plan");
const Semaphore = require('saferphore');
const nThen = require('nthen');
/* Accepts a reference to an object, and...
either a string describing which log is being processed (backwards compatibility),
or a function which will log the error with all relevant data
@@ -194,3 +197,63 @@ Pins.list = function (_done, config) {
}).start();
});
};
Pins.load = function (cb, config) {
const sema = Semaphore.create(config.workers || 5);
let dirList;
const fileList = [];
const pinned = {};
var pinPath = config.pinPath || './pins';
var done = Util.once(cb);
nThen((waitFor) => {
// recurse over the configured pinPath, or the default
Fs.readdir(pinPath, waitFor((err, list) => {
if (err) {
if (err.code === 'ENOENT') {
dirList = [];
return; // this ends up calling back with an empty object
}
waitFor.abort();
return void done(err);
}
dirList = list;
}));
}).nThen((waitFor) => {
dirList.forEach((f) => {
sema.take((returnAfter) => {
// iterate over all the subdirectories in the pin store
Fs.readdir(Path.join(pinPath, f), waitFor(returnAfter((err, list2) => {
if (err) {
waitFor.abort();
return void done(err);
}
list2.forEach((ff) => {
if (config && config.exclude && config.exclude.indexOf(ff) > -1) { return; }
fileList.push(Path.join(pinPath, f, ff));
});
})));
});
});
}).nThen((waitFor) => {
fileList.forEach((f) => {
sema.take((returnAfter) => {
Fs.readFile(f, waitFor(returnAfter((err, content) => {
if (err) {
waitFor.abort();
return void done(err);
}
const hashes = Pins.calculateFromLog(content.toString('utf8'), f);
hashes.forEach((x) => {
(pinned[x] = pinned[x] || {})[f.replace(/.*\/([^/]*).ndjson$/, (x, y)=>y)] = 1;
});
})));
});
});
}).nThen(() => {
done(void 0, pinned);
});
};

View File

@@ -14,6 +14,28 @@ const readFileBin = require("../stream-file").readFileBin;
const BatchRead = require("../batch-read");
const Schedule = require("../schedule");
/* Each time you write to a channel it will either use an open file descriptor
for that channel or open a new descriptor if one is not available. These are
automatically closed after this window to prevent a file descriptor leak, so
writes that take longer than this time may be dropped! */
const CHANNEL_WRITE_WINDOW = 300000;
/* Each time you read a channel it will have this many milliseconds to complete
otherwise it will be closed to prevent a file descriptor leak. The server will
lock up if it uses all available file descriptors, so it's important to close
them. The tradeoff with this timeout is that some functions, the stream, and
and the timeout itself are stored in memory. A longer timeout uses more memory
and running out of memory will also kill the server. */
const STREAM_CLOSE_TIMEOUT = 300000;
/* The above timeout closes the stream, but apparently that doesn't always work.
We set yet another timeout to allow the runtime to gracefully close the stream
(flushing all pending writes/reads and doing who knows what else). After this timeout
it will be MERCILESSLY DESTROYED. This isn't graceful, but again, file descriptor
leaks are bad. */
const STREAM_DESTROY_TIMEOUT = 30000;
const isValidChannelId = function (id) {
return typeof(id) === 'string' &&
id.length >= 32 && id.length < 50 &&
@@ -64,7 +86,7 @@ const destroyStream = function (stream) {
try { stream.close(); } catch (err) { console.error(err); }
setTimeout(function () {
try { stream.destroy(); } catch (err) { console.error(err); }
}, 15000);
}, STREAM_DESTROY_TIMEOUT);
};
const ensureStreamCloses = function (stream, id, ms) {
@@ -74,7 +96,7 @@ const ensureStreamCloses = function (stream, id, ms) {
// this can only be a timeout error...
console.log("stream close error:", err, id);
}
}), ms || 45000), []);
}), ms || STREAM_CLOSE_TIMEOUT), []);
};
// readMessagesBin asynchronously iterates over the messages in a channel log
@@ -729,7 +751,7 @@ var getChannel = function (env, id, _callback) {
delete env.channels[id];
destroyStream(channel.writeStream, path);
//console.log("closing writestream");
}, 120000);
}, CHANNEL_WRITE_WINDOW);
channel.delayClose();
env.channels[id] = channel;
done(void 0, channel);