Close streams whenever we finish using them; time out if necessary.

This commit is contained in:
ansuz
2020-03-19 16:11:24 -04:00
parent d1b16af160
commit 05a4e86cdb
2 changed files with 57 additions and 114 deletions

View File

@@ -58,6 +58,23 @@ var channelExists = function (filepath, cb) {
});
};
// Close a stream, then forcibly destroy it five seconds later in case
// the close did not complete cleanly. `id` (optional) identifies the
// stream (channel id or path) in any logged errors.
// Callers elsewhere in this file pass an id (e.g. closeChannel), and
// trimChannel may pass an undefined stream, so both are tolerated here.
const destroyStream = function (stream, id) {
    if (!stream) { return; } // nothing to clean up
    try {
        stream.close();
    } catch (err) {
        // close() throws if the stream is already closed/destroyed
        console.error(err, id);
    }
    setTimeout(function () {
        // best-effort teardown; destroy() on an already-destroyed stream throws
        try { stream.destroy(); } catch (err) { console.log(err, id); }
    }, 5000);
};
// Returns a zero-argument "finish" function which guarantees that `stream`
// gets closed (and eventually destroyed) exactly once. If finish is not
// invoked within `ms` milliseconds (default 15s) the timeout fires instead
// and the error is logged along with `id`.
const ensureStreamCloses = function (stream, id, ms) {
    var timeout = ms || 15000;
    var closeOnce = Util.once(function (err) {
        destroyStream(stream, id);
        if (err) {
            // this can only be a timeout error...
            console.log("stream close error:", err, id);
        }
    });
    return Util.bake(Util.mkTimeout(closeOnce, timeout), []);
};
// readMessagesBin asynchronously iterates over the messages in a channel log
// the handler for each message must call back to read more, which should mean
// that this function has a lower memory profile than our classic method
@@ -65,9 +82,10 @@ var channelExists = function (filepath, cb) {
// it also allows the handler to abort reading at any time
// readMessagesBin asynchronously iterates over the messages in a channel
// log, starting at byte offset `start`. The handler must call back to read
// more, and may abort at any time. `finish` (from ensureStreamCloses)
// guarantees the stream is closed exactly once, so no manual
// stream.close() is needed in the completion callback.
const readMessagesBin = (env, id, start, msgHandler, cb) => {
    const stream = Fs.createReadStream(mkPath(env, id), { start: start });
    const finish = ensureStreamCloses(stream, id);
    return void readFileBin(stream, msgHandler, function (err) {
        cb(err);
        finish();
    });
};
@@ -75,24 +93,13 @@ const readMessagesBin = (env, id, start, msgHandler, cb) => {
// returns undefined if the first message was not an object (not an array)
var getMetadataAtPath = function (Env, path, _cb) {
const stream = Fs.createReadStream(path, { start: 0 });
// cb implicitly destroys the stream, if it exists
// and calls back asynchronously no more than once
/*
var cb = Util.once(Util.both(function () {
try {
stream.destroy();
} catch (err) {
return err;
}
}, Util.mkAsync(_cb)));
*/
var cb = Util.once(Util.mkAsync(_cb), function () {
const finish = ensureStreamCloses(stream, path);
var cb = Util.once(Util.mkAsync(Util.both(_cb, finish)), function () {
throw new Error("Multiple Callbacks");
});
var i = 0;
return readFileBin(stream, function (msgObj, readMore, abort) {
const line = msgObj.buff.toString('utf8');
@@ -130,7 +137,8 @@ var closeChannel = function (env, channelName, cb) {
if (!env.channels[channelName]) { return void cb(); }
try {
if (typeof(Util.find(env, [ 'channels', channelName, 'writeStream', 'close'])) === 'function') {
env.channels[channelName].writeStream.close();
var stream = env.channels[channelName].writeStream;
destroyStream(stream, channelName);
}
delete env.channels[channelName];
env.openFiles--;
@@ -172,11 +180,15 @@ var clearChannel = function (env, channelId, _cb) {
*/
// Classic line-by-line reader: streams the channel log at `path` and
// passes each message (utf8 string) to msgHandler; calls _cb(err) once
// when done. The rendered diff left two conflicting `cb` declarations and
// two completion callbacks; this is the reconciled post-commit form.
var readMessages = function (path, msgHandler, _cb) {
    var stream = Fs.createReadStream(path, { start: 0 });
    // guarantee the stream is closed (or destroyed after a timeout)
    const finish = ensureStreamCloses(stream, path);
    // call back asynchronously, at most once, closing the stream first
    var cb = Util.once(Util.mkAsync(Util.both(finish, _cb)));
    return readFileBin(stream, function (msgObj, readMore) {
        msgHandler(msgObj.buff.toString('utf8'));
        readMore();
    }, function (err) {
        cb(err);
    });
};
/* getChannelMetadata
@@ -192,9 +204,13 @@ var getChannelMetadata = function (Env, channelId, cb) {
};
// low level method for getting just the dedicated metadata channel
var getDedicatedMetadata = function (env, channelId, handler, cb) {
var getDedicatedMetadata = function (env, channelId, handler, _cb) {
var metadataPath = mkMetadataPath(env, channelId);
var stream = Fs.createReadStream(metadataPath, {start: 0});
const finish = ensureStreamCloses(stream, metadataPath);
var cb = Util.both(finish, _cb);
readFileBin(stream, function (msgObj, readMore) {
var line = msgObj.buff.toString('utf8');
try {
@@ -679,11 +695,12 @@ export type ChainPadServer_ChannelInternal_t = {
path: string
};
*/
var getChannel = function (
var getChannel = function ( // XXX BatchRead
env,
id,
_callback /*:(err:?Error, chan:?ChainPadServer_ChannelInternal_t)=>void*/
) {
//console.log("getting channel [%s]", id);
var callback = Util.once(Util.mkAsync(_callback));
if (env.channels[id]) {
var chan = env.channels[id];
@@ -702,6 +719,7 @@ var getChannel = function (
// if you're running out of open files, asynchronously clean up expired files
// do it on a shorter timeframe, though (half of normal)
setTimeout(function () {
//console.log("FLUSHING UNUSED CHANNELS");
flushUnusedChannels(env, function () {
if (env.verbose) {
console.log("Approaching open file descriptor limit. Cleaning up");
@@ -740,7 +758,7 @@ var getChannel = function (
fileExists = exists;
}));
}).nThen(function (waitFor) {
var stream = channel.writeStream = Fs.createWriteStream(path, { flags: 'a' });
var stream = channel.writeStream = Fs.createWriteStream(path, { flags: 'a' }); // XXX
env.openFiles++;
stream.on('open', waitFor());
stream.on('error', function (err /*:?Error*/) {
@@ -762,12 +780,12 @@ var getChannel = function (
// write a message to the disk as raw bytes
// write a message to the disk as raw bytes
// The rendered diff duplicated the getChannel(...) and chan.atime lines;
// this is the deduplicated post-commit form. `complete` is wrapped with
// Util.once so a write error and the success path cannot both call back.
const messageBin = (env, chanName, msgBin, cb) => {
    var complete = Util.once(cb);
    getChannel(env, chanName, function (err, chan) { // XXX
        if (!chan) { return void complete(err); }
        // subscribe to write-stream errors so a failed write still calls back
        chan.onError.push(complete);
        chan.writeStream.write(msgBin, function () {
            // write succeeded: stop listening for errors on this channel
            chan.onError.splice(chan.onError.indexOf(complete), 1);
            chan.atime = +new Date(); // XXX we should just throttle closing, much simpler and less error prone
            complete();
        });
    });
};
@@ -781,33 +799,22 @@ var message = function (env, chanName, msg, cb) {
// stream messages from a channel log
// TODO replace getMessages with readFileBin
// Reads directly from the channel's path (no getChannel) and delivers each
// message to `handler`; cb(err) is called once at the end. The rendered
// diff interleaved the old getChannel-based version with this one; this is
// the reconciled path-based form, with one fix: the completion callback
// now also checks errorState, so cb cannot fire a second time after the
// handler has already thrown.
var getMessages = function (env, chanName, handler, cb) {
    var errorState = false;
    var path = mkPath(env, chanName);
    readMessages(path, function (msg) {
        // skip empty lines, and stop delivering once the handler has thrown
        if (!msg || errorState) { return; }
        try {
            handler(msg);
        } catch (e) {
            errorState = true;
            return void cb(e);
        }
    }, function (err) {
        // cb was already invoked with the handler's error; don't call twice
        if (errorState) { return; }
        if (err) {
            errorState = true;
            return void cb(err);
        }
        cb();
    });
};
@@ -832,12 +839,7 @@ var trimChannel = function (env, channelName, hash, _cb) {
var ABORT;
var cleanUp = function (cb) {
if (tempStream && !tempStream.closed) {
try {
tempStream.close();
} catch (err) { }
}
destroyStream(tempStream);
Fse.unlink(tempChannelPath, function (err) {
// proceed if deleted or if there was nothing to delete
if (!err || err.code === 'ENOENT') { return cb(); }