Added a new RPC to get file offsets of messages by hash or of last 2 checkpoints, also improved checking of valid channel names and fixed a pull-stream bug and exposed async-store to the window

This commit is contained in:
Caleb James DeLisle
2018-01-23 16:31:59 +01:00
parent 1f34e47cbf
commit 014aacc76a
3 changed files with 57 additions and 10 deletions

View File

@@ -7,6 +7,12 @@ var nThen = require("nthen");
const ToPull = require('stream-to-pull-stream');
const Pull = require('pull-stream');
const isValidChannelId = function (id) {
return typeof(id) === 'string' &&
[32, 48].indexOf(id.length) > -1 &&
/^[a-zA-Z0-9=+-]*$/.test(id);
};
var mkPath = function (env, channelId) {
return Path.join(env.root, channelId.slice(0, 2), channelId) + '.ndjson';
};
@@ -161,7 +167,9 @@ const readMessagesBin = (env, id, start, msgHandler, cb) => {
mkBufferSplit(),
mkOffsetCounter(),
Pull.asyncMap((data, moreCb) => { msgHandler(data, moreCb, ()=>{ keepReading = false; moreCb(); }); }),
Pull.drain(()=>(keepReading), cb)
Pull.drain(() => (keepReading), (err) => {
cb((keepReading) ? err : undefined);
})
);
};
@@ -414,35 +422,44 @@ module.exports.create = function (
}
cb({
readMessagesBin: (channelName, start, asyncMsgHandler, cb) => {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
readMessagesBin(env, channelName, start, asyncMsgHandler, cb);
},
message: function (channelName, content, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
message(env, channelName, content, cb);
},
messageBin: (channelName, content, cb) => {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
messageBin(env, channelName, content, cb);
},
getMessages: function (channelName, msgHandler, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
getMessages(env, channelName, msgHandler, cb);
},
removeChannel: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
removeChannel(env, channelName, function (err) {
cb(err);
});
},
closeChannel: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
closeChannel(env, channelName, cb);
},
flushUnusedChannels: function (cb) {
flushUnusedChannels(env, cb);
},
getChannelSize: function (chanName, cb) {
channelBytes(env, chanName, cb);
getChannelSize: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
channelBytes(env, channelName, cb);
},
getChannelMetadata: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
getChannelMetadata(env, channelName, cb);
},
clearChannel: function (channelName, cb) {
if (!isValidChannelId(channelName)) { return void cb(new Error('EINVAL')); }
clearChannel(env, channelName, cb);
},
});