Merge branch 'staging' into communities-trim

This commit is contained in:
yflory
2020-02-05 17:05:45 +01:00
51 changed files with 3274 additions and 2442 deletions

75
lib/api.js Normal file
View File

@@ -0,0 +1,75 @@
/* jshint esversion: 6 */
const nThen = require("nthen");
const WebSocketServer = require('ws').Server;
const NetfluxSrv = require('chainpad-server');
module.exports.create = function (config) {
const wsConfig = {
server: config.httpServer,
};
nThen(function (w) {
require('../storage/file').create(config, w(function (_store) {
config.store = _store;
}));
}).nThen(function (w) {
// XXX embed this in historyKeeper
require("../storage/tasks").create(config, w(function (e, tasks) {
if (e) {
throw e;
}
config.tasks = tasks;
if (config.disableIntegratedTasks) { return; }
config.intervals = config.intervals || {};
config.intervals.taskExpiration = setInterval(function () {
tasks.runAll(function (err) {
if (err) {
// either TASK_CONCURRENCY or an error with tasks.list
// in either case it is already logged.
}
});
}, 1000 * 60 * 5); // run every five minutes
}));
}).nThen(function () {
// asynchronously create a historyKeeper and RPC together
require('./historyKeeper.js').create(config, function (err, historyKeeper) {
if (err) { throw err; }
var log = config.log;
// spawn ws server and attach netflux event handlers
NetfluxSrv.create(new WebSocketServer(wsConfig))
.on('channelClose', historyKeeper.channelClose)
.on('channelMessage', historyKeeper.channelMessage)
.on('channelOpen', historyKeeper.channelOpen)
.on('sessionClose', function (userId, reason) {
if (['BAD_MESSAGE', 'SOCKET_ERROR', 'SEND_MESSAGE_FAIL_2'].indexOf(reason) !== -1) {
return void log.error('SESSION_CLOSE_WITH_ERROR', {
userId: userId,
reason: reason,
});
}
log.verbose('SESSION_CLOSE_ROUTINE', {
userId: userId,
reason: reason,
});
})
.on('error', function (error, label, info) {
if (!error) { return; }
/* labels:
SEND_MESSAGE_FAIL, SEND_MESSAGE_FAIL_2, FAIL_TO_DISCONNECT,
FAIL_TO_TERMINATE, HANDLE_CHANNEL_LEAVE, NETFLUX_BAD_MESSAGE,
NETFLUX_WEBSOCKET_ERROR
*/
log.error(label, {
code: error.code,
message: error.message,
stack: error.stack,
info: info,
});
})
.register(historyKeeper.id, historyKeeper.directMessage);
});
});
};

119
lib/commands/admin-rpc.js Normal file
View File

@@ -0,0 +1,119 @@
/*jshint esversion: 6 */
const BatchRead = require("../batch-read");
const nThen = require("nthen");
const getFolderSize = require("get-folder-size");
var Fs = require("fs");
var Admin = module.exports;
var getActiveSessions = function (Env, Server, cb) {
var stats = Server.getSessionStats();
cb(void 0, [
stats.total,
stats.unique
]);
};
var shutdown = function (Env, Server, cb) {
if (true) {
return void cb('E_NOT_IMPLEMENTED');
}
// disconnect all users and reject new connections
Server.shutdown();
// stop all intervals that may be running
Object.keys(Env.intervals).forEach(function (name) {
clearInterval(Env.intervals[name]);
});
// set a flag to prevent incoming database writes
// wait until all pending writes are complete
// then process.exit(0);
// and allow system functionality to restart the server
};
const batchRegisteredUsers = BatchRead("GET_REGISTERED_USERS");
var getRegisteredUsers = function (Env, cb) {
batchRegisteredUsers('', cb, function (done) {
var dir = Env.paths.pin;
var folders;
var users = 0;
nThen(function (waitFor) {
Fs.readdir(dir, waitFor(function (err, list) {
if (err) {
waitFor.abort();
return void done(err);
}
folders = list;
}));
}).nThen(function (waitFor) {
folders.forEach(function (f) {
var dir = Env.paths.pin + '/' + f;
Fs.readdir(dir, waitFor(function (err, list) {
if (err) { return; }
users += list.length;
}));
});
}).nThen(function () {
done(void 0, users);
});
});
};
const batchDiskUsage = BatchRead("GET_DISK_USAGE");
var getDiskUsage = function (Env, cb) {
batchDiskUsage('', cb, function (done) {
var data = {};
nThen(function (waitFor) {
getFolderSize('./', waitFor(function(err, info) {
data.total = info;
}));
getFolderSize(Env.paths.pin, waitFor(function(err, info) {
data.pin = info;
}));
getFolderSize(Env.paths.blob, waitFor(function(err, info) {
data.blob = info;
}));
getFolderSize(Env.paths.staging, waitFor(function(err, info) {
data.blobstage = info;
}));
getFolderSize(Env.paths.block, waitFor(function(err, info) {
data.block = info;
}));
getFolderSize(Env.paths.data, waitFor(function(err, info) {
data.datastore = info;
}));
}).nThen(function () {
done(void 0, data);
});
});
};
Admin.command = function (Env, Server, publicKey, data, cb) {
var admins = Env.admins;
if (admins.indexOf(publicKey) === -1) {
return void cb("FORBIDDEN");
}
// Handle commands here
switch (data[0]) {
case 'ACTIVE_SESSIONS':
return getActiveSessions(Env, Server, cb);
case 'ACTIVE_PADS':
return cb(void 0, Server.getActiveChannelCount());
case 'REGISTERED_USERS':
return getRegisteredUsers(Env, cb);
case 'DISK_USAGE':
return getDiskUsage(Env, cb);
case 'FLUSH_CACHE':
Env.flushCache();
return cb(void 0, true);
case 'SHUTDOWN':
return shutdown(Env, Server, cb);
default:
return cb('UNHANDLED_ADMIN_COMMAND');
}
};

172
lib/commands/block.js Normal file
View File

@@ -0,0 +1,172 @@
/*jshint esversion: 6 */
/* globals Buffer*/
var Block = module.exports;
const Fs = require("fs");
const Fse = require("fs-extra");
const Path = require("path");
const Nacl = require("tweetnacl/nacl-fast");
const nThen = require("nthen");
const Util = require("../common-util");
/*
We assume that the server is secured against MitM attacks
via HTTPS, and that malicious actors do not have code execution
capabilities. If they do, we have much more serious problems.
The capability to replay a block write or remove results in either
a denial of service for the user whose block was removed, or in the
case of a write, a rollback to an earlier password.
Since block modification is destructive, this can result in loss
of access to the user's drive.
So long as the detached signature is never observed by a malicious
party, and the server discards it after proof of knowledge, replays
are not possible. However, this precludes verification of the signature
at a later time.
Despite this, an integrity check is still possible by the original
author of the block, since we assume that the block will have been
encrypted with xsalsa20-poly1305 which is authenticated.
*/
Block.validateLoginBlock = function (Env, publicKey, signature, block, cb) { // FIXME BLOCKS
// convert the public key to a Uint8Array and validate it
if (typeof(publicKey) !== 'string') { return void cb('E_INVALID_KEY'); }
var u8_public_key;
try {
u8_public_key = Nacl.util.decodeBase64(publicKey);
} catch (e) {
return void cb('E_INVALID_KEY');
}
var u8_signature;
try {
u8_signature = Nacl.util.decodeBase64(signature);
} catch (e) {
Env.Log.error('INVALID_BLOCK_SIGNATURE', e);
return void cb('E_INVALID_SIGNATURE');
}
// convert the block to a Uint8Array
var u8_block;
try {
u8_block = Nacl.util.decodeBase64(block);
} catch (e) {
return void cb('E_INVALID_BLOCK');
}
// take its hash
var hash = Nacl.hash(u8_block);
// validate the signature against the hash of the content
var verified = Nacl.sign.detached.verify(hash, u8_signature, u8_public_key);
// existing authentication ensures that users cannot replay old blocks
// call back with (err) if unsuccessful
if (!verified) { return void cb("E_COULD_NOT_VERIFY"); }
return void cb(null, u8_block);
};
var createLoginBlockPath = function (Env, publicKey) { // FIXME BLOCKS
// prepare publicKey to be used as a file name
var safeKey = Util.escapeKeyCharacters(publicKey);
// validate safeKey
if (typeof(safeKey) !== 'string') {
return;
}
// derive the full path
// /home/cryptpad/cryptpad/block/fg/fg32kefksjdgjkewrjksdfksjdfsdfskdjfsfd
return Path.join(Env.paths.block, safeKey.slice(0, 2), safeKey);
};
Block.writeLoginBlock = function (Env, msg, cb) { // FIXME BLOCKS
//console.log(msg);
var publicKey = msg[0];
var signature = msg[1];
var block = msg[2];
Block.validateLoginBlock(Env, publicKey, signature, block, function (e, validatedBlock) {
if (e) { return void cb(e); }
if (!(validatedBlock instanceof Uint8Array)) { return void cb('E_INVALID_BLOCK'); }
// derive the filepath
var path = createLoginBlockPath(Env, publicKey);
// make sure the path is valid
if (typeof(path) !== 'string') {
return void cb('E_INVALID_BLOCK_PATH');
}
var parsed = Path.parse(path);
if (!parsed || typeof(parsed.dir) !== 'string') {
return void cb("E_INVALID_BLOCK_PATH_2");
}
nThen(function (w) {
// make sure the path to the file exists
Fse.mkdirp(parsed.dir, w(function (e) {
if (e) {
w.abort();
cb(e);
}
}));
}).nThen(function () {
// actually write the block
// flow is dumb and I need to guard against this which will never happen
/*:: if (typeof(validatedBlock) === 'undefined') { throw new Error('should never happen'); } */
/*:: if (typeof(path) === 'undefined') { throw new Error('should never happen'); } */
Fs.writeFile(path, Buffer.from(validatedBlock), { encoding: "binary", }, function (err) {
if (err) { return void cb(err); }
cb();
});
});
});
};
/*
When users write a block, they upload the block, and provide
a signature proving that they deserve to be able to write to
the location determined by the public key.
When removing a block, there is nothing to upload, but we need
to sign something. Since the signature is considered sensitive
information, we can just sign some constant and use that as proof.
*/
Block.removeLoginBlock = function (Env, msg, cb) { // FIXME BLOCKS
var publicKey = msg[0];
var signature = msg[1];
var block = Nacl.util.decodeUTF8('DELETE_BLOCK'); // clients and the server will have to agree on this constant
Block.validateLoginBlock(Env, publicKey, signature, block, function (e /*::, validatedBlock */) {
if (e) { return void cb(e); }
// derive the filepath
var path = createLoginBlockPath(Env, publicKey);
// make sure the path is valid
if (typeof(path) !== 'string') {
return void cb('E_INVALID_BLOCK_PATH');
}
// FIXME COLDSTORAGE
Fs.unlink(path, function (err) {
Env.Log.info('DELETION_BLOCK_BY_OWNER_RPC', {
publicKey: publicKey,
path: path,
status: err? String(err): 'SUCCESS',
});
if (err) { return void cb(err); }
cb();
});
});
};

199
lib/commands/channel.js Normal file
View File

@@ -0,0 +1,199 @@
/*jshint esversion: 6 */
const Channel = module.exports;
const Util = require("../common-util");
const nThen = require("nthen");
const Core = require("./core");
const Metadata = require("./metadata");
Channel.clearOwnedChannel = function (Env, safeKey, channelId, cb) {
if (typeof(channelId) !== 'string' || channelId.length !== 32) {
return cb('INVALID_ARGUMENTS');
}
var unsafeKey = Util.unescapeKeyCharacters(safeKey);
Metadata.getMetadata(Env, channelId, function (err, metadata) {
if (err) { return void cb(err); }
if (!Core.hasOwners(metadata)) { return void cb('E_NO_OWNERS'); }
// Confirm that the channel is owned by the user in question
if (!Core.isOwner(metadata, unsafeKey)) {
return void cb('INSUFFICIENT_PERMISSIONS');
}
return void Env.msgStore.clearChannel(channelId, function (e) {
cb(e);
});
});
};
Channel.removeOwnedChannel = function (Env, safeKey, channelId, cb) {
if (typeof(channelId) !== 'string' || !Core.isValidId(channelId)) {
return cb('INVALID_ARGUMENTS');
}
var unsafeKey = Util.unescapeKeyCharacters(safeKey);
if (Env.blobStore.isFileId(channelId)) {
//var safeKey = Util.escapeKeyCharacters(unsafeKey);
var blobId = channelId;
return void nThen(function (w) {
// check if you have permissions
Env.blobStore.isOwnedBy(safeKey, blobId, w(function (err, owned) {
if (err || !owned) {
w.abort();
return void cb("INSUFFICIENT_PERMISSIONS");
}
}));
}).nThen(function (w) {
// remove the blob
return void Env.blobStore.archive.blob(blobId, w(function (err) {
Env.Log.info('ARCHIVAL_OWNED_FILE_BY_OWNER_RPC', {
safeKey: safeKey,
blobId: blobId,
status: err? String(err): 'SUCCESS',
});
if (err) {
w.abort();
return void cb(err);
}
}));
}).nThen(function () {
// archive the proof
return void Env.blobStore.archive.proof(safeKey, blobId, function (err) {
Env.Log.info("ARCHIVAL_PROOF_REMOVAL_BY_OWNER_RPC", {
safeKey: safeKey,
blobId: blobId,
status: err? String(err): 'SUCCESS',
});
if (err) {
return void cb("E_PROOF_REMOVAL");
}
cb(void 0, 'OK');
});
});
}
Metadata.getMetadata(Env, channelId, function (err, metadata) {
if (err) { return void cb(err); }
if (!Core.hasOwners(metadata)) { return void cb('E_NO_OWNERS'); }
if (!Core.isOwner(metadata, unsafeKey)) {
return void cb('INSUFFICIENT_PERMISSIONS');
}
// temporarily archive the file
return void Env.msgStore.archiveChannel(channelId, function (e) {
Env.Log.info('ARCHIVAL_CHANNEL_BY_OWNER_RPC', {
unsafeKey: unsafeKey,
channelId: channelId,
status: e? String(e): 'SUCCESS',
});
if (e) {
return void cb(e);
}
cb(void 0, 'OK');
});
});
};
Channel.removeOwnedChannelHistory = function (Env, channelId, unsafeKey, hash, cb) { // XXX UNSAFE
nThen(function (w) {
Metadata.getMetadata(Env, channelId, w(function (err, metadata) {
if (err) { return void cb(err); }
if (!Core.hasOwners(metadata)) {
w.abort();
return void cb('E_NO_OWNERS');
}
if (!Core.isOwner(metadata, unsafeKey)) {
w.abort();
return void cb("INSUFFICIENT_PERMISSIONS");
}
// else fall through to the next block
}));
}).nThen(function () {
Env.msgStore.trimChannel(channelId, hash, function (err) {
if (err) { return void cb(err); }
// clear historyKeeper's cache for this channel
Env.historyKeeper.channelClose(channelId);
cb(void 0, 'OK');
});
});
};
var ARRAY_LINE = /^\[/;
/* Files can contain metadata but not content
call back with true if the channel log has no content other than metadata
otherwise false
*/
Channel.isNewChannel = function (Env, channel, cb) {
if (!Core.isValidId(channel)) { return void cb('INVALID_CHAN'); }
if (channel.length !== 32) { return void cb('INVALID_CHAN'); }
var done = false;
Env.msgStore.getMessages(channel, function (msg) {
if (done) { return; }
try {
if (typeof(msg) === 'string' && ARRAY_LINE.test(msg)) {
done = true;
return void cb(void 0, false);
}
} catch (e) {
Env.WARN('invalid message read from store', e);
}
}, function () {
if (done) { return; }
// no more messages...
cb(void 0, true);
});
};
/* writePrivateMessage
allows users to anonymously send a message to the channel
prevents their netflux-id from being stored in history
and from being broadcast to anyone that might currently be in the channel
Otherwise behaves the same as sending to a channel
*/
Channel.writePrivateMessage = function (Env, args, Server, cb) {
var channelId = args[0];
var msg = args[1];
// don't bother handling empty messages
if (!msg) { return void cb("INVALID_MESSAGE"); }
// don't support anything except regular channels
if (!Core.isValidId(channelId) || channelId.length !== 32) {
return void cb("INVALID_CHAN");
}
// We expect a modern netflux-websocket-server instance
// if this API isn't here everything will fall apart anyway
if (!(Server && typeof(Server.send) === 'function')) {
return void cb("NOT_IMPLEMENTED");
}
// historyKeeper expects something with an 'id' attribute
// it will fail unless you provide it, but it doesn't need anything else
var channelStruct = {
id: channelId,
};
// construct a message to store and broadcast
var fullMessage = [
0, // idk
null, // normally the netflux id, null isn't rejected, and it distinguishes messages written in this way
"MSG", // indicate that this is a MSG
channelId, // channel id
msg // the actual message content. Generally a string
];
// historyKeeper already knows how to handle metadata and message validation, so we just pass it off here
// if the message isn't valid it won't be stored.
Env.historyKeeper.channelMessage(Server, channelStruct, fullMessage);
// call back with the message and the target channel.
// historyKeeper will take care of broadcasting it if anyone is in the channel
cb(void 0, {
channel: channelId,
message: fullMessage
});
};

188
lib/commands/core.js Normal file
View File

@@ -0,0 +1,188 @@
/*jshint esversion: 6 */
/* globals process */
const Core = module.exports;
const Util = require("../common-util");
const escapeKeyCharacters = Util.escapeKeyCharacters;
/* Use Nacl for checking signatures of messages */
const Nacl = require("tweetnacl/nacl-fast");
Core.DEFAULT_LIMIT = 50 * 1024 * 1024;
Core.SESSION_EXPIRATION_TIME = 60 * 1000;
Core.isValidId = function (chan) {
return chan && chan.length && /^[a-zA-Z0-9=+-]*$/.test(chan) &&
[32, 48].indexOf(chan.length) > -1;
};
var makeToken = Core.makeToken = function () {
return Number(Math.floor(Math.random() * Number.MAX_SAFE_INTEGER))
.toString(16);
};
Core.makeCookie = function (token) {
var time = (+new Date());
time -= time % 5000;
return [
time,
process.pid,
token
];
};
var parseCookie = function (cookie) {
if (!(cookie && cookie.split)) { return null; }
var parts = cookie.split('|');
if (parts.length !== 3) { return null; }
var c = {};
c.time = new Date(parts[0]);
c.pid = Number(parts[1]);
c.seq = parts[2];
return c;
};
Core.getSession = function (Sessions, key) {
var safeKey = escapeKeyCharacters(key);
if (Sessions[safeKey]) {
Sessions[safeKey].atime = +new Date();
return Sessions[safeKey];
}
var user = Sessions[safeKey] = {};
user.atime = +new Date();
user.tokens = [
makeToken()
];
return user;
};
Core.expireSession = function (Sessions, safeKey) {
var session = Sessions[safeKey];
if (!session) { return; }
if (session.blobstage) {
session.blobstage.close();
}
delete Sessions[safeKey];
};
Core.expireSessionAsync = function (Env, safeKey, cb) {
setTimeout(function () {
Core.expireSession(Sessions, safeKey);
cb(void 0, 'OK');
});
};
var isTooOld = function (time, now) {
return (now - time) > 300000;
};
Core.expireSessions = function (Sessions) {
var now = +new Date();
Object.keys(Sessions).forEach(function (safeKey) {
var session = Sessions[safeKey];
if (session && isTooOld(session.atime, now)) {
Core.expireSession(Sessions, safeKey);
}
});
};
var addTokenForKey = function (Sessions, publicKey, token) {
if (!Sessions[publicKey]) { throw new Error('undefined user'); }
var user = Core.getSession(Sessions, publicKey);
user.tokens.push(token);
user.atime = +new Date();
if (user.tokens.length > 2) { user.tokens.shift(); }
};
Core.isValidCookie = function (Sessions, publicKey, cookie) {
var parsed = parseCookie(cookie);
if (!parsed) { return false; }
var now = +new Date();
if (!parsed.time) { return false; }
if (isTooOld(parsed.time, now)) {
return false;
}
// different process. try harder
if (process.pid !== parsed.pid) {
return false;
}
var user = Core.getSession(Sessions, publicKey);
if (!user) { return false; }
var idx = user.tokens.indexOf(parsed.seq);
if (idx === -1) { return false; }
if (idx > 0) {
// make a new token
addTokenForKey(Sessions, publicKey, Core.makeToken());
}
return true;
};
Core.checkSignature = function (Env, signedMsg, signature, publicKey) {
if (!(signedMsg && publicKey)) { return false; }
var signedBuffer;
var pubBuffer;
var signatureBuffer;
try {
signedBuffer = Nacl.util.decodeUTF8(signedMsg);
} catch (e) {
Env.Log.error('INVALID_SIGNED_BUFFER', signedMsg);
return null;
}
try {
pubBuffer = Nacl.util.decodeBase64(publicKey);
} catch (e) {
return false;
}
try {
signatureBuffer = Nacl.util.decodeBase64(signature);
} catch (e) {
return false;
}
if (pubBuffer.length !== 32) {
Env.Log.error('PUBLIC_KEY_LENGTH', publicKey);
return false;
}
if (signatureBuffer.length !== 64) {
return false;
}
return Nacl.sign.detached.verify(signedBuffer, signatureBuffer, pubBuffer);
};
// E_NO_OWNERS
Core.hasOwners = function (metadata) {
return Boolean(metadata && Array.isArray(metadata.owners));
};
Core.hasPendingOwners = function (metadata) {
return Boolean(metadata && Array.isArray(metadata.pending_owners));
};
// INSUFFICIENT_PERMISSIONS
Core.isOwner = function (metadata, unsafeKey) {
return metadata.owners.indexOf(unsafeKey) !== -1;
};
Core.isPendingOwner = function (metadata, unsafeKey) {
return metadata.pending_owners.indexOf(unsafeKey) !== -1;
};

116
lib/commands/metadata.js Normal file
View File

@@ -0,0 +1,116 @@
/*jshint esversion: 6 */
const Data = module.exports;
const Meta = require("../metadata");
const BatchRead = require("../batch-read");
const WriteQueue = require("../write-queue");
const Core = require("./core");
const Util = require("../common-util");
const batchMetadata = BatchRead("GET_METADATA");
Data.getMetadata = function (Env, channel, cb) {
if (!Core.isValidId(channel)) { return void cb('INVALID_CHAN'); }
if (channel.length !== 32) { return cb("INVALID_CHAN_LENGTH"); }
batchMetadata(channel, cb, function (done) {
var ref = {};
var lineHandler = Meta.createLineHandler(ref, Env.Log.error);
return void Env.msgStore.readChannelMetadata(channel, lineHandler, function (err) {
if (err) {
// stream errors?
return void done(err);
}
done(void 0, ref.meta);
});
});
};
/* setMetadata
- write a new line to the metadata log if a valid command is provided
- data is an object: {
channel: channelId,
command: metadataCommand (string),
value: value
}
*/
var queueMetadata = WriteQueue();
Data.setMetadata = function (Env, safeKey, data, cb) {
var unsafeKey = Util.unescapeKeyCharacters(safeKey);
var channel = data.channel;
var command = data.command;
if (!channel || !Core.isValidId(channel)) { return void cb ('INVALID_CHAN'); }
if (!command || typeof (command) !== 'string') { return void cb ('INVALID_COMMAND'); }
if (Meta.commands.indexOf(command) === -1) { return void('UNSUPPORTED_COMMAND'); }
queueMetadata(channel, function (next) {
Data.getMetadata(Env, channel, function (err, metadata) {
if (err) {
cb(err);
return void next();
}
if (!Core.hasOwners(metadata)) {
cb('E_NO_OWNERS');
return void next();
}
// if you are a pending owner and not an owner
// you can either ADD_OWNERS, or RM_PENDING_OWNERS
// and you should only be able to add yourself as an owner
// everything else should be rejected
// else if you are not an owner
// you should be rejected
// else write the command
// Confirm that the channel is owned by the user in question
// or the user is accepting a pending ownership offer
if (Core.hasPendingOwners(metadata) &&
Core.isPendingOwner(metadata, unsafeKey) &&
!Core.isOwner(metadata, unsafeKey)) {
// If you are a pending owner, make sure you can only add yourelf as an owner
if ((command !== 'ADD_OWNERS' && command !== 'RM_PENDING_OWNERS')
|| !Array.isArray(data.value)
|| data.value.length !== 1
|| data.value[0] !== unsafeKey) {
cb('INSUFFICIENT_PERMISSIONS');
return void next();
}
// FIXME wacky fallthrough is hard to read
// we could pass this off to a writeMetadataCommand function
// and make the flow easier to follow
} else if (!Core.isOwner(metadata, unsafeKey)) {
cb('INSUFFICIENT_PERMISSIONS');
return void next();
}
// Add the new metadata line
var line = [command, data.value, +new Date()];
var changed = false;
try {
changed = Meta.handleCommand(metadata, line);
} catch (e) {
cb(e);
return void next();
}
// if your command is valid but it didn't result in any change to the metadata,
// call back now and don't write any "useless" line to the log
if (!changed) {
cb(void 0, metadata);
return void next();
}
Env.msgStore.writeMetadata(channel, JSON.stringify(line), function (e) {
if (e) {
cb(e);
return void next();
}
cb(void 0, metadata);
next();
});
});
});
};

464
lib/commands/pin-rpc.js Normal file
View File

@@ -0,0 +1,464 @@
/*jshint esversion: 6 */
const Core = require("./core");
const BatchRead = require("../batch-read");
const Pins = require("../pins");
const Pinning = module.exports;
const Nacl = require("tweetnacl/nacl-fast");
const Util = require("../common-util");
const nThen = require("nthen");
const Saferphore = require("saferphore");
const Pinned = require('../../scripts/pinned');
//const escapeKeyCharacters = Util.escapeKeyCharacters;
const unescapeKeyCharacters = Util.unescapeKeyCharacters;
var sumChannelSizes = function (sizes) {
return Object.keys(sizes).map(function (id) { return sizes[id]; })
.filter(function (x) {
// only allow positive numbers
return !(typeof(x) !== 'number' || x <= 0);
})
.reduce(function (a, b) { return a + b; }, 0);
};
// XXX it's possible for this to respond before the server has had a chance
// to fetch the limits. Maybe we should respond with an error...
// or wait until we actually know the limits before responding
var getLimit = Pinning.getLimit = function (Env, publicKey, cb) {
var unescapedKey = unescapeKeyCharacters(publicKey);
var limit = Env.limits[unescapedKey];
var defaultLimit = typeof(Env.defaultStorageLimit) === 'number'?
Env.defaultStorageLimit: Core.DEFAULT_LIMIT;
var toSend = limit && typeof(limit.limit) === "number"?
[limit.limit, limit.plan, limit.note] : [defaultLimit, '', ''];
cb(void 0, toSend);
};
var addPinned = function (
Env,
publicKey /*:string*/,
channelList /*Array<string>*/,
cb /*:()=>void*/)
{
Env.evPinnedPadsReady.reg(() => {
channelList.forEach((c) => {
const x = Env.pinnedPads[c] = Env.pinnedPads[c] || {};
x[publicKey] = 1;
});
cb();
});
};
var removePinned = function (
Env,
publicKey /*:string*/,
channelList /*Array<string>*/,
cb /*:()=>void*/)
{
Env.evPinnedPadsReady.reg(() => {
channelList.forEach((c) => {
const x = Env.pinnedPads[c];
if (!x) { return; }
delete x[publicKey];
});
cb();
});
};
var getMultipleFileSize = function (Env, channels, cb) {
if (!Array.isArray(channels)) { return cb('INVALID_PIN_LIST'); }
if (typeof(Env.msgStore.getChannelSize) !== 'function') {
return cb('GET_CHANNEL_SIZE_UNSUPPORTED');
}
var i = channels.length;
var counts = {};
var done = function () {
i--;
if (i === 0) { return cb(void 0, counts); }
};
channels.forEach(function (channel) {
Pinning.getFileSize(Env, channel, function (e, size) {
if (e) {
// most likely error here is that a file no longer exists
// but a user still has it in their drive, and wants to know
// its size. We should find a way to inform them of this in
// the future. For now we can just tell them it has no size.
//WARN('getFileSize', e);
counts[channel] = 0;
return done();
}
counts[channel] = size;
done();
});
});
};
const batchUserPins = BatchRead("LOAD_USER_PINS");
var loadUserPins = function (Env, publicKey, cb) {
var session = Core.getSession(Env.Sessions, publicKey);
if (session.channels) {
return cb(session.channels);
}
batchUserPins(publicKey, cb, function (done) {
var ref = {};
var lineHandler = Pins.createLineHandler(ref, function (label, data) {
Env.Log.error(label, {
log: publicKey,
data: data,
});
});
// if channels aren't in memory. load them from disk
Env.pinStore.getMessages(publicKey, lineHandler, function () {
// no more messages
// only put this into the cache if it completes
session.channels = ref.pins;
done(ref.pins); // FIXME no error handling?
});
});
};
var truthyKeys = function (O) {
return Object.keys(O).filter(function (k) {
return O[k];
});
};
var getChannelList = Pinning.getChannelList = function (Env, publicKey, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
loadUserPins(Env, publicKey, function (pins) {
cb(truthyKeys(pins));
});
};
const batchTotalSize = BatchRead("GET_TOTAL_SIZE");
Pinning.getTotalSize = function (Env, publicKey, cb) {
var unescapedKey = unescapeKeyCharacters(publicKey);
var limit = Env.limits[unescapedKey];
// Get a common key if multiple users share the same quota, otherwise take the public key
var batchKey = (limit && Array.isArray(limit.users)) ? limit.users.join('') : publicKey;
batchTotalSize(batchKey, cb, function (done) {
var channels = [];
var bytes = 0;
nThen(function (waitFor) {
// Get the channels list for our user account
Pinning.getChannelList(Env, publicKey, waitFor(function (_channels) {
if (!_channels) {
waitFor.abort();
return done('INVALID_PIN_LIST');
}
Array.prototype.push.apply(channels, _channels);
}));
// Get the channels list for users sharing our quota
if (limit && Array.isArray(limit.users) && limit.users.length > 1) {
limit.users.forEach(function (key) {
if (key === unescapedKey) { return; } // Don't count ourselves twice
getChannelList(Env, key, waitFor(function (_channels) {
if (!_channels) { return; } // Broken user, don't count their quota
Array.prototype.push.apply(channels, _channels);
}));
});
}
}).nThen(function (waitFor) {
// Get size of the channels
var list = []; // Contains the channels already counted in the quota to avoid duplicates
channels.forEach(function (channel) { // TODO semaphore?
if (list.indexOf(channel) !== -1) { return; }
list.push(channel);
Pinning.getFileSize(Env, channel, waitFor(function (e, size) {
if (!e) { bytes += size; }
}));
});
}).nThen(function () {
done(void 0, bytes);
});
});
};
/* Users should be able to clear their own pin log with an authenticated RPC
*/
Pinning.removePins = function (Env, safeKey, cb) {
if (typeof(Env.pinStore.removeChannel) !== 'function') {
return void cb("E_NOT_IMPLEMENTED");
}
Env.pinStore.removeChannel(safeKey, function (err) {
Env.Log.info('DELETION_PIN_BY_OWNER_RPC', {
safeKey: safeKey,
status: err? String(err): 'SUCCESS',
});
cb(err);
});
};
Pinning.trimPins = function (Env, safeKey, cb) {
// XXX trim to latest pin checkpoint
cb("NOT_IMPLEMENTED");
};
var getFreeSpace = Pinning.getFreeSpace = function (Env, publicKey, cb) {
getLimit(Env, publicKey, function (e, limit) {
if (e) { return void cb(e); }
Pinning.getTotalSize(Env, publicKey, function (e, size) {
if (typeof(size) === 'undefined') { return void cb(e); }
var rem = limit[0] - size;
if (typeof(rem) !== 'number') {
return void cb('invalid_response');
}
cb(void 0, rem);
});
});
};
var hashChannelList = function (A) {
var uniques = [];
A.forEach(function (a) {
if (uniques.indexOf(a) === -1) { uniques.push(a); }
});
uniques.sort();
var hash = Nacl.util.encodeBase64(Nacl.hash(Nacl
.util.decodeUTF8(JSON.stringify(uniques))));
return hash;
};
var getHash = Pinning.getHash = function (Env, publicKey, cb) {
getChannelList(Env, publicKey, function (channels) {
cb(void 0, hashChannelList(channels));
});
};
Pinning.pinChannel = function (Env, publicKey, channels, cb) {
if (!channels && channels.filter) {
return void cb('INVALID_PIN_LIST');
}
// get channel list ensures your session has a cached channel list
getChannelList(Env, publicKey, function (pinned) {
var session = Core.getSession(Env.Sessions, publicKey);
// only pin channels which are not already pinned
var toStore = channels.filter(function (channel) {
return pinned.indexOf(channel) === -1;
});
if (toStore.length === 0) {
return void getHash(Env, publicKey, cb);
}
getMultipleFileSize(Env, toStore, function (e, sizes) {
if (typeof(sizes) === 'undefined') { return void cb(e); }
var pinSize = sumChannelSizes(sizes);
getFreeSpace(Env, publicKey, function (e, free) {
if (typeof(free) === 'undefined') {
Env.WARN('getFreeSpace', e);
return void cb(e);
}
if (pinSize > free) { return void cb('E_OVER_LIMIT'); }
Env.pinStore.message(publicKey, JSON.stringify(['PIN', toStore, +new Date()]),
function (e) {
if (e) { return void cb(e); }
toStore.forEach(function (channel) {
session.channels[channel] = true;
});
addPinned(Env, publicKey, toStore, () => {});
getHash(Env, publicKey, cb);
});
});
});
});
};
Pinning.unpinChannel = function (Env, publicKey, channels, cb) {
if (!channels && channels.filter) {
// expected array
return void cb('INVALID_PIN_LIST');
}
getChannelList(Env, publicKey, function (pinned) {
var session = Core.getSession(Env.Sessions, publicKey);
// only unpin channels which are pinned
var toStore = channels.filter(function (channel) {
return pinned.indexOf(channel) !== -1;
});
if (toStore.length === 0) {
return void getHash(Env, publicKey, cb);
}
Env.pinStore.message(publicKey, JSON.stringify(['UNPIN', toStore, +new Date()]),
function (e) {
if (e) { return void cb(e); }
toStore.forEach(function (channel) {
delete session.channels[channel];
});
removePinned(Env, publicKey, toStore, () => {});
getHash(Env, publicKey, cb);
});
});
};
Pinning.resetUserPins = function (Env, publicKey, channelList, cb) {
if (!Array.isArray(channelList)) { return void cb('INVALID_PIN_LIST'); }
var session = Core.getSession(Env.Sessions, publicKey);
if (!channelList.length) {
return void getHash(Env, publicKey, function (e, hash) {
if (e) { return cb(e); }
cb(void 0, hash);
});
}
var pins = {};
getMultipleFileSize(Env, channelList, function (e, sizes) {
if (typeof(sizes) === 'undefined') { return void cb(e); }
var pinSize = sumChannelSizes(sizes);
getLimit(Env, publicKey, function (e, limit) {
if (e) {
Env.WARN('[RESET_ERR]', e);
return void cb(e);
}
/* we want to let people pin, even if they are over their limit,
but they should only be able to do this once.
This prevents data loss in the case that someone registers, but
does not have enough free space to pin their migrated data.
They will not be able to pin additional pads until they upgrade
or delete enough files to go back under their limit. */
if (pinSize > limit[0] && session.hasPinned) { return void(cb('E_OVER_LIMIT')); }
Env.pinStore.message(publicKey, JSON.stringify(['RESET', channelList, +new Date()]),
function (e) {
if (e) { return void cb(e); }
channelList.forEach(function (channel) {
pins[channel] = true;
});
var oldChannels;
if (session.channels && typeof(session.channels) === 'object') {
oldChannels = Object.keys(session.channels);
} else {
oldChannels = [];
}
removePinned(Env, publicKey, oldChannels, () => {
addPinned(Env, publicKey, channelList, ()=>{});
});
// update in-memory cache IFF the reset was allowed.
session.channels = pins;
getHash(Env, publicKey, function (e, hash) {
cb(e, hash);
});
});
});
});
};
Pinning.getFileSize = function (Env, channel, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!Core.isValidId(channel)) { return void cb('INVALID_CHAN'); }
if (channel.length === 32) {
if (typeof(Env.msgStore.getChannelSize) !== 'function') {
return cb('GET_CHANNEL_SIZE_UNSUPPORTED');
}
return void Env.msgStore.getChannelSize(channel, function (e, size /*:number*/) {
if (e) {
if (e.code === 'ENOENT') { return void cb(void 0, 0); }
return void cb(e.code);
}
cb(void 0, size);
});
}
// 'channel' refers to a file, so you need another API
Env.blobStore.size(channel, function (e, size) {
if (typeof(size) === 'undefined') { return void cb(e); }
cb(void 0, size);
});
};
/* accepts a list, and returns a sublist of channel or file ids which seem
to have been deleted from the server (file size 0)
we might consider that we should only say a file is gone if fs.stat returns
ENOENT, but for now it's simplest to just rely on getFileSize...
*/
Pinning.getDeletedPads = function (Env, channels, cb) {
if (!Array.isArray(channels)) { return cb('INVALID_LIST'); }
var L = channels.length;
var sem = Saferphore.create(10);
var absentees = [];
var job = function (channel, wait) {
return function (give) {
Pinning.getFileSize(Env, channel, wait(give(function (e, size) {
if (e) { return; }
if (size === 0) { absentees.push(channel); }
})));
};
};
nThen(function (w) {
for (var i = 0; i < L; i++) {
sem.take(job(channels[i], w));
}
}).nThen(function () {
cb(void 0, absentees);
});
};
// inform that the
Pinning.loadChannelPins = function (Env) {
Pinned.load(function (err, data) {
if (err) {
Env.Log.error("LOAD_CHANNEL_PINS", err);
// FIXME not sure what should be done here instead
Env.pinnedPads = {};
Env.evPinnedPadsReady.fire();
return;
}
Env.pinnedPads = data;
Env.evPinnedPadsReady.fire();
}, {
pinPath: Env.paths.pin,
});
};
Pinning.isChannelPinned = function (Env, channel, cb) {
Env.evPinnedPadsReady.reg(() => {
if (Env.pinnedPads[channel] && Object.keys(Env.pinnedPads[channel]).length) {
cb(true);
} else {
delete Env.pinnedPads[channel];
cb(false);
}
});
};

112
lib/commands/quota.js Normal file
View File

@@ -0,0 +1,112 @@
/*jshint esversion: 6 */
/* globals Buffer*/
const Quota = module.exports;
const Core = require("./core");
const Util = require("../common-util");
const Package = require('../../package.json');
const Https = require("https");
Quota.applyCustomLimits = function (Env) {
var isLimit = function (o) {
var valid = o && typeof(o) === 'object' &&
typeof(o.limit) === 'number' &&
typeof(o.plan) === 'string' &&
typeof(o.note) === 'string';
return valid;
};
// read custom limits from the Environment (taken from config)
var customLimits = (function (custom) {
var limits = {};
Object.keys(custom).forEach(function (k) {
k.replace(/\/([^\/]+)$/, function (all, safeKey) {
var id = Util.unescapeKeyCharacters(safeKey || '');
limits[id] = custom[k];
return '';
});
});
return limits;
}(Env.customLimits || {}));
Object.keys(customLimits).forEach(function (k) {
if (!isLimit(customLimits[k])) { return; }
Env.limits[k] = customLimits[k];
});
};
// The limits object contains storage limits for all the publicKey that have paid
// To each key is associated an object containing the 'limit' value and a 'note' explaining that limit
// XXX maybe the use case with a publicKey should be a different command that calls this?
Quota.updateLimits = function (Env, publicKey, cb) { // FIXME BATCH?S
if (Env.adminEmail === false) {
Quota.applyCustomLimits(Env);
if (Env.allowSubscriptions === false) { return; }
throw new Error("allowSubscriptions must be false if adminEmail is false");
}
if (typeof cb !== "function") { cb = function () {}; }
var defaultLimit = typeof(Env.defaultStorageLimit) === 'number'?
Env.defaultStorageLimit: Core.DEFAULT_LIMIT;
var userId;
if (publicKey) {
userId = Util.unescapeKeyCharacters(publicKey);
}
var body = JSON.stringify({
domain: Env.myDomain,
subdomain: Env.mySubdomain || null,
adminEmail: Env.adminEmail,
version: Package.version
});
var options = {
host: 'accounts.cryptpad.fr',
path: '/api/getauthorized',
method: 'POST',
headers: {
"Content-Type": "application/json",
"Content-Length": Buffer.byteLength(body)
}
};
var req = Https.request(options, function (response) {
if (!('' + response.statusCode).match(/^2\d\d$/)) {
return void cb('SERVER ERROR ' + response.statusCode);
}
var str = '';
response.on('data', function (chunk) {
str += chunk;
});
response.on('end', function () {
try {
var json = JSON.parse(str);
Env.limits = json;
Quota.applyCustomLimits(Env);
var l;
if (userId) {
var limit = Env.limits[userId];
l = limit && typeof limit.limit === "number" ?
[limit.limit, limit.plan, limit.note] : [defaultLimit, '', ''];
}
cb(void 0, l);
} catch (e) {
cb(e);
}
});
});
req.on('error', function (e) {
Quota.applyCustomLimits(Env);
if (!Env.domain) { return cb(); } // XXX
cb(e);
});
req.end(body);
};

57
lib/commands/upload.js Normal file
View File

@@ -0,0 +1,57 @@
/*jshint esversion: 6 */
const Upload = module.exports;
const Util = require("../common-util");
const Pinning = require("./pin-rpc");
const nThen = require("nthen");
const Core = require("./core");
Upload.status = function (Env, safeKey, filesize, _cb) { // FIXME FILES
var cb = Util.once(Util.mkAsync(_cb));
// validate that the provided size is actually a positive number
if (typeof(filesize) !== 'number' &&
filesize >= 0) { return void cb('E_INVALID_SIZE'); }
if (filesize >= Env.maxUploadSize) { return cb('TOO_LARGE'); }
nThen(function (w) {
var abortAndCB = Util.both(w.abort, cb);
Env.blobStore.status(safeKey, w(function (err, inProgress) {
// if there's an error something is weird
if (err) { return void abortAndCB(err); }
// we cannot upload two things at once
if (inProgress) { return void abortAndCB(void 0, true); }
}));
}).nThen(function () {
// if yuo're here then there are no pending uploads
// check if you have space in your quota to upload something of this size
Pinning.getFreeSpace(Env, safeKey, function (e, free) {
if (e) { return void cb(e); }
if (filesize >= free) { return cb('NOT_ENOUGH_SPACE'); }
var user = Core.getSession(Env.Sessions, safeKey);
user.pendingUploadSize = filesize;
user.currentUploadSize = 0;
cb(void 0, false);
});
});
};
Upload.upload = function (Env, safeKey, chunk, cb) {
Env.blobStore.upload(safeKey, chunk, cb);
};
Upload.complete = function (Env, safeKey, arg, cb) {
Env.blobStore.complete(safeKey, arg, cb);
};
Upload.cancel = function (Env, safeKey, arg, cb) {
Env.blobStore.cancel(safeKey, arg, cb);
};
Upload.complete_owned = function (Env, safeKey, arg, cb) {
Env.blobStore.completeOwned(safeKey, arg, cb);
};

1008
lib/historyKeeper.js Normal file

File diff suppressed because it is too large Load Diff

33
lib/hk-util.js Normal file
View File

@@ -0,0 +1,33 @@
var HK = module.exports;
/* getHash
* this function slices off the leading portion of a message which is
most likely unique
* these "hashes" are used to identify particular messages in a channel's history
* clients store "hashes" either in memory or in their drive to query for new messages:
* when reconnecting to a pad
* when connecting to chat or a mailbox
* thus, we can't change this function without invalidating client data which:
* is encrypted clientside
* can't be easily migrated
* don't break it!
*/
HK.getHash = function (msg, Log) {
if (typeof(msg) !== 'string') {
if (Log) {
Log.warn('HK_GET_HASH', 'getHash() called on ' + typeof(msg) + ': ' + msg);
}
return '';
}
return msg.slice(0,64);
};
// historyKeeper should explicitly store any channel
// with a 32 character id
HK.STANDARD_CHANNEL_LENGTH = 32;
// historyKeeper should not store messages sent to any channel
// with a 34 character id
HK.EPHEMERAL_CHANNEL_LENGTH = 34;

View File

@@ -211,12 +211,14 @@ Meta.createLineHandler = function (ref, errorHandler) {
line: JSON.stringify(line),
});
}
// the case above is special, everything else should increment the index
var index = ref.index++;
if (typeof(line) === 'undefined') { return; }
if (Array.isArray(line)) {
try {
handleCommand(ref.meta, line);
ref.index++;
} catch (err2) {
errorHandler("METADATA_COMMAND_ERR", {
error: err2.stack,
@@ -226,8 +228,15 @@ Meta.createLineHandler = function (ref, errorHandler) {
return;
}
if (ref.index === 0 && typeof(line) === 'object') {
ref.index++;
// the first line of a channel is processed before the dedicated metadata log.
// it can contain a map, in which case it should be used as the initial state.
// it's possible that a trim-history command was interrupted, in which case
// this first message might exist in parallel with the more recent metadata log
// which will contain the computed state of the previous metadata log
// which has since been archived.
// Thus, accept both the first and second lines you process as valid initial state
// preferring the second if it exists
if (index < 2 && line && typeof(line) === 'object') {
// special case!
ref.meta = line;
return;
@@ -235,7 +244,7 @@ Meta.createLineHandler = function (ref, errorHandler) {
errorHandler("METADATA_HANDLER_WEIRDLINE", {
line: line,
index: ref.index++,
index: index,
});
};
};

399
lib/rpc.js Normal file
View File

@@ -0,0 +1,399 @@
/*jshint esversion: 6 */
const nThen = require("nthen");
const Util = require("./common-util");
const mkEvent = Util.mkEvent;
const Core = require("./commands/core");
const Admin = require("./commands/admin-rpc");
const Pinning = require("./commands/pin-rpc");
const Quota = require("./commands/quota");
const Block = require("./commands/block");
const Metadata = require("./commands/metadata");
const Channel = require("./commands/channel");
const Upload = require("./commands/upload");
var RPC = module.exports;
const Store = require("../storage/file");
const BlobStore = require("../storage/blob");
const UNAUTHENTICATED_CALLS = [
'GET_FILE_SIZE',
'GET_METADATA',
'GET_MULTIPLE_FILE_SIZE',
'IS_CHANNEL_PINNED',
'IS_NEW_CHANNEL',
'GET_DELETED_PADS',
'WRITE_PRIVATE_MESSAGE',
];
var isUnauthenticatedCall = function (call) {
return UNAUTHENTICATED_CALLS.indexOf(call) !== -1;
};
const AUTHENTICATED_CALLS = [
'COOKIE',
'RESET',
'PIN',
'UNPIN',
'GET_HASH',
'GET_TOTAL_SIZE',
'UPDATE_LIMITS',
'GET_LIMIT',
'UPLOAD_STATUS',
'UPLOAD_COMPLETE',
'OWNED_UPLOAD_COMPLETE',
'UPLOAD_CANCEL',
'EXPIRE_SESSION',
'TRIM_OWNED_CHANNEL_HISTORY',
'CLEAR_OWNED_CHANNEL',
'REMOVE_OWNED_CHANNEL',
'REMOVE_PINS',
'TRIM_PINS',
'WRITE_LOGIN_BLOCK',
'REMOVE_LOGIN_BLOCK',
'ADMIN',
'SET_METADATA'
];
var isAuthenticatedCall = function (call) {
return AUTHENTICATED_CALLS.indexOf(call) !== -1;
};
var isUnauthenticateMessage = function (msg) {
return msg && msg.length === 2 && isUnauthenticatedCall(msg[0]);
};
var handleUnauthenticatedMessage = function (Env, msg, respond, Server) {
Env.Log.silly('LOG_RPC', msg[0]);
switch (msg[0]) {
case 'GET_FILE_SIZE':
return void Pinning.getFileSize(Env, msg[1], function (e, size) {
Env.WARN(e, msg[1]);
respond(e, [null, size, null]);
});
case 'GET_METADATA':
return void Metadata.getMetadata(Env, msg[1], function (e, data) {
Env.WARN(e, msg[1]);
respond(e, [null, data, null]);
});
case 'GET_MULTIPLE_FILE_SIZE': // XXX not actually used on the client?
return void Pinning.getMultipleFileSize(Env, msg[1], function (e, dict) {
if (e) {
Env.WARN(e, dict);
return respond(e);
}
respond(e, [null, dict, null]);
});
case 'GET_DELETED_PADS':
return void Pinning.getDeletedPads(Env, msg[1], function (e, list) {
if (e) {
Env.WARN(e, msg[1]);
return respond(e);
}
respond(e, [null, list, null]);
});
case 'IS_CHANNEL_PINNED':
return void Pinning.isChannelPinned(Env, msg[1], function (isPinned) {
respond(null, [null, isPinned, null]);
});
case 'IS_NEW_CHANNEL':
return void Channel.isNewChannel(Env, msg[1], function (e, isNew) {
respond(e, [null, isNew, null]);
});
case 'WRITE_PRIVATE_MESSAGE':
return void Channel.writePrivateMessage(Env, msg[1], Server, function (e, output) {
respond(e, output);
});
default:
Env.Log.warn("UNSUPPORTED_RPC_CALL", msg);
return respond('UNSUPPORTED_RPC_CALL', msg);
}
};
const AUTHENTICATED_USER_TARGETED = {
RESET: Pinning.resetUserPins,
PIN: Pinning.pinChannel,
UNPIN: Pinning.unpinChannel,
CLEAR_OWNED_CHANNEL: Channel.clearOwnedChannel,
REMOVE_OWNED_CHANNEL: Channel.removeOwnedChannel,
UPLOAD_STATUS: Upload.status,
UPLOAD: Upload.upload,
UPLOAD_COMPLETE: Upload.complete,
UPLOAD_CANCEL: Upload.cancel,
OWNED_UPLOAD_COMPLETE: Upload.complete_owned,
};
const AUTHENTICATED_USER_SCOPED = {
GET_HASH: Pinning.getHash,
GET_TOTAL_SIZE: Pinning.getTotalSize,
UPDATE_LIMITS: Quota.updateLimits,
GET_LIMIT: Pinning.getLimit,
EXPIRE_SESSION: Core.expireSessionAsync,
REMOVE_PINS: Pinning.removePins,
TRIM_PINS: Pinning.trimPins,
SET_METADATA: Metadata.setMetadata,
};
var handleAuthenticatedMessage = function (Env, map) {
var msg = map.msg;
var safeKey = map.safeKey;
var publicKey = map.publicKey;
var Respond = map.Respond;
var Server = map.Server;
var TYPE = msg[0];
Env.Log.silly('LOG_RPC', TYPE);
if (typeof(AUTHENTICATED_USER_TARGETED[TYPE]) === 'function') {
return void AUTHENTICATED_USER_TARGETED[TYPE](Env, safeKey, msg[1], function (e, value) {
Env.WARN(e, value);
return void Respond(e, value);
});
}
if (typeof(AUTHENTICATED_USER_SCOPED[TYPE]) === 'function') {
return void AUTHENTICATED_USER_SCOPED[TYPE](Env, safeKey, function (e, value) {
if (e) {
Env.WARN(e, safeKey);
return void Respond(e);
}
Respond(e, value);
});
}
switch (msg[0]) {
case 'COOKIE': return void Respond(void 0);
case 'TRIM_OWNED_CHANNEL_HISTORY':
return void Channel.removeOwnedChannelHistory(Env, msg[1], publicKey, msg[2], function (e) { // XXX USER_TARGETED_DOUBLE
if (e) { return void Respond(e); }
Respond(void 0, 'OK');
});
case 'WRITE_LOGIN_BLOCK':
return void Block.writeLoginBlock(Env, msg[1], function (e) { // XXX SPECIAL
if (e) {
Env.WARN(e, 'WRITE_LOGIN_BLOCK');
return void Respond(e);
}
Respond(e);
});
case 'REMOVE_LOGIN_BLOCK':
return void Block.removeLoginBlock(Env, msg[1], function (e) { // XXX SPECIAL
if (e) {
Env.WARN(e, 'REMOVE_LOGIN_BLOCK');
return void Respond(e);
}
Respond(e);
});
case 'ADMIN':
return void Admin.command(Env, Server, safeKey, msg[1], function (e, result) { // XXX SPECIAL
if (e) {
Env.WARN(e, result);
return void Respond(e);
}
Respond(void 0, result);
});
default:
console.log(msg);
throw new Error("OOPS");
return void Respond('UNSUPPORTED_RPC_CALL', msg);
}
};
var rpc = function (Env, Server, data, respond) {
if (!Array.isArray(data)) {
Env.Log.debug('INVALID_ARG_FORMET', data);
return void respond('INVALID_ARG_FORMAT');
}
if (!data.length) {
return void respond("INSUFFICIENT_ARGS");
} else if (data.length !== 1) {
Env.Log.debug('UNEXPECTED_ARGUMENTS_LENGTH', data);
}
var msg = data[0].slice(0);
if (!Array.isArray(msg)) {
return void respond('INVALID_ARG_FORMAT');
}
if (isUnauthenticateMessage(msg)) {
return handleUnauthenticatedMessage(Env, msg, respond, Server);
}
var signature = msg.shift();
var publicKey = msg.shift();
// make sure a user object is initialized in the cookie jar
if (publicKey) {
Core.getSession(Env.Sessions, publicKey);
} else {
Env.Log.debug("NO_PUBLIC_KEY_PROVIDED", publicKey);
}
var cookie = msg[0];
if (!Core.isValidCookie(Env.Sessions, publicKey, cookie)) {
// no cookie is fine if the RPC is to get a cookie
if (msg[1] !== 'COOKIE') {
return void respond('NO_COOKIE');
}
}
var serialized = JSON.stringify(msg);
if (!(serialized && typeof(publicKey) === 'string')) {
return void respond('INVALID_MESSAGE_OR_PUBLIC_KEY');
}
if (isAuthenticatedCall(msg[1])) {
if (Core.checkSignature(Env, serialized, signature, publicKey) !== true) {
return void respond("INVALID_SIGNATURE_OR_PUBLIC_KEY");
}
} else if (msg[1] !== 'UPLOAD') {
Env.Log.warn('INVALID_RPC_CALL', msg[1]);
return void respond("INVALID_RPC_CALL");
}
var safeKey = Util.escapeKeyCharacters(publicKey);
/* If you have gotten this far, you have signed the message with the
public key which you provided.
We can safely modify the state for that key
OR it's an unauthenticated call, which must not modify the state
for that key in a meaningful way.
*/
// discard validated cookie from message
msg.shift();
var Respond = function (e, msg) {
var session = Env.Sessions[safeKey];
var token = session? session.tokens.slice(-1)[0]: '';
var cookie = Core.makeCookie(token).join('|');
respond(e ? String(e): e, [cookie].concat(typeof(msg) !== 'undefined' ?msg: []));
};
if (typeof(msg) !== 'object' || !msg.length) {
return void Respond('INVALID_MSG');
}
handleAuthenticatedMessage(Env, {
msg: msg,
safeKey: safeKey,
publicKey: publicKey,
Respond: Respond,
Server: Server,
});
};
RPC.create = function (config, cb) {
var Log = config.log;
// load pin-store...
Log.silly('LOADING RPC MODULE');
var keyOrDefaultString = function (key, def) {
return typeof(config[key]) === 'string'? config[key]: def;
};
var WARN = function (e, output) {
if (e && output) {
Log.warn(e, {
output: output,
message: String(e),
stack: new Error(e).stack,
});
}
};
var Env = {
historyKeeper: config.historyKeeper,
intervals: config.intervals || {},
defaultStorageLimit: config.defaultStorageLimit,
maxUploadSize: config.maxUploadSize || (20 * 1024 * 1024),
Sessions: {},
paths: {},
msgStore: config.store,
pinStore: undefined,
pinnedPads: {},
evPinnedPadsReady: mkEvent(true),
limits: {},
admins: [],
Log: Log,
WARN: WARN,
flushCache: config.flushCache,
adminEmail: config.adminEmail,
allowSubscriptions: config.allowSubscriptions,
myDomain: config.myDomain,
mySubdomain: config.mySubdomain,
customLimits: config.customLimits,
domain: config.domain // XXX
};
try {
Env.admins = (config.adminKeys || []).map(function (k) {
k = k.replace(/\/+$/, '');
var s = k.split('/');
return s[s.length-1];
});
} catch (e) {
console.error("Can't parse admin keys. Please update or fix your config.js file!");
}
var Sessions = Env.Sessions;
var paths = Env.paths;
var pinPath = paths.pin = keyOrDefaultString('pinPath', './pins');
paths.block = keyOrDefaultString('blockPath', './block');
paths.data = keyOrDefaultString('filePath', './datastore');
paths.staging = keyOrDefaultString('blobStagingPath', './blobstage');
paths.blob = keyOrDefaultString('blobPath', './blob');
var updateLimitDaily = function () {
Quota.updateLimits(Env, undefined, function (e) {
if (e) {
WARN('limitUpdate', e);
}
});
};
Quota.applyCustomLimits(Env);
updateLimitDaily();
Env.intervals.dailyLimitUpdate = setInterval(updateLimitDaily, 24*3600*1000);
Pinning.loadChannelPins(Env);
nThen(function (w) {
Store.create({
filePath: pinPath,
}, w(function (s) {
Env.pinStore = s;
}));
BlobStore.create({
blobPath: config.blobPath,
blobStagingPath: config.blobStagingPath,
archivePath: config.archivePath,
getSession: function (safeKey) {
return Core.getSession(Sessions, safeKey);
},
}, w(function (err, blob) {
if (err) { throw new Error(err); }
Env.blobStore = blob;
}));
}).nThen(function () {
cb(void 0, function (Server, data, respond) {
try {
return rpc(Env, Server, data, respond);
} catch (e) {
console.log("Error from RPC with data " + JSON.stringify(data));
console.log(e.stack);
}
});
// expire old sessions once per minute
Env.intervals.sessionExpirationInterval = setInterval(function () {
Core.expireSessions(Sessions);
}, Core.SESSION_EXPIRATION_TIME);
});
};

172
lib/schedule.js Normal file
View File

@@ -0,0 +1,172 @@
var WriteQueue = require("./write-queue");
var Util = require("./common-util");
/* This module provides implements a FIFO scheduler
which assumes the existence of three types of async tasks:
1. ordered tasks which must be executed sequentially
2. unordered tasks which can be executed in parallel
3. blocking tasks which must block the execution of all other tasks
The scheduler assumes there will be many resources identified by strings,
and that the constraints described above will only apply in the context
of identical string ids.
Many blocking tasks may be executed in parallel so long as they
concern resources identified by different ids.
USAGE:
const schedule = require("./schedule")();
// schedule two sequential tasks using the resource 'pewpew'
schedule.ordered('pewpew', function (next) {
appendToFile('beep\n', next);
});
schedule.ordered('pewpew', function (next) {
appendToFile('boop\n', next);
});
// schedule a task that can happen whenever
schedule.unordered('pewpew', function (next) {
displayFileSize(next);
});
// schedule a blocking task which will wait
// until the all unordered tasks have completed before commencing
schedule.blocking('pewpew', function (next) {
deleteFile(next);
});
// this will be queued for after the blocking task
schedule.ordered('pewpew', function (next) {
appendFile('boom', next);
});
*/
// return a uid which is not already in a map
var unusedUid = function (set) {
var uid = Util.uid();
if (set[uid]) { return unusedUid(); }
return uid;
};
// return an existing session, creating one if it does not already exist
var lookup = function (map, id) {
return (map[id] = map[id] || {
//blocking: [],
active: {},
blocked: {},
});
};
var isEmpty = function (map) {
for (var key in map) {
if (map.hasOwnProperty(key)) { return false; }
}
return true;
};
module.exports = function () {
// every scheduler instance has its own queue
var queue = WriteQueue();
// ordered tasks don't require any extra logic
var Ordered = function (id, task) {
queue(id, task);
};
// unordered and blocking tasks need a little extra state
var map = {};
// regular garbage collection keeps memory consumption low
var collectGarbage = function (id) {
// avoid using 'lookup' since it creates a session implicitly
var local = map[id];
// bail out if no session
if (!local) { return; }
// bail out if there are blocking or active tasks
if (local.lock) { return; }
if (!isEmpty(local.active)) { return; }
// if there are no pending actions then delete the session
delete map[id];
};
// unordered tasks run immediately if there are no blocking tasks scheduled
// or immediately after blocking tasks finish
var runImmediately = function (local, task) {
// set a flag in the map of active unordered tasks
// to prevent blocking tasks from running until you finish
var uid = unusedUid(local.active);
local.active[uid] = true;
task(function () {
// remove the flag you set to indicate that your task completed
delete local.active[uid];
// don't do anything if other unordered tasks are still running
if (!isEmpty(local.active)) { return; }
// bail out if there are no blocking tasks scheduled or ready
if (typeof(local.waiting) !== 'function') {
return void collectGarbage();
}
setTimeout(local.waiting);
});
};
var runOnceUnblocked = function (local, task) {
var uid = unusedUid(local.blocked);
local.blocked[uid] = function () {
runImmediately(local, task);
};
};
// 'unordered' tasks are scheduled to run in after the most recently received blocking task
// or immediately and in parallel if there are no blocking tasks scheduled.
var Unordered = function (id, task) {
var local = lookup(map, id);
if (local.lock) { return runOnceUnblocked(local, task); }
runImmediately(local, task);
};
var runBlocked = function (local) {
for (var task in local.blocked) {
runImmediately(local, local.blocked[task]);
}
};
// 'blocking' tasks must be run alone.
// They are queued alongside ordered tasks,
// and wait until any running 'unordered' tasks complete before commencing.
var Blocking = function (id, task) {
var local = lookup(map, id);
queue(id, function (next) {
// start right away if there are no running unordered tasks
if (isEmpty(local.active)) {
local.lock = true;
return void task(function () {
delete local.lock;
runBlocked(local);
next();
});
}
// otherwise wait until the running tasks have completed
local.waiting = function () {
local.lock = true;
task(function () {
delete local.lock;
delete local.waiting;
runBlocked(local);
next();
});
};
});
};
return {
ordered: Ordered,
unordered: Unordered,
blocking: Blocking,
};
};