add clear channel rpc
This commit is contained in:
@@ -6,6 +6,65 @@ var mkPath = function (env, channelId) {
|
||||
return Path.join(env.root, channelId.slice(0, 2), channelId) + '.ndjson';
|
||||
};
|
||||
|
||||
var getMetadataAtPath = function (Env, path, cb) {
|
||||
var remainder = '';
|
||||
var stream = Fs.createReadStream(path, 'utf8');
|
||||
var complete = function (err, data) {
|
||||
var _cb = cb;
|
||||
cb = undefined;
|
||||
if (_cb) { _cb(err, data); }
|
||||
};
|
||||
stream.on('data', function (chunk) {
|
||||
if (!/\n/.test(chunk)) {
|
||||
remainder += chunk;
|
||||
return;
|
||||
}
|
||||
stream.close();
|
||||
var metadata = chunk.split('\n')[0];
|
||||
|
||||
var parsed = null;
|
||||
try {
|
||||
parsed = JSON.parse(metadata);
|
||||
complete(void 0, parsed);
|
||||
}
|
||||
catch (e) {
|
||||
console.log();
|
||||
console.error(e);
|
||||
complete('INVALID_METADATA');
|
||||
}
|
||||
});
|
||||
stream.on('end', function () {
|
||||
complete(null);
|
||||
});
|
||||
stream.on('error', function (e) { complete(e); });
|
||||
};
|
||||
|
||||
var getChannelMetadata = function (Env, channelId, cb) {
|
||||
var path = mkPath(Env, channelId);
|
||||
getMetadataAtPath(Env, path, cb);
|
||||
};
|
||||
|
||||
var clearChannel = function (Env, channelId, cb) {
|
||||
var path = mkPath(Env, channelId);
|
||||
getMetadataAtPath(Env, path, function (e, metadata) {
|
||||
if (e) { return cb(e); }
|
||||
if (!metadata) {
|
||||
return void Fs.truncate(path, 0, function (err) {
|
||||
if (err) {
|
||||
return cb(err);
|
||||
}
|
||||
cb(void 0);
|
||||
});
|
||||
}
|
||||
|
||||
var len = JSON.stringify(metadata).length + 1;
|
||||
Fs.truncate(path, len, function (err) {
|
||||
if (err) { return cb(err); }
|
||||
closeChannel(Env, channelId, cb);
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
var readMessages = function (path, msgHandler, cb) {
|
||||
var remainder = '';
|
||||
var stream = Fs.createReadStream(path, 'utf8');
|
||||
@@ -260,6 +319,12 @@ module.exports.create = function (conf, cb) {
|
||||
getChannelSize: function (chanName, cb) {
|
||||
channelBytes(env, chanName, cb);
|
||||
},
|
||||
getChannelMetadata: function (channelName, cb) {
|
||||
getChannelMetadata(env, channelName, cb);
|
||||
},
|
||||
clearChannel: function (channelName, cb) {
|
||||
clearChannel(env, channelName, cb);
|
||||
},
|
||||
});
|
||||
});
|
||||
setInterval(function () {
|
||||
|
||||
Reference in New Issue
Block a user