Merge branch 'staging' into communities-allow

This commit is contained in:
yflory 2020-02-17 14:59:14 +01:00
commit 4ec6d8072c
20 changed files with 244 additions and 210 deletions

65
import
View File

@ -1,65 +0,0 @@
#!/usr/bin/env node
/* globals process */
var Config = require("./config");
var Fs = require("fs");
var Storage = require(Config.storage);
var args = process.argv.slice(2);
if (!args.length) {
console.log("Insufficient arguments!");
console.log("Pass a path to a database backup!");
process.exit();
}
var dump = Fs.readFileSync(args[0], 'utf-8');
var ready = function (store) {
var lock = 0;
dump.split(/\n/)
.filter(function (line) {
return line;
})
.forEach(function (line, i) {
lock++;
var parts;
var channel;
var msg;
line.replace(/^(.*?)\|(.*)$/, function (all, c, m) {
channel = c;
msg = m;
return '';
});
if (!channel || !msg) {
console.log("BAD LINE on line %s", i);
return;
}
try {
JSON.parse(msg);
} catch (err) {
console.log("BAD LINE on line %s", i);
console.log(msg);
console.log();
}
store.message(channel, msg, function () {
console.log(line);
lock--;
if (!lock) {
console.log("DONE");
process.exit(0);
}
});
});
};
Storage.create(Config, function (store) {
console.log("READY");
ready(store);
});

View File

@ -25,9 +25,9 @@ var sumChannelSizes = function (sizes) {
// FIXME it's possible for this to respond before the server has had a chance // FIXME it's possible for this to respond before the server has had a chance
// to fetch the limits. Maybe we should respond with an error... // to fetch the limits. Maybe we should respond with an error...
// or wait until we actually know the limits before responding // or wait until we actually know the limits before responding
var getLimit = Pinning.getLimit = function (Env, publicKey, cb) { var getLimit = Pinning.getLimit = function (Env, safeKey, cb) {
var unescapedKey = unescapeKeyCharacters(publicKey); var unsafeKey = unescapeKeyCharacters(safeKey);
var limit = Env.limits[unescapedKey]; var limit = Env.limits[unsafeKey];
var defaultLimit = typeof(Env.defaultStorageLimit) === 'number'? var defaultLimit = typeof(Env.defaultStorageLimit) === 'number'?
Env.defaultStorageLimit: Core.DEFAULT_LIMIT; Env.defaultStorageLimit: Core.DEFAULT_LIMIT;
@ -37,32 +37,89 @@ var getLimit = Pinning.getLimit = function (Env, publicKey, cb) {
cb(void 0, toSend); cb(void 0, toSend);
}; };
const answerDeferred = function (Env, channel, bool) {
const pending = Env.pendingPinInquiries;
const stack = pending[channel];
if (!Array.isArray(stack)) { return; }
delete pending[channel];
stack.forEach(function (cb) {
cb(void 0, bool);
});
};
var addPinned = function ( var addPinned = function (
Env, Env,
publicKey /*:string*/, safeKey /*:string*/,
channelList /*Array<string>*/, channelList /*Array<string>*/,
cb /*:()=>void*/) cb /*:()=>void*/)
{ {
Env.evPinnedPadsReady.reg(() => { channelList.forEach(function (channel) {
channelList.forEach((c) => { Pins.addUserPinToState(Env.pinnedPads, safeKey, channel);
const x = Env.pinnedPads[c] = Env.pinnedPads[c] || {}; answerDeferred(Env, channel, true);
x[publicKey] = 1;
}); });
cb(); cb();
};
const isEmpty = function (obj) {
if (!obj || typeof(obj) !== 'object') { return true; }
for (var key in obj) {
if (obj.hasOwnProperty(key)) { return true; }
}
return false;
};
const deferUserTask = function (Env, safeKey, deferred) {
const pending = Env.pendingUnpins;
(pending[safeKey] = pending[safeKey] || []).push(deferred);
};
const runUserDeferred = function (Env, safeKey) {
const pending = Env.pendingUnpins;
const stack = pending[safeKey];
if (!Array.isArray(stack)) { return; }
delete pending[safeKey];
stack.forEach(function (cb) {
cb();
}); });
}; };
const runRemainingDeferred = function (Env) {
const pending = Env.pendingUnpins;
for (var safeKey in pending) {
runUserDeferred(Env, safeKey);
}
};
const removeSelfFromPinned = function (Env, safeKey, channelList) {
channelList.forEach(function (channel) {
const channelPinStatus = Env.pinnedPads[channel];
if (!channelPinStatus) { return; }
delete channelPinStatus[safeKey];
if (isEmpty(channelPinStatus)) {
delete Env.pinnedPads[channel];
}
});
};
var removePinned = function ( var removePinned = function (
Env, Env,
publicKey /*:string*/, safeKey /*:string*/,
channelList /*Array<string>*/, channelList /*Array<string>*/,
cb /*:()=>void*/) cb /*:()=>void*/)
{ {
Env.evPinnedPadsReady.reg(() => {
channelList.forEach((c) => { // if pins are already loaded then you can just unpin normally
const x = Env.pinnedPads[c]; if (Env.pinsLoaded) {
if (!x) { return; } removeSelfFromPinned(Env, safeKey, channelList);
delete x[publicKey]; return void cb();
}); }
// otherwise defer until later...
deferUserTask(Env, safeKey, function () {
removeSelfFromPinned(Env, safeKey, channelList);
cb(); cb();
}); });
}; };
@ -100,24 +157,24 @@ var getMultipleFileSize = function (Env, channels, cb) {
}; };
const batchUserPins = BatchRead("LOAD_USER_PINS"); const batchUserPins = BatchRead("LOAD_USER_PINS");
var loadUserPins = function (Env, publicKey, cb) { var loadUserPins = function (Env, safeKey, cb) {
var session = Core.getSession(Env.Sessions, publicKey); var session = Core.getSession(Env.Sessions, safeKey);
if (session.channels) { if (session.channels) {
return cb(session.channels); return cb(session.channels);
} }
batchUserPins(publicKey, cb, function (done) { batchUserPins(safeKey, cb, function (done) {
var ref = {}; var ref = {};
var lineHandler = Pins.createLineHandler(ref, function (label, data) { var lineHandler = Pins.createLineHandler(ref, function (label, data) {
Env.Log.error(label, { Env.Log.error(label, {
log: publicKey, log: safeKey,
data: data, data: data,
}); });
}); });
// if channels aren't in memory. load them from disk // if channels aren't in memory. load them from disk
Env.pinStore.getMessages(publicKey, lineHandler, function () { Env.pinStore.getMessages(safeKey, lineHandler, function () {
// no more messages // no more messages
// only put this into the cache if it completes // only put this into the cache if it completes
@ -133,27 +190,27 @@ var truthyKeys = function (O) {
}); });
}; };
var getChannelList = Pinning.getChannelList = function (Env, publicKey, _cb) { var getChannelList = Pinning.getChannelList = function (Env, safeKey, _cb) {
var cb = Util.once(Util.mkAsync(_cb)); var cb = Util.once(Util.mkAsync(_cb));
loadUserPins(Env, publicKey, function (pins) { loadUserPins(Env, safeKey, function (pins) {
cb(truthyKeys(pins)); cb(truthyKeys(pins));
}); });
}; };
const batchTotalSize = BatchRead("GET_TOTAL_SIZE"); const batchTotalSize = BatchRead("GET_TOTAL_SIZE");
Pinning.getTotalSize = function (Env, publicKey, cb) { Pinning.getTotalSize = function (Env, safeKey, cb) {
var unescapedKey = unescapeKeyCharacters(publicKey); var unsafeKey = unescapeKeyCharacters(safeKey);
var limit = Env.limits[unescapedKey]; var limit = Env.limits[unsafeKey];
// Get a common key if multiple users share the same quota, otherwise take the public key // Get a common key if multiple users share the same quota, otherwise take the public key
var batchKey = (limit && Array.isArray(limit.users)) ? limit.users.join('') : publicKey; var batchKey = (limit && Array.isArray(limit.users)) ? limit.users.join('') : safeKey;
batchTotalSize(batchKey, cb, function (done) { batchTotalSize(batchKey, cb, function (done) {
var channels = []; var channels = [];
var bytes = 0; var bytes = 0;
nThen(function (waitFor) { nThen(function (waitFor) {
// Get the channels list for our user account // Get the channels list for our user account
Pinning.getChannelList(Env, publicKey, waitFor(function (_channels) { getChannelList(Env, safeKey, waitFor(function (_channels) {
if (!_channels) { if (!_channels) {
waitFor.abort(); waitFor.abort();
return done('INVALID_PIN_LIST'); return done('INVALID_PIN_LIST');
@ -163,7 +220,7 @@ Pinning.getTotalSize = function (Env, publicKey, cb) {
// Get the channels list for users sharing our quota // Get the channels list for users sharing our quota
if (limit && Array.isArray(limit.users) && limit.users.length > 1) { if (limit && Array.isArray(limit.users) && limit.users.length > 1) {
limit.users.forEach(function (key) { limit.users.forEach(function (key) {
if (key === unescapedKey) { return; } // Don't count ourselves twice if (key === unsafeKey) { return; } // Don't count ourselves twice
getChannelList(Env, key, waitFor(function (_channels) { getChannelList(Env, key, waitFor(function (_channels) {
if (!_channels) { return; } // Broken user, don't count their quota if (!_channels) { return; } // Broken user, don't count their quota
Array.prototype.push.apply(channels, _channels); Array.prototype.push.apply(channels, _channels);
@ -207,10 +264,10 @@ Pinning.trimPins = function (Env, safeKey, cb) {
cb("NOT_IMPLEMENTED"); cb("NOT_IMPLEMENTED");
}; };
var getFreeSpace = Pinning.getFreeSpace = function (Env, publicKey, cb) { var getFreeSpace = Pinning.getFreeSpace = function (Env, safeKey, cb) {
getLimit(Env, publicKey, function (e, limit) { getLimit(Env, safeKey, function (e, limit) {
if (e) { return void cb(e); } if (e) { return void cb(e); }
Pinning.getTotalSize(Env, publicKey, function (e, size) { Pinning.getTotalSize(Env, safeKey, function (e, size) {
if (typeof(size) === 'undefined') { return void cb(e); } if (typeof(size) === 'undefined') { return void cb(e); }
var rem = limit[0] - size; var rem = limit[0] - size;
@ -236,20 +293,20 @@ var hashChannelList = function (A) {
return hash; return hash;
}; };
var getHash = Pinning.getHash = function (Env, publicKey, cb) { var getHash = Pinning.getHash = function (Env, safeKey, cb) {
getChannelList(Env, publicKey, function (channels) { getChannelList(Env, safeKey, function (channels) {
cb(void 0, hashChannelList(channels)); cb(void 0, hashChannelList(channels));
}); });
}; };
Pinning.pinChannel = function (Env, publicKey, channels, cb) { Pinning.pinChannel = function (Env, safeKey, channels, cb) {
if (!channels && channels.filter) { if (!channels && channels.filter) {
return void cb('INVALID_PIN_LIST'); return void cb('INVALID_PIN_LIST');
} }
// get channel list ensures your session has a cached channel list // get channel list ensures your session has a cached channel list
getChannelList(Env, publicKey, function (pinned) { getChannelList(Env, safeKey, function (pinned) {
var session = Core.getSession(Env.Sessions, publicKey); var session = Core.getSession(Env.Sessions, safeKey);
// only pin channels which are not already pinned // only pin channels which are not already pinned
var toStore = channels.filter(function (channel) { var toStore = channels.filter(function (channel) {
@ -257,42 +314,42 @@ Pinning.pinChannel = function (Env, publicKey, channels, cb) {
}); });
if (toStore.length === 0) { if (toStore.length === 0) {
return void getHash(Env, publicKey, cb); return void getHash(Env, safeKey, cb);
} }
getMultipleFileSize(Env, toStore, function (e, sizes) { getMultipleFileSize(Env, toStore, function (e, sizes) {
if (typeof(sizes) === 'undefined') { return void cb(e); } if (typeof(sizes) === 'undefined') { return void cb(e); }
var pinSize = sumChannelSizes(sizes); var pinSize = sumChannelSizes(sizes);
getFreeSpace(Env, publicKey, function (e, free) { getFreeSpace(Env, safeKey, function (e, free) {
if (typeof(free) === 'undefined') { if (typeof(free) === 'undefined') {
Env.WARN('getFreeSpace', e); Env.WARN('getFreeSpace', e);
return void cb(e); return void cb(e);
} }
if (pinSize > free) { return void cb('E_OVER_LIMIT'); } if (pinSize > free) { return void cb('E_OVER_LIMIT'); }
Env.pinStore.message(publicKey, JSON.stringify(['PIN', toStore, +new Date()]), Env.pinStore.message(safeKey, JSON.stringify(['PIN', toStore, +new Date()]),
function (e) { function (e) {
if (e) { return void cb(e); } if (e) { return void cb(e); }
toStore.forEach(function (channel) { toStore.forEach(function (channel) {
session.channels[channel] = true; session.channels[channel] = true;
}); });
addPinned(Env, publicKey, toStore, () => {}); addPinned(Env, safeKey, toStore, () => {});
getHash(Env, publicKey, cb); getHash(Env, safeKey, cb);
}); });
}); });
}); });
}); });
}; };
Pinning.unpinChannel = function (Env, publicKey, channels, cb) { Pinning.unpinChannel = function (Env, safeKey, channels, cb) {
if (!channels && channels.filter) { if (!channels && channels.filter) {
// expected array // expected array
return void cb('INVALID_PIN_LIST'); return void cb('INVALID_PIN_LIST');
} }
getChannelList(Env, publicKey, function (pinned) { getChannelList(Env, safeKey, function (pinned) {
var session = Core.getSession(Env.Sessions, publicKey); var session = Core.getSession(Env.Sessions, safeKey);
// only unpin channels which are pinned // only unpin channels which are pinned
var toStore = channels.filter(function (channel) { var toStore = channels.filter(function (channel) {
@ -300,27 +357,27 @@ Pinning.unpinChannel = function (Env, publicKey, channels, cb) {
}); });
if (toStore.length === 0) { if (toStore.length === 0) {
return void getHash(Env, publicKey, cb); return void getHash(Env, safeKey, cb);
} }
Env.pinStore.message(publicKey, JSON.stringify(['UNPIN', toStore, +new Date()]), Env.pinStore.message(safeKey, JSON.stringify(['UNPIN', toStore, +new Date()]),
function (e) { function (e) {
if (e) { return void cb(e); } if (e) { return void cb(e); }
toStore.forEach(function (channel) { toStore.forEach(function (channel) {
delete session.channels[channel]; delete session.channels[channel];
}); });
removePinned(Env, publicKey, toStore, () => {}); removePinned(Env, safeKey, toStore, () => {});
getHash(Env, publicKey, cb); getHash(Env, safeKey, cb);
}); });
}); });
}; };
Pinning.resetUserPins = function (Env, publicKey, channelList, cb) { Pinning.resetUserPins = function (Env, safeKey, channelList, cb) {
if (!Array.isArray(channelList)) { return void cb('INVALID_PIN_LIST'); } if (!Array.isArray(channelList)) { return void cb('INVALID_PIN_LIST'); }
var session = Core.getSession(Env.Sessions, publicKey); var session = Core.getSession(Env.Sessions, safeKey);
if (!channelList.length) { if (!channelList.length) {
return void getHash(Env, publicKey, function (e, hash) { return void getHash(Env, safeKey, function (e, hash) {
if (e) { return cb(e); } if (e) { return cb(e); }
cb(void 0, hash); cb(void 0, hash);
}); });
@ -332,7 +389,7 @@ Pinning.resetUserPins = function (Env, publicKey, channelList, cb) {
var pinSize = sumChannelSizes(sizes); var pinSize = sumChannelSizes(sizes);
getLimit(Env, publicKey, function (e, limit) { getLimit(Env, safeKey, function (e, limit) {
if (e) { if (e) {
Env.WARN('[RESET_ERR]', e); Env.WARN('[RESET_ERR]', e);
return void cb(e); return void cb(e);
@ -347,7 +404,7 @@ Pinning.resetUserPins = function (Env, publicKey, channelList, cb) {
They will not be able to pin additional pads until they upgrade They will not be able to pin additional pads until they upgrade
or delete enough files to go back under their limit. */ or delete enough files to go back under their limit. */
if (pinSize > limit[0] && session.hasPinned) { return void(cb('E_OVER_LIMIT')); } if (pinSize > limit[0] && session.hasPinned) { return void(cb('E_OVER_LIMIT')); }
Env.pinStore.message(publicKey, JSON.stringify(['RESET', channelList, +new Date()]), Env.pinStore.message(safeKey, JSON.stringify(['RESET', channelList, +new Date()]),
function (e) { function (e) {
if (e) { return void cb(e); } if (e) { return void cb(e); }
channelList.forEach(function (channel) { channelList.forEach(function (channel) {
@ -360,13 +417,13 @@ Pinning.resetUserPins = function (Env, publicKey, channelList, cb) {
} else { } else {
oldChannels = []; oldChannels = [];
} }
removePinned(Env, publicKey, oldChannels, () => { removePinned(Env, safeKey, oldChannels, () => {
addPinned(Env, publicKey, channelList, ()=>{}); addPinned(Env, safeKey, channelList, ()=>{});
}); });
// update in-memory cache IFF the reset was allowed. // update in-memory cache IFF the reset was allowed.
session.channels = pins; session.channels = pins;
getHash(Env, publicKey, function (e, hash) { getHash(Env, safeKey, function (e, hash) {
cb(e, hash); cb(e, hash);
}); });
}); });
@ -429,35 +486,74 @@ Pinning.getDeletedPads = function (Env, channels, cb) {
}); });
}; };
const answerNoConclusively = function (Env) {
const pending = Env.pendingPinInquiries;
for (var channel in pending) {
answerDeferred(Env, channel, false);
}
};
// inform that the // inform that the
Pinning.loadChannelPins = function (Env) { Pinning.loadChannelPins = function (Env) {
Pins.list(function (err, data) { const stats = {
if (err) { surplus: 0,
Env.Log.error("LOAD_CHANNEL_PINS", err); pinned: 0,
duplicated: 0,
users: 0, // XXX useful for admin panel ?
};
// FIXME not sure what should be done here instead const handler = function (ref, safeKey, pinned) {
Env.pinnedPads = {}; if (ref.surplus) {
Env.evPinnedPadsReady.fire(); stats.surplus += ref.surplus;
}
for (var channel in ref.pins) {
if (!pinned.hasOwnProperty(channel)) {
answerDeferred(Env, channel, true);
stats.pinned++;
} else {
stats.duplicated++;
}
}
stats.users++;
runUserDeferred(Env, safeKey);
};
Pins.list(function (err) {
if (err) {
Env.pinsLoaded = true;
Env.Log.error("LOAD_CHANNEL_PINS", err);
return; return;
} }
Env.pinsLoaded = true;
Env.pinnedPads = data; answerNoConclusively(Env);
Env.evPinnedPadsReady.fire(); runRemainingDeferred(Env);
}, { }, {
pinPath: Env.paths.pin, pinPath: Env.paths.pin,
handler: handler,
pinned: Env.pinnedPads,
workers: Env.pinWorkers,
}); });
}; };
const deferResponse = function (Env, channel, cb) {
const pending = Env.pendingPinInquiries;
(pending[channel] = pending[channel] || []).push(cb);
};
Pinning.isChannelPinned = function (Env, channel, cb) { Pinning.isChannelPinned = function (Env, channel, cb) {
Env.evPinnedPadsReady.reg(() => { // if the pins are fully loaded then you can answer yes/no definitively
if (Env.pinnedPads[channel] && Object.keys(Env.pinnedPads[channel]).length) { // FIXME 'Object.keys' here is overkill. We only need to know that it isn't empty if (Env.pinsLoaded) {
cb(void 0, true); return void cb(void 0, !isEmpty(Env.pinnedPads[channel]));
} else {
delete Env.pinnedPads[channel];
cb(void 0, false);
} }
});
// you may already know that a channel is pinned
// even if you're still loading. answer immediately if so
if (!isEmpty(Env.pinnedPads[channel])) { return cb(void 0, true); }
// if you're still loading them then can answer 'yes' as soon
// as you learn that one account has pinned a file.
// negative responses have to wait until the end
deferResponse(Env, channel, cb);
}; };

View File

@ -1,11 +0,0 @@
// remove duplicate elements in an array
module.exports = function (O) {
// make a copy of the original array
var A = O.slice();
for (var i = 0; i < A.length; i++) {
for (var j = i + 1; j < A.length; j++) {
if (A[i] === A[j]) { A.splice(j--, 1); }
}
}
return A;
};

View File

@ -63,12 +63,12 @@ module.exports.create = function (config, cb) {
Log.verbose('HK_ID', 'History keeper ID: ' + Env.id); Log.verbose('HK_ID', 'History keeper ID: ' + Env.id);
nThen(function (w) { nThen(function (w) {
require('../storage/file').create(config, w(function (_store) { require('./storage/file').create(config, w(function (_store) {
config.store = _store; config.store = _store;
Env.store = _store; Env.store = _store;
})); }));
}).nThen(function (w) { }).nThen(function (w) {
require("../storage/tasks").create(config, w(function (e, tasks) { require("./storage/tasks").create(config, w(function (e, tasks) {
if (e) { if (e) {
throw e; throw e;
} }

View File

@ -3,7 +3,7 @@
var HK = module.exports; var HK = module.exports;
const nThen = require('nthen'); const nThen = require('nthen');
const Once = require("./once"); const Util = require("./common-util");
const Meta = require("./metadata"); const Meta = require("./metadata");
const Nacl = require('tweetnacl/nacl-fast'); const Nacl = require('tweetnacl/nacl-fast');
@ -182,7 +182,7 @@ const computeIndex = function (Env, channelName, cb) {
const ref = {}; const ref = {};
const CB = Once(cb); const CB = Util.once(cb);
const offsetByHash = {}; const offsetByHash = {};
let size = 0; let size = 0;

View File

@ -1,5 +1,5 @@
/*jshint esversion: 6 */ /*jshint esversion: 6 */
var Store = require("../storage/file"); var Store = require("./storage/file");
var Logger = module.exports; var Logger = module.exports;

View File

@ -1,6 +1,6 @@
var Meta = module.exports; var Meta = module.exports;
var deduplicate = require("./deduplicate"); var deduplicate = require("./common-util").deduplicateString;
/* Metadata fields: /* Metadata fields:

View File

@ -1,7 +0,0 @@
module.exports = function (f, g) {
return function () {
if (!f) { return; }
f.apply(this, Array.prototype.slice.call(arguments));
f = g;
};
};

View File

@ -94,27 +94,39 @@ Pins.calculateFromLog = function (pinFile, fileName) {
const getSafeKeyFromPath = function (path) { const getSafeKeyFromPath = function (path) {
return path.replace(/^.*\//, '').replace(/\.ndjson/, ''); return path.replace(/^.*\//, '').replace(/\.ndjson/, '');
} };
Pins.list = function (done, config) { const addUserPinToState = Pins.addUserPinToState = function (state, safeKey, itemId) {
(state[itemId] = state[itemId] || {})[safeKey] = 1;
};
Pins.list = function (_done, config) {
// allow for a configurable pin store location
const pinPath = config.pinPath || './data/pins'; const pinPath = config.pinPath || './data/pins';
// allow for a configurable amount of parallelism
const plan = Plan(config.workers || 5); const plan = Plan(config.workers || 5);
// run a supplied handler whenever you finish reading a log
// or noop if not supplied.
const handler = config.handler || function () {}; const handler = config.handler || function () {};
// TODO externalize this via optional handlers? // use and mutate a supplied object for state if it's passed
const stats = { const pinned = config.pinned || {};
logs: 0,
dirs: 0, var isDone = false;
pinned: 0, // ensure that 'done' is only called once
lines: 0, // that it calls back asynchronously
}; // and that it sets 'isDone' to true, so that pending processes
// know to abort
const done = Util.once(Util.both(Util.mkAsync(_done), function () {
isDone = true;
}));
const errorHandler = function (label, info) { const errorHandler = function (label, info) {
console.log(label, info); console.log(label, info);
}; };
const pinned = {};
// TODO replace this with lib-readline? // TODO replace this with lib-readline?
const streamFile = function (path, cb) { const streamFile = function (path, cb) {
const id = getSafeKeyFromPath(path); const id = getSafeKeyFromPath(path);
@ -124,7 +136,6 @@ Pins.list = function (done, config) {
const ref = {}; const ref = {};
const pinHandler = createLineHandler(ref, errorHandler); const pinHandler = createLineHandler(ref, errorHandler);
var lines = body.split('\n'); var lines = body.split('\n');
stats.lines += lines.length;
lines.forEach(pinHandler); lines.forEach(pinHandler);
handler(ref, id, pinned); handler(ref, id, pinned);
cb(void 0, ref); cb(void 0, ref);
@ -137,31 +148,36 @@ Pins.list = function (done, config) {
return void cb(err); return void cb(err);
} }
cb(void 0, list.map(function (item) { cb(void 0, list.map(function (item) {
return Path.join(path, item); return {
path: Path.join(path, item),
id: item.replace(/\.ndjson$/, ''),
};
})); }));
}); });
}; };
scanDirectory(pinPath, function (err, paths) { scanDirectory(pinPath, function (err, dirs) {
if (err) { return; } // XXX if (err) {
paths.forEach(function (path) { if (err.code === 'ENOENT') { return void done(void 0, {}); }
return void done(err);
}
dirs.forEach(function (dir) {
plan.job(1, function (next) { plan.job(1, function (next) {
scanDirectory(path, function (nested_err, nested_paths) { if (isDone) { return void next(); }
if (nested_err) { return; } // XXX scanDirectory(dir.path, function (nested_err, logs) {
stats.dirs++; if (nested_err) {
nested_paths.forEach(function (nested_path) { return void done(err);
if (!/\.ndjson$/.test(nested_path)) { return; } }
logs.forEach(function (log) {
if (!/\.ndjson$/.test(log.path)) { return; }
plan.job(0, function (next) { plan.job(0, function (next) {
streamFile(nested_path, function (err, ref) { if (isDone) { return void next(); }
if (err) { return; } // XXX streamFile(log.path, function (err, ref) {
stats.logs++; if (err) { return void done(err); }
var set = ref.pins; var set = ref.pins;
for (var item in set) { for (var item in set) {
if (!pinned.hasOwnProperty(item)) { addUserPinToState(pinned, log.id, item);
pinned[item] = true;
stats.pinned++;
}
} }
next(); next();
}); });

View File

@ -2,7 +2,6 @@
const nThen = require("nthen"); const nThen = require("nthen");
const Util = require("./common-util"); const Util = require("./common-util");
const mkEvent = Util.mkEvent;
const Core = require("./commands/core"); const Core = require("./commands/core");
const Admin = require("./commands/admin-rpc"); const Admin = require("./commands/admin-rpc");
@ -15,8 +14,8 @@ const Upload = require("./commands/upload");
var RPC = module.exports; var RPC = module.exports;
const Store = require("../storage/file"); const Store = require("./storage/file");
const BlobStore = require("../storage/blob"); const BlobStore = require("./storage/blob");
const UNAUTHENTICATED_CALLS = { const UNAUTHENTICATED_CALLS = {
GET_FILE_SIZE: Pinning.getFileSize, GET_FILE_SIZE: Pinning.getFileSize,
@ -219,9 +218,14 @@ RPC.create = function (config, cb) {
Sessions: {}, Sessions: {},
paths: {}, paths: {},
msgStore: config.store, msgStore: config.store,
pinStore: undefined, pinStore: undefined,
pinnedPads: {}, pinnedPads: {},
evPinnedPadsReady: mkEvent(true), pinsLoaded: false,
pendingPinInquiries: {},
pendingUnpins: {},
pinWorkers: 5,
limits: {}, limits: {},
admins: [], admins: [],
Log: Log, Log: Log,

View File

@ -6,7 +6,7 @@ var Path = require("path");
var BlobStore = module.exports; var BlobStore = module.exports;
var nThen = require("nthen"); var nThen = require("nthen");
var Semaphore = require("saferphore"); var Semaphore = require("saferphore");
var Util = require("../lib/common-util"); var Util = require("../common-util");
var isValidSafeKey = function (safeKey) { var isValidSafeKey = function (safeKey) {
return typeof(safeKey) === 'string' && !/\//.test(safeKey) && safeKey.length === 44; return typeof(safeKey) === 'string' && !/\//.test(safeKey) && safeKey.length === 44;

View File

@ -6,11 +6,11 @@ var Fse = require("fs-extra");
var Path = require("path"); var Path = require("path");
var nThen = require("nthen"); var nThen = require("nthen");
var Semaphore = require("saferphore"); var Semaphore = require("saferphore");
var Util = require("../lib/common-util"); var Util = require("../common-util");
var Meta = require("../lib/metadata"); var Meta = require("../metadata");
var Extras = require("../lib/hk-util"); var Extras = require("../hk-util");
const Schedule = require("../lib/schedule"); const Schedule = require("../schedule");
const Readline = require("readline"); const Readline = require("readline");
const ToPull = require('stream-to-pull-stream'); const ToPull = require('stream-to-pull-stream');
const Pull = require('pull-stream'); const Pull = require('pull-stream');

View File

@ -1,6 +1,6 @@
var nThen = require("nthen"); var nThen = require("nthen");
var Store = require("../storage/file"); var Store = require("../lib/storage/file");
var config = require("../lib/load-config"); var config = require("../lib/load-config");
var store; var store;

View File

@ -1,7 +1,7 @@
var nThen = require("nthen"); var nThen = require("nthen");
var Store = require("../storage/file"); var Store = require("../lib/storage/file");
var BlobStore = require("../storage/blob"); var BlobStore = require("../lib/storage/blob");
var Pins = require("../lib/pins"); var Pins = require("../lib/pins");
var config = require("../lib/load-config"); var config = require("../lib/load-config");

View File

@ -1,9 +1,9 @@
var nThen = require("nthen"); var nThen = require("nthen");
var Tasks = require("../storage/tasks"); var Tasks = require("../lib/storage/tasks");
var Logger = require("../lib/log"); var Logger = require("../lib/log");
var config = require("../lib/load-config"); var config = require("../lib/load-config");
var FileStorage = require('../' + config.storage || './storage/file'); var FileStorage = require('../lib/storage/file');
nThen(function (w) { nThen(function (w) {
Logger.create(config, w(function (_log) { Logger.create(config, w(function (_log) {

View File

@ -1,5 +1,5 @@
var nThen = require("nthen"); var nThen = require("nthen");
var Tasks = require("../../storage/tasks"); var Tasks = require("../../lib/storage/tasks");
var Logger = require("../../lib/log"); var Logger = require("../../lib/log");
var config = require("../../lib/load-config"); var config = require("../../lib/load-config");
@ -7,7 +7,7 @@ var config = require("../../lib/load-config");
// this isn't strictly necessary for what we want to do // this isn't strictly necessary for what we want to do
// but the API requires it, and I don't feel like changing that // but the API requires it, and I don't feel like changing that
// --ansuz // --ansuz
var FileStorage = require("../../" + (config.storage || "./storage/file")); var FileStorage = require("../../lib/storage/file");
var tasks; var tasks;
nThen(function (w) { nThen(function (w) {

View File

@ -1,6 +1,6 @@
var nThen = require("nthen"); var nThen = require("nthen");
var Store = require("../storage/file"); var Store = require("../lib/storage/file");
var config = require("../lib/load-config"); var config = require("../lib/load-config");
var store; var store;

View File

@ -30,7 +30,8 @@ var handler = function (ref, id /* safeKey */, pinned) {
//console.log(ref, id); //console.log(ref, id);
}; };
Pins.list(function (err, pinned) { Pins.list(function (err) {
if (err) { return void console.error(err); }
/* /*
for (var id in pinned) { for (var id in pinned) {
console.log(id); console.log(id);