nest storage directory inside './lib'

This commit is contained in:
ansuz
2020-02-14 16:29:30 -05:00
parent 58193a1969
commit 725d10fc60
12 changed files with 18 additions and 18 deletions

59
lib/storage/README.md Normal file
View File

@@ -0,0 +1,59 @@
# Storage Mechanisms
Cryptpad's message API is quite simple and modular, and it isn't especially difficult to write alternative modules that employ your favourite datastore.
There are a few guidelines for creating a module:
Dependencies for your storage engine **should not** be added to Cryptpad.
Instead, write an adaptor, and place it in `cryptpad/storage/yourAdaptor.js`.
Alternatively, storage adaptors can be published to npm, and required from your config (once installed).
## Your adaptor should conform to a simple API.
It must export an object with a single property, `create`, which is a function.
That function must accept two arguments:
1. an object containing configuration values
- any configuration values that you require should be well documented
- they should also be named carefully so as to avoid collisions with other modules
2. a callback
- this callback is used to return an object with (currently) two methods
- even if your storage mechanism can be executed synchronously, we use the callback pattern for portability.
## Methods
### message(channelName, content, handler)
When Cryptpad receives a message, it saves it into its datastore using its equivalent of a table for its channel name, and then relays the message to every other client which is participating in the same channel.
Relaying logic exists outside of the storage module, you simply need to store the message then execute the handler on success.
### getMessages(channelName, handler, callback)
When a new client joins, they request the entire history of messages for a particular channel.
This method retreives those messages, and delivers them in order.
In practice, out of order messages make your clientside application more likely to fail, however, they are generally tolerated.
As a channel accumulates a greater number of messages, the likelihood of the application receiving them in the wrong order becomes greater.
This results in older sessions becoming less reliable.
This function accepts the name of the channel in which the user is interested, the handler for each message, and the callback to be executed when the last message has been fetched and handled.
**Note**, the callback is a new addition to this API.
It is only implemented within the leveldb adaptor, making our latest code incompatible with the other back ends.
While we migrate to our new Netflux API, only the leveldb adaptor will be supported.
## removeChannel(channelName, callback)
This method is called (optionally, see config.example.js for more info) some amount of time after the last client in a channel disconnects.
It should remove any history of that channel, and execute a callback which takes an error message as an argument.
## Documenting your adaptor
Naturally, you should comment your code well before making a PR.
Failing that, you should definitely add notes to `cryptpad/config.example.js` such that people who wish to install your adaptor know how to do so.
Notes on how to install the back end, as well as how to install the client for connecting to the back end (as is the case with many datastores), as well as how to configure cryptpad to use your adaptor.
The current configuration file should serve as an example of what to add, and how to comment.

628
lib/storage/blob.js Normal file
View File

@@ -0,0 +1,628 @@
/* globals Buffer */
var Fs = require("fs");
var Fse = require("fs-extra");
var Path = require("path");
var BlobStore = module.exports;
var nThen = require("nthen");
var Semaphore = require("saferphore");
var Util = require("../common-util");
var isValidSafeKey = function (safeKey) {
return typeof(safeKey) === 'string' && !/\//.test(safeKey) && safeKey.length === 44;
};
var isValidId = function (id) {
return typeof(id) === 'string' && id.length === 48 && !/[^a-f0-9]/.test(id);
};
// helpers
var prependArchive = function (Env, path) {
return Path.join(Env.archivePath, path);
};
// /blob/<safeKeyPrefix>/<safeKey>/<blobPrefix>/<blobId>
var makeBlobPath = function (Env, blobId) {
return Path.join(Env.blobPath, blobId.slice(0, 2), blobId);
};
// /blobstate/<safeKeyPrefix>/<safeKey>
var makeStagePath = function (Env, safeKey) {
return Path.join(Env.blobStagingPath, safeKey.slice(0, 2), safeKey);
};
// /blob/<safeKeyPrefix>/<safeKey>/<blobPrefix>/<blobId>
var makeProofPath = function (Env, safeKey, blobId) {
return Path.join(Env.blobPath, safeKey.slice(0, 3), safeKey, blobId.slice(0, 2), blobId);
};
var parseProofPath = function (path) {
var parts = path.split('/');
return {
blobId: parts[parts.length -1],
safeKey: parts[parts.length - 3],
};
};
// getUploadSize: used by
// getFileSize
var getUploadSize = function (Env, blobId, cb) {
var path = makeBlobPath(Env, blobId);
if (!path) { return cb('INVALID_UPLOAD_ID'); }
Fs.stat(path, function (err, stats) {
if (err) {
// if a file was deleted, its size is 0 bytes
if (err.code === 'ENOENT') { return cb(void 0, 0); }
return void cb(err.code);
}
cb(void 0, stats.size);
});
};
// isFile: used by
// removeOwnedBlob
// uploadComplete
// uploadStatus
var isFile = function (filePath, cb) {
Fs.stat(filePath, function (e, stats) {
if (e) {
if (e.code === 'ENOENT') { return void cb(void 0, false); }
return void cb(e.message);
}
return void cb(void 0, stats.isFile());
});
};
var makeFileStream = function (full, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
Fse.mkdirp(Path.dirname(full), function (e) {
if (e || !full) { // !full for pleasing flow, it's already checked
return void cb(e ? e.message : 'INTERNAL_ERROR');
}
try {
var stream = Fs.createWriteStream(full, {
flags: 'a',
encoding: 'binary',
highWaterMark: Math.pow(2, 16),
});
stream.on('open', function () {
cb(void 0, stream);
});
stream.on('error', function (err) {
cb(err);
});
} catch (err) {
cb('BAD_STREAM');
}
});
};
/********** METHODS **************/
var upload = function (Env, safeKey, content, cb) {
var dec;
try { dec = Buffer.from(content, 'base64'); }
catch (e) { return void cb('DECODE_BUFFER'); }
var len = dec.length;
var session = Env.getSession(safeKey);
if (typeof(session.currentUploadSize) !== 'number' ||
typeof(session.pendingUploadSize) !== 'number') {
// improperly initialized... maybe they didn't check before uploading?
// reject it, just in case
return cb('NOT_READY');
}
if (session.currentUploadSize > session.pendingUploadSize) {
return cb('E_OVER_LIMIT');
}
var stagePath = makeStagePath(Env, safeKey);
if (!session.blobstage) {
makeFileStream(stagePath, function (e, stream) {
if (!stream) { return void cb(e); }
var blobstage = session.blobstage = stream;
blobstage.write(dec);
session.currentUploadSize += len;
cb(void 0, dec.length);
});
} else {
session.blobstage.write(dec);
session.currentUploadSize += len;
cb(void 0, dec.length);
}
};
// upload_cancel
var upload_cancel = function (Env, safeKey, fileSize, cb) {
var session = Env.getSession(safeKey);
session.pendingUploadSize = fileSize;
session.currentUploadSize = 0;
if (session.blobstage) {
session.blobstage.close();
delete session.blobstage;
}
var path = makeStagePath(Env, safeKey);
Fs.unlink(path, function (e) {
if (e) { return void cb('E_UNLINK'); }
cb(void 0);
});
};
// upload_complete
var upload_complete = function (Env, safeKey, id, cb) {
var session = Env.getSession(safeKey);
if (session.blobstage && session.blobstage.close) {
session.blobstage.close();
delete session.blobstage;
}
var oldPath = makeStagePath(Env, safeKey);
var newPath = makeBlobPath(Env, id);
nThen(function (w) {
// make sure the path to your final location exists
Fse.mkdirp(Path.dirname(newPath), function (e) {
if (e) {
w.abort();
return void cb('RENAME_ERR');
}
});
}).nThen(function (w) {
// make sure there's not already something in that exact location
isFile(newPath, function (e, yes) {
if (e) {
w.abort();
return void cb(e);
}
if (yes) {
w.abort();
return void cb('RENAME_ERR');
}
cb(void 0, newPath, id);
});
}).nThen(function () {
// finally, move the old file to the new path
// FIXME we could just move and handle the EEXISTS instead of the above block
Fse.move(oldPath, newPath, function (e) {
if (e) { return void cb('RENAME_ERR'); }
cb(void 0, id);
});
});
};
var tryId = function (path, cb) {
Fs.access(path, Fs.constants.R_OK | Fs.constants.W_OK, function (e) {
if (!e) {
// generate a new id (with the same prefix) and recurse
//WARN('ownedUploadComplete', 'id is already used '+ id);
return void cb('EEXISTS');
} else if (e.code === 'ENOENT') {
// no entry, so it's safe for us to proceed
return void cb();
} else {
// it failed in an unexpected way. log it
//WARN('ownedUploadComplete', e);
return void cb(e.code);
}
});
};
// owned_upload_complete
var owned_upload_complete = function (Env, safeKey, id, cb) {
var session = Env.getSession(safeKey);
// the file has already been uploaded to the staging area
// close the pending writestream
if (session.blobstage && session.blobstage.close) {
session.blobstage.close();
delete session.blobstage;
}
if (!isValidId(id)) {
//WARN('ownedUploadComplete', "id is invalid");
return void cb('EINVAL_ID');
}
var oldPath = makeStagePath(Env, safeKey);
if (typeof(oldPath) !== 'string') {
return void cb('EINVAL_CONFIG');
}
var finalPath = makeBlobPath(Env, id);
var finalOwnPath = makeProofPath(Env, safeKey, id);
// the user wants to move it into blob and create a empty file with the same id
// in their own space:
// /blob/safeKeyPrefix/safeKey/blobPrefix/blobID
nThen(function (w) {
// make the requisite directory structure using Mkdirp
Fse.mkdirp(Path.dirname(finalPath), w(function (e /*, path */) {
if (e) { // does not throw error if the directory already existed
w.abort();
return void cb(e.code);
}
}));
Fse.mkdirp(Path.dirname(finalOwnPath), w(function (e /*, path */) {
if (e) { // does not throw error if the directory already existed
w.abort();
return void cb(e.code);
}
}));
}).nThen(function (w) {
// make sure the id does not collide with another
tryId(finalPath, w(function (e) {
if (e) {
w.abort();
return void cb(e);
}
}));
}).nThen(function (w) {
// Create the empty file proving ownership
Fs.writeFile(finalOwnPath, '', w(function (e) {
if (e) {
w.abort();
return void cb(e.code);
}
// otherwise it worked...
}));
}).nThen(function (w) {
// move the existing file to its new path
Fse.move(oldPath, finalPath, w(function (e) {
if (e) {
// if there's an error putting the file into its final location...
// ... you should remove the ownership file
Fs.unlink(finalOwnPath, function () {
// but if you can't, it's not catestrophic
// we can clean it up later
});
w.abort();
return void cb(e.code);
}
// otherwise it worked...
}));
}).nThen(function () {
// clean up their session when you're done
// call back with the blob id...
cb(void 0, id);
});
};
// removeBlob
var remove = function (Env, blobId, cb) {
var blobPath = makeBlobPath(Env, blobId);
Fs.unlink(blobPath, cb); // TODO COLDSTORAGE
};
// removeProof
var removeProof = function (Env, safeKey, blobId, cb) {
var proofPath = makeProofPath(Env, safeKey, blobId);
Fs.unlink(proofPath, cb);
};
// isOwnedBy(id, safeKey)
var isOwnedBy = function (Env, safeKey, blobId, cb) {
var proofPath = makeProofPath(Env, safeKey, blobId);
isFile(proofPath, cb);
};
// archiveBlob
var archiveBlob = function (Env, blobId, cb) {
var blobPath = makeBlobPath(Env, blobId);
var archivePath = prependArchive(Env, blobPath);
Fse.move(blobPath, archivePath, { overwrite: true }, cb);
};
var removeArchivedBlob = function (Env, blobId, cb) {
var archivePath = prependArchive(Env, makeBlobPath(Env, blobId));
Fs.unlink(archivePath, cb);
};
// restoreBlob
var restoreBlob = function (Env, blobId, cb) {
var blobPath = makeBlobPath(Env, blobId);
var archivePath = prependArchive(Env, blobPath);
Fse.move(archivePath, blobPath, cb);
};
// archiveProof
var archiveProof = function (Env, safeKey, blobId, cb) {
var proofPath = makeProofPath(Env, safeKey, blobId);
var archivePath = prependArchive(Env, proofPath);
Fse.move(proofPath, archivePath, { overwrite: true }, cb);
};
var removeArchivedProof = function (Env, safeKey, blobId, cb) {
var archivedPath = prependArchive(Env, makeProofPath(Env, safeKey, blobId));
Fs.unlink(archivedPath, cb);
};
// restoreProof
var restoreProof = function (Env, safeKey, blobId, cb) {
var proofPath = makeProofPath(Env, safeKey, blobId);
var archivePath = prependArchive(Env, proofPath);
Fse.move(archivePath, proofPath, cb);
};
var makeWalker = function (n, handleChild, done) {
if (!n || typeof(n) !== 'number' || n < 2) { n = 2; }
var W;
nThen(function (w) {
// this asynchronous bit defers the completion of this block until
// synchronous execution has completed. This means you must create
// the walker and start using it synchronously or else it will call back
// prematurely
setTimeout(w());
W = w;
}).nThen(function () {
done();
});
// do no more than 20 jobs at a time
var tasks = Semaphore.create(n);
var recurse = function (path) {
tasks.take(function (give) {
var next = give(W());
nThen(function (w) {
// check if the path is a directory...
Fs.stat(path, w(function (err, stats) {
if (err) { return next(); }
if (!stats.isDirectory()) {
w.abort();
return void handleChild(void 0, path, next);
}
// fall through
}));
}).nThen(function () {
// handle directories
Fs.readdir(path, function (err, dir) {
if (err) { return next(); }
// everything is fine and it's a directory...
dir.forEach(function (d) {
recurse(Path.join(path, d));
});
next();
});
});
});
};
return recurse;
};
var listProofs = function (root, handler, cb) {
Fs.readdir(root, function (err, dir) {
if (err) { return void cb(err); }
var walk = makeWalker(20, function (err, path, next) {
// path is the path to a child node on the filesystem
// next handles the next job in a queue
// iterate over proofs
// check for presence of corresponding files
Fs.stat(path, function (err, stats) {
if (err) {
return void handler(err, void 0, next);
}
var parsed = parseProofPath(path);
handler(void 0, {
path: path,
blobId: parsed.blobId,
safeKey: parsed.safeKey,
atime: stats.atime,
ctime: stats.ctime,
mtime: stats.mtime,
}, next);
});
}, function () {
// called when there are no more directories or children to process
cb();
});
dir.forEach(function (d) {
// ignore directories that aren't 3 characters long...
if (d.length !== 3) { return; }
walk(Path.join(root, d));
});
});
};
var listBlobs = function (root, handler, cb) {
// iterate over files
Fs.readdir(root, function (err, dir) {
if (err) { return void cb(err); }
var walk = makeWalker(20, function (err, path, next) {
Fs.stat(path, function (err, stats) {
if (err) {
return void handler(err, void 0, next);
}
handler(void 0, {
blobId: Path.basename(path),
atime: stats.atime,
ctime: stats.ctime,
mtime: stats.mtime,
}, next);
});
}, function () {
cb();
});
dir.forEach(function (d) {
if (d.length !== 2) { return; }
walk(Path.join(root, d));
});
});
};
BlobStore.create = function (config, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (typeof(config.getSession) !== 'function') {
return void cb("getSession method required");
}
var Env = {
blobPath: config.blobPath || './blob',
blobStagingPath: config.blobStagingPath || './blobstage',
archivePath: config.archivePath || './data/archive',
getSession: config.getSession,
};
nThen(function (w) {
var CB = Util.both(w.abort, cb);
Fse.mkdirp(Env.blobPath, w(function (e) {
if (e) { CB(e); }
}));
Fse.mkdirp(Env.blobStagingPath, w(function (e) {
if (e) { CB(e); }
}));
Fse.mkdirp(Path.join(Env.archivePath, Env.blobPath), w(function (e) {
if (e) { CB(e); }
}));
}).nThen(function () {
var methods = {
isFileId: isValidId,
status: function (safeKey, _cb) {
// TODO check if the final destination is a file
// because otherwise two people can try to upload to the same location
// and one will fail, invalidating their hard work
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidSafeKey(safeKey)) { return void cb('INVALID_SAFEKEY'); }
isFile(makeStagePath(Env, safeKey), cb);
},
upload: function (safeKey, content, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidSafeKey(safeKey)) { return void cb('INVALID_SAFEKEY'); }
upload(Env, safeKey, content, Util.once(Util.mkAsync(cb)));
},
cancel: function (safeKey, fileSize, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidSafeKey(safeKey)) { return void cb('INVALID_SAFEKEY'); }
if (typeof(fileSize) !== 'number' || isNaN(fileSize) || fileSize <= 0) { return void cb("INVALID_FILESIZE"); }
upload_cancel(Env, safeKey, fileSize, cb);
},
isOwnedBy: function (safeKey, blobId, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidSafeKey(safeKey)) { return void cb('INVALID_SAFEKEY'); }
isOwnedBy(Env, safeKey, blobId, cb);
},
remove: {
blob: function (blobId, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidId(blobId)) { return void cb("INVALID_ID"); }
remove(Env, blobId, cb);
},
proof: function (safeKey, blobId, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidSafeKey(safeKey)) { return void cb('INVALID_SAFEKEY'); }
if (!isValidId(blobId)) { return void cb("INVALID_ID"); }
removeProof(Env, safeKey, blobId, cb);
},
archived: {
blob: function (blobId, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidId(blobId)) { return void cb("INVALID_ID"); }
removeArchivedBlob(Env, blobId, cb);
},
proof: function (safeKey, blobId, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidSafeKey(safeKey)) { return void cb('INVALID_SAFEKEY'); }
if (!isValidId(blobId)) { return void cb("INVALID_ID"); }
removeArchivedProof(Env, safeKey, blobId, cb);
},
},
},
archive: {
blob: function (blobId, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidId(blobId)) { return void cb("INVALID_ID"); }
archiveBlob(Env, blobId, cb);
},
proof: function (safeKey, blobId, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidSafeKey(safeKey)) { return void cb('INVALID_SAFEKEY'); }
if (!isValidId(blobId)) { return void cb("INVALID_ID"); }
archiveProof(Env, safeKey, blobId, cb);
},
},
restore: {
blob: function (blobId, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidId(blobId)) { return void cb("INVALID_ID"); }
restoreBlob(Env, blobId, cb);
},
proof: function (safeKey, blobId, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidSafeKey(safeKey)) { return void cb('INVALID_SAFEKEY'); }
if (!isValidId(blobId)) { return void cb("INVALID_ID"); }
restoreProof(Env, safeKey, blobId, cb);
},
},
complete: function (safeKey, id, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidSafeKey(safeKey)) { return void cb('INVALID_SAFEKEY'); }
if (!isValidId(id)) { return void cb("INVALID_ID"); }
upload_complete(Env, safeKey, id, cb);
},
completeOwned: function (safeKey, id, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidSafeKey(safeKey)) { return void cb('INVALID_SAFEKEY'); }
if (!isValidId(id)) { return void cb("INVALID_ID"); }
owned_upload_complete(Env, safeKey, id, cb);
},
size: function (id, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
if (!isValidId(id)) { return void cb("INVALID_ID"); }
getUploadSize(Env, id, cb);
},
list: {
blobs: function (handler, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
listBlobs(Env.blobPath, handler, cb);
},
proofs: function (handler, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
listProofs(Env.blobPath, handler, cb);
},
archived: {
proofs: function (handler, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
listProofs(prependArchive(Env, Env.blobPath), handler, cb);
},
blobs: function (handler, _cb) {
var cb = Util.once(Util.mkAsync(_cb));
listBlobs(prependArchive(Env, Env.blobPath), handler, cb);
},
}
},
};
cb(void 0, methods);
});
};

1313
lib/storage/file.js Normal file

File diff suppressed because it is too large Load Diff

396
lib/storage/tasks.js Normal file
View File

@@ -0,0 +1,396 @@
var Fs = require("fs");
var Fse = require("fs-extra");
var Path = require("path");
var nacl = require("tweetnacl/nacl-fast");
var nThen = require("nthen");
var Tasks = module.exports;
var tryParse = function (s) {
try { return JSON.parse(s); }
catch (e) { return null; }
};
var encode = function (time, command, args) {
if (typeof(time) !== 'number') { return null; }
if (typeof(command) !== 'string') { return null; }
if (!Array.isArray(args)) { return [time, command]; }
return [time, command].concat(args);
};
/*
var randomId = function () {
var bytes = Array.prototype.slice.call(nacl.randomBytes(16));
return bytes.map(function (b) {
var n = Number(b & 0xff).toString(16);
return n.length === 1? '0' + n: n;
}).join('');
};
var mkPath = function (env, id) {
return Path.join(env.root, id.slice(0, 2), id) + '.ndjson';
};
*/
// make a new folder every MODULUS ms
var MODULUS = 1000 * 60 * 60 * 24; // one day
var moduloTime = function (d) {
return d - (d % MODULUS);
};
var makeDirectoryId = function (d) {
return '' + moduloTime(d);
};
var write = function (env, task, cb) {
var str = JSON.stringify(task) + '\n';
var id = nacl.util.encodeBase64(nacl.hash(nacl.util.decodeUTF8(str))).replace(/\//g, '-');
var dir = makeDirectoryId(task[0]);
var path = Path.join(env.root, dir);
nThen(function (w) {
// create the parent directory if it does not exist
Fse.mkdirp(path, 0x1ff, w(function (err) {
if (err) {
w.abort();
return void cb(err);
}
}));
}).nThen(function () {
// write the file to the path
var fullPath = Path.join(path, id + '.ndjson');
// the file ids are based on the hash of the file contents to be written
// as such, writing an exact task a second time will overwrite the first with the same contents
// this shouldn't be a problem
Fs.writeFile(fullPath, str, function (e) {
if (e) {
env.log.error("TASK_WRITE_FAILURE", {
error: e,
path: fullPath,
});
return void cb(e);
}
env.log.info("SUCCESSFUL_WRITE", {
path: fullPath,
});
cb();
});
});
};
var remove = function (env, path, cb) {
// FIXME COLDSTORAGE?
Fs.unlink(path, cb);
};
var removeDirectory = function (env, path, cb) {
Fs.rmdir(path, cb);
};
var list = Tasks.list = function (env, cb, migration) {
var rootDirs;
nThen(function (w) {
// read the root directory
Fs.readdir(env.root, w(function (e, list) {
if (e) {
env.log.error("TASK_ROOT_DIR", {
root: env.root,
error: e,
});
return void cb(e);
}
if (list.length === 0) {
w.abort();
return void cb(void 0, []);
}
rootDirs = list;
}));
}).nThen(function () {
// schedule the nested directories for exploration
// return a list of paths to tasks
var queue = nThen(function () {});
var allPaths = [];
var currentWindow = moduloTime(+new Date() + MODULUS);
// We prioritize a small footprint over speed, so we
// iterate over directories in serial rather than parallel
rootDirs.forEach(function (dir) {
// if a directory is two characters, it's the old format
// otherwise, it indicates when the file is set to expire
// so we can ignore directories which are clearly in the future
var dirTime;
if (migration) {
// this block handles migrations. ignore new formats
if (dir.length !== 2) {
return;
}
} else {
// not in migration mode, check if it's a new format
if (dir.length >= 2) {
// might be the new format.
// check its time to see if it should be skipped
dirTime = parseInt(dir);
if (!isNaN(dirTime) && dirTime >= currentWindow) {
return;
}
}
}
queue.nThen(function (w) {
var subPath = Path.join(env.root, dir);
Fs.readdir(subPath, w(function (e, paths) {
if (e) {
env.log.error("TASKS_INVALID_SUBDIR", {
path: subPath,
error: e,
});
return;
}
if (paths.length === 0) {
removeDirectory(env, subPath, function (err) {
if (err) {
env.log.error('TASKS_REMOVE_EMPTY_DIRECTORY', {
error: err,
path: subPath,
});
}
});
}
// concat in place
Array.prototype.push.apply(allPaths, paths.map(function (p) {
return Path.join(subPath, p);
}));
}));
});
});
queue.nThen(function () {
cb(void 0, allPaths);
});
});
};
var read = function (env, filePath, cb) {
Fs.readFile(filePath, 'utf8', function (e, str) {
if (e) { return void cb(e); }
var task = tryParse(str);
if (!Array.isArray(task) || task.length < 2) {
env.log("INVALID_TASK", {
path: filePath,
task: task,
});
return cb(new Error('INVALID_TASK'));
}
cb(void 0, task);
});
};
var expire = function (env, task, cb) {
// TODO magic numbers, maybe turn task parsing into a function
// and also maybe just encode tasks in a better format to start...
var Log = env.log;
var args = task.slice(2);
Log.info('ARCHIVAL_SCHEDULED_EXPIRATION', {
task: task,
});
env.store.archiveChannel(args[0], function (err) {
if (err) {
Log.error('ARCHIVE_SCHEDULED_EXPIRATION_ERROR', {
task: task,
error: err,
});
}
cb();
});
};
var run = Tasks.run = function (env, path, cb) {
var CURRENT = +new Date();
var Log = env.log;
var task, time, command, args;
nThen(function (w) {
read(env, path, w(function (err, _task) {
if (err) {
w.abort();
// there was a file but it wasn't valid?
return void cb(err);
}
task = _task;
time = task[0];
if (time > CURRENT) {
w.abort();
return cb();
}
command = task[1];
args = task.slice(2);
}));
}).nThen(function (w) {
switch (command) {
case 'EXPIRE':
return void expire(env, task, w());
default:
Log.warn("TASKS_UNKNOWN_COMMAND", task);
}
}).nThen(function () {
// remove the task file...
remove(env, path, function (err) {
if (err) {
Log.error('TASKS_RECORD_REMOVAL', {
path: path,
err: err,
});
}
cb();
});
});
};
var runAll = function (env, cb) {
// check if already running and bail out if so
if (env.running) {
return void cb("TASK_CONCURRENCY");
}
// if not, set a flag to block concurrency and proceed
env.running = true;
var paths;
nThen(function (w) {
list(env, w(function (err, _paths) {
if (err) {
w.abort();
env.running = false;
return void cb(err);
}
paths = _paths;
}));
}).nThen(function (w) {
var done = w();
var nt = nThen(function () {});
paths.forEach(function (path) {
nt = nt.nThen(function (w) {
run(env, path, w(function (err) {
if (err) {
// Any errors are already logged in 'run'
// the admin will need to review the logs and clean up
}
}));
});
});
nt = nt.nThen(function () {
done();
});
}).nThen(function (/*w*/) {
env.running = false;
cb();
});
};
var migrate = function (env, cb) {
// list every task
list(env, function (err, paths) {
if (err) {
return void cb(err);
}
var nt = nThen(function () {});
paths.forEach(function (path) {
var bypass;
var task;
nt = nt.nThen(function (w) {
// read
read(env, path, w(function (err, _task) {
if (err) {
bypass = true;
env.log.error("TASK_MIGRATION_READ", {
error: err,
path: path,
});
return;
}
task = _task;
}));
}).nThen(function (w) {
if (bypass) { return; }
// rewrite in new format
write(env, task, w(function (err) {
if (err) {
bypass = true;
env.log.error("TASK_MIGRATION_WRITE", {
error: err,
task: task,
});
}
}));
}).nThen(function (w) {
if (bypass) { return; }
// remove
remove(env, path, w(function (err) {
if (err) {
env.log.error("TASK_MIGRATION_REMOVE", {
error: err,
path: path,
});
}
}));
});
});
nt = nt.nThen(function () {
cb();
});
}, true);
};
Tasks.create = function (config, cb) {
if (!config.store) { throw new Error("E_STORE_REQUIRED"); }
if (!config.log) { throw new Error("E_LOG_REQUIRED"); }
var env = {
root: config.taskPath || './tasks',
log: config.log,
store: config.store,
};
// make sure the path exists...
Fse.mkdirp(env.root, 0x1ff, function (err) {
if (err) { return void cb(err); }
cb(void 0, {
write: function (time, command, args, cb) {
var task = encode(time, command, args);
write(env, task, cb);
},
list: function (olderThan, cb) {
list(env, olderThan, cb);
},
remove: function (id, cb) {
remove(env, id, cb);
},
run: function (id, cb) {
run(env, id, cb);
},
runAll: function (cb) {
runAll(env, cb);
},
migrate: function (cb) {
migrate(env, cb);
},
});
});
};