From cded52f83fde33bb6340e61dab29a358e08285e8 Mon Sep 17 00:00:00 2001
From: ansuz
Date: Fri, 14 Feb 2020 11:45:51 -0500
Subject: [PATCH 1/6] replicate existing pinned.load API correctly

---
 lib/pins.js | 43 +++++++++++++++++++++++++++++++------------
 1 file changed, 31 insertions(+), 12 deletions(-)

diff --git a/lib/pins.js b/lib/pins.js
index cb3d3f0c7..3c1bf3967 100644
--- a/lib/pins.js
+++ b/lib/pins.js
@@ -96,11 +96,20 @@ const getSafeKeyFromPath = function (path) {
     return path.replace(/^.*\//, '').replace(/\.ndjson/, '');
 }

-Pins.list = function (done, config) {
+Pins.list = function (_done, config) {
     const pinPath = config.pinPath || './data/pins';
     const plan = Plan(config.workers || 5);
     const handler = config.handler || function () {};

+    var isDone = false;
+    // ensure that 'done' is only called once
+    // that it calls back asynchronously
+    // and that it sets 'isDone' to true, so that pending processes
+    // know to abort
+    const done = Util.once(Util.both(Util.mkAsync(_done), function () {
+        isDone = true;
+    }));
+
     // TODO externalize this via optional handlers?
     const stats = {
         logs: 0,
@@ -137,29 +146,39 @@
             return void cb(err);
         }
         cb(void 0, list.map(function (item) {
-            return Path.join(path, item);
+            return {
+                path: Path.join(path, item),
+                id: item.replace(/\.ndjson$/, ''),
+            };
         }));
     });
 };

-    scanDirectory(pinPath, function (err, paths) {
-        if (err) { return; } // XXX
-        paths.forEach(function (path) {
+    scanDirectory(pinPath, function (err, dirs) {
+        if (err) {
+            if (err.code === 'ENOENT') { return void cb(void 0, {}); }
+            return void done(err);
+        }
+        dirs.forEach(function (dir) {
             plan.job(1, function (next) {
-                scanDirectory(path, function (nested_err, nested_paths) {
-                    if (nested_err) { return; } // XXX
+                if (isDone) { return void next(); }
+                scanDirectory(dir.path, function (nested_err, logs) {
+                    if (nested_err) {
+                        return void done(err);
+                    }
                     stats.dirs++;
-                    nested_paths.forEach(function (nested_path) {
-                        if (!/\.ndjson$/.test(nested_path)) { return; }
+                    logs.forEach(function (log) {
+                        if (!/\.ndjson$/.test(log.path)) { return; }
                         plan.job(0, function (next) {
-                            streamFile(nested_path, function (err, ref) {
-                                if (err) { return; } // XXX
+                            if (isDone) { return void next(); }
+                            streamFile(log.path, function (err, ref) {
+                                if (err) { return void done(err); }
                                 stats.logs++;
                                 var set = ref.pins;
                                 for (var item in set) {
+                                    (pinned[item] = pinned[item] || {})[log.id] = 1;
                                     if (!pinned.hasOwnProperty(item)) {
-                                        pinned[item] = true;
                                         stats.pinned++;
                                     }
                                 }

From 38c1700173dc40e1d6b16119e81d41edbe8c0226 Mon Sep 17 00:00:00 2001
From: ansuz
Date: Fri, 14 Feb 2020 13:46:40 -0500
Subject: [PATCH 2/6] Respond to pinning RPCs as soon as possible (instead of
 waiting until you've read an unbounded number of pin logs while queries back
 up in memory)

Also replace instances of 'publicKey' with 'safeKey' or 'unsafeKey' to
clearly and correctly indicate their format.
---
 lib/commands/pin-rpc.js | 246 ++++++++++++++++++++++++++++------------
 lib/pins.js             |  35 +++---
 lib/rpc.js              |   8 +-
 3 files changed, 193 insertions(+), 96 deletions(-)

diff --git a/lib/commands/pin-rpc.js b/lib/commands/pin-rpc.js
index 3fc339d00..504733372 100644
--- a/lib/commands/pin-rpc.js
+++ b/lib/commands/pin-rpc.js
@@ -25,9 +25,9 @@ var sumChannelSizes = function (sizes) {
 // FIXME it's possible for this to respond before the server has had a chance
 // to fetch the limits. Maybe we should respond with an error...
 // or wait until we actually know the limits before responding
-var getLimit = Pinning.getLimit = function (Env, publicKey, cb) {
-    var unescapedKey = unescapeKeyCharacters(publicKey);
-    var limit = Env.limits[unescapedKey];
+var getLimit = Pinning.getLimit = function (Env, safeKey, cb) {
+    var unsafeKey = unescapeKeyCharacters(safeKey);
+    var limit = Env.limits[unsafeKey];
     var defaultLimit = typeof(Env.defaultStorageLimit) === 'number'?
         Env.defaultStorageLimit: Core.DEFAULT_LIMIT;
@@ -37,32 +37,89 @@ var getLimit = Pinning.getLimit = function (Env, publicKey, cb) {
     cb(void 0, toSend);
 };

+const answerDeferred = function (Env, channel, bool) {
+    const pending = Env.pendingPinInquiries;
+    const stack = pending[channel];
+    if (!Array.isArray(stack)) { return; }
+
+    delete pending[channel];
+
+    stack.forEach(function (cb) {
+        cb(void 0, bool);
+    });
+};
+
 var addPinned = function (
     Env,
-    publicKey /*:string*/,
+    safeKey /*:string*/,
     channelList /*Array<string>*/,
     cb /*:()=>void*/)
 {
-    Env.evPinnedPadsReady.reg(() => {
-        channelList.forEach((c) => {
-            const x = Env.pinnedPads[c] = Env.pinnedPads[c] || {};
-            x[publicKey] = 1;
-        });
+    channelList.forEach(function (channel) {
+        Pins.addUserPinToState(Env.pinnedPads, safeKey, channel);
+        answerDeferred(Env, channel, true);
+    });
+    cb();
+};
+
+const isEmpty = function (obj) {
+    if (!obj || typeof(obj) !== 'object') { return true; }
+    for (var key in obj) {
+        if (obj.hasOwnProperty(key)) { return false; }
+    }
+    return true;
+};
+
+const deferUserTask = function (Env, safeKey, deferred) {
+    const pending = Env.pendingUnpins;
+    (pending[safeKey] = pending[safeKey] || []).push(deferred);
+};
+
+const runUserDeferred = function (Env, safeKey) {
+    const pending = Env.pendingUnpins;
+    const stack = pending[safeKey];
+    if (!Array.isArray(stack)) { return; }
+    delete pending[safeKey];
+
+    stack.forEach(function (cb) { cb(); });
 };
+
+const runRemainingDeferred = function (Env) {
+    const pending = Env.pendingUnpins;
+    for (var safeKey in pending) {
+        runUserDeferred(Env, safeKey);
+    }
+};
+
+const removeSelfFromPinned = function (Env, safeKey, channelList) {
+    channelList.forEach(function (channel) {
+        const channelPinStatus = Env.pinnedPads[channel];
+        if (!channelPinStatus) { return; }
+        delete channelPinStatus[safeKey];
+        if (isEmpty(channelPinStatus)) {
+            delete Env.pinnedPads[channel];
+        }
+    });
+};
+
 var removePinned = function (
     Env,
-    publicKey /*:string*/,
+    safeKey /*:string*/,
     channelList /*Array<string>*/,
     cb /*:()=>void*/)
 {
-    Env.evPinnedPadsReady.reg(() => {
-        channelList.forEach((c) => {
-            const x = Env.pinnedPads[c];
-            if (!x) { return; }
-            delete x[publicKey];
-        });
+
+    // if pins are already loaded then you can just unpin normally
+    if (Env.pinsLoaded) {
+        removeSelfFromPinned(Env, safeKey, channelList);
+        return void cb();
+    }
+
+    // otherwise defer until later...
+    deferUserTask(Env, safeKey, function () {
+        removeSelfFromPinned(Env, safeKey, channelList);
         cb();
     });
 };
@@ -100,24 +157,24 @@ var getMultipleFileSize = function (Env, channels, cb) {
 };

 const batchUserPins = BatchRead("LOAD_USER_PINS");
-var loadUserPins = function (Env, publicKey, cb) {
-    var session = Core.getSession(Env.Sessions, publicKey);
+var loadUserPins = function (Env, safeKey, cb) {
+    var session = Core.getSession(Env.Sessions, safeKey);

     if (session.channels) {
         return cb(session.channels);
     }

-    batchUserPins(publicKey, cb, function (done) {
+    batchUserPins(safeKey, cb, function (done) {
         var ref = {};
         var lineHandler = Pins.createLineHandler(ref, function (label, data) {
             Env.Log.error(label, {
-                log: publicKey,
+                log: safeKey,
                 data: data,
             });
         });

         // if channels aren't in memory. load them from disk
-        Env.pinStore.getMessages(publicKey, lineHandler, function () {
+        Env.pinStore.getMessages(safeKey, lineHandler, function () {
             // no more messages

             // only put this into the cache if it completes
@@ -133,27 +190,27 @@ var truthyKeys = function (O) {
     });
 };

-var getChannelList = Pinning.getChannelList = function (Env, publicKey, _cb) {
+var getChannelList = Pinning.getChannelList = function (Env, safeKey, _cb) {
     var cb = Util.once(Util.mkAsync(_cb));
-    loadUserPins(Env, publicKey, function (pins) {
+    loadUserPins(Env, safeKey, function (pins) {
         cb(truthyKeys(pins));
     });
 };

 const batchTotalSize = BatchRead("GET_TOTAL_SIZE");
-Pinning.getTotalSize = function (Env, publicKey, cb) {
-    var unescapedKey = unescapeKeyCharacters(publicKey);
-    var limit = Env.limits[unescapedKey];
+Pinning.getTotalSize = function (Env, safeKey, cb) {
+    var unsafeKey = unescapeKeyCharacters(safeKey);
+    var limit = Env.limits[unsafeKey];

     // Get a common key if multiple users share the same quota, otherwise take the public key
-    var batchKey = (limit && Array.isArray(limit.users)) ? limit.users.join('') : publicKey;
+    var batchKey = (limit && Array.isArray(limit.users)) ? limit.users.join('') : safeKey;

     batchTotalSize(batchKey, cb, function (done) {
         var channels = [];
         var bytes = 0;
         nThen(function (waitFor) {
             // Get the channels list for our user account
-            Pinning.getChannelList(Env, publicKey, waitFor(function (_channels) {
+            getChannelList(Env, safeKey, waitFor(function (_channels) {
                 if (!_channels) {
                     waitFor.abort();
                     return done('INVALID_PIN_LIST');
                 }
@@ -163,7 +220,7 @@ Pinning.getTotalSize = function (Env, publicKey, cb) {
             // Get the channels list for users sharing our quota
             if (limit && Array.isArray(limit.users) && limit.users.length > 1) {
                 limit.users.forEach(function (key) {
-                    if (key === unescapedKey) { return; } // Don't count ourselves twice
+                    if (key === unsafeKey) { return; } // Don't count ourselves twice
                     getChannelList(Env, key, waitFor(function (_channels) {
                         if (!_channels) { return; } // Broken user, don't count their quota
                         Array.prototype.push.apply(channels, _channels);
@@ -207,10 +264,10 @@ Pinning.trimPins = function (Env, safeKey, cb) {
     cb("NOT_IMPLEMENTED");
 };

-var getFreeSpace = Pinning.getFreeSpace = function (Env, publicKey, cb) {
-    getLimit(Env, publicKey, function (e, limit) {
+var getFreeSpace = Pinning.getFreeSpace = function (Env, safeKey, cb) {
+    getLimit(Env, safeKey, function (e, limit) {
         if (e) { return void cb(e); }
-        Pinning.getTotalSize(Env, publicKey, function (e, size) {
+        Pinning.getTotalSize(Env, safeKey, function (e, size) {
             if (typeof(size) === 'undefined') { return void cb(e); }
             var rem = limit[0] - size;
@@ -236,20 +293,20 @@ var hashChannelList = function (A) {
     return hash;
 };

-var getHash = Pinning.getHash = function (Env, publicKey, cb) {
-    getChannelList(Env, publicKey, function (channels) {
+var getHash = Pinning.getHash = function (Env, safeKey, cb) {
+    getChannelList(Env, safeKey, function (channels) {
         cb(void 0, hashChannelList(channels));
     });
 };

-Pinning.pinChannel = function (Env, publicKey, channels, cb) {
+Pinning.pinChannel = function (Env, safeKey, channels, cb) {
     if (!channels && channels.filter) {
         return void cb('INVALID_PIN_LIST');
     }

     // get channel list ensures your session has a cached channel list
-    getChannelList(Env, publicKey, function (pinned) {
-        var session = Core.getSession(Env.Sessions, publicKey);
+    getChannelList(Env, safeKey, function (pinned) {
+        var session = Core.getSession(Env.Sessions, safeKey);

         // only pin channels which are not already pinned
         var toStore = channels.filter(function (channel) {
@@ -257,42 +314,42 @@ Pinning.pinChannel = function (Env, publicKey, channels, cb) {
         });

         if (toStore.length === 0) {
-            return void getHash(Env, publicKey, cb);
+            return void getHash(Env, safeKey, cb);
         }

         getMultipleFileSize(Env, toStore, function (e, sizes) {
             if (typeof(sizes) === 'undefined') { return void cb(e); }
             var pinSize = sumChannelSizes(sizes);

-            getFreeSpace(Env, publicKey, function (e, free) {
+            getFreeSpace(Env, safeKey, function (e, free) {
                 if (typeof(free) === 'undefined') {
                     Env.WARN('getFreeSpace', e);
                     return void cb(e);
                 }
                 if (pinSize > free) { return void cb('E_OVER_LIMIT'); }

-                Env.pinStore.message(publicKey, JSON.stringify(['PIN', toStore, +new Date()]),
+                Env.pinStore.message(safeKey, JSON.stringify(['PIN', toStore, +new Date()]),
                     function (e) {
                     if (e) { return void cb(e); }
                     toStore.forEach(function (channel) {
                         session.channels[channel] = true;
                     });
-                    addPinned(Env, publicKey, toStore, () => {});
-                    getHash(Env, publicKey, cb);
+                    addPinned(Env, safeKey, toStore, () => {});
+                    getHash(Env, safeKey, cb);
                 });
             });
         });
     });
 };

-Pinning.unpinChannel = function (Env, publicKey, channels, cb) {
+Pinning.unpinChannel = function (Env, safeKey, channels, cb) {
     if (!channels && channels.filter) { // expected array
         return void cb('INVALID_PIN_LIST');
     }

-    getChannelList(Env, publicKey, function (pinned) {
-        var session = Core.getSession(Env.Sessions, publicKey);
+    getChannelList(Env, safeKey, function (pinned) {
+        var session = Core.getSession(Env.Sessions, safeKey);

         // only unpin channels which are pinned
         var toStore = channels.filter(function (channel) {
@@ -300,27 +357,27 @@ Pinning.unpinChannel = function (Env, publicKey, channels, cb) {
         });

         if (toStore.length === 0) {
-            return void getHash(Env, publicKey, cb);
+            return void getHash(Env, safeKey, cb);
         }

-        Env.pinStore.message(publicKey, JSON.stringify(['UNPIN', toStore, +new Date()]),
+        Env.pinStore.message(safeKey, JSON.stringify(['UNPIN', toStore, +new Date()]),
             function (e) {
             if (e) { return void cb(e); }
             toStore.forEach(function (channel) {
                 delete session.channels[channel];
             });
-            removePinned(Env, publicKey, toStore, () => {});
-            getHash(Env, publicKey, cb);
+            removePinned(Env, safeKey, toStore, () => {});
+            getHash(Env, safeKey, cb);
         });
     });
 };

-Pinning.resetUserPins = function (Env, publicKey, channelList, cb) {
+Pinning.resetUserPins = function (Env, safeKey, channelList, cb) {
     if (!Array.isArray(channelList)) { return void cb('INVALID_PIN_LIST'); }
-    var session = Core.getSession(Env.Sessions, publicKey);
+    var session = Core.getSession(Env.Sessions, safeKey);

     if (!channelList.length) {
-        return void getHash(Env, publicKey, function (e, hash) {
+        return void getHash(Env, safeKey, function (e, hash) {
             if (e) { return cb(e); }
             cb(void 0, hash);
         });
@@ -332,7 +389,7 @@

         var pinSize = sumChannelSizes(sizes);

-        getLimit(Env, publicKey, function (e, limit) {
+        getLimit(Env, safeKey, function (e, limit) {
             if (e) {
                 Env.WARN('[RESET_ERR]', e);
                 return void cb(e);
             }
@@ -347,7 +404,7 @@
            They will not be able to pin additional pads until they upgrade
            or delete enough files to go back under their limit. */
             if (pinSize > limit[0] && session.hasPinned) { return void(cb('E_OVER_LIMIT')); }
-            Env.pinStore.message(publicKey, JSON.stringify(['RESET', channelList, +new Date()]),
+            Env.pinStore.message(safeKey, JSON.stringify(['RESET', channelList, +new Date()]),
                 function (e) {
                 if (e) { return void cb(e); }
                 channelList.forEach(function (channel) {
@@ -360,13 +417,13 @@
                 } else {
                     oldChannels = [];
                 }
-                removePinned(Env, publicKey, oldChannels, () => {
-                    addPinned(Env, publicKey, channelList, ()=>{});
+                removePinned(Env, safeKey, oldChannels, () => {
+                    addPinned(Env, safeKey, channelList, ()=>{});
                 });

                 // update in-memory cache IFF the reset was allowed.
                 session.channels = pins;
-                getHash(Env, publicKey, function (e, hash) {
+                getHash(Env, safeKey, function (e, hash) {
                     cb(e, hash);
                 });
             });
@@ -429,35 +486,74 @@ Pinning.getDeletedPads = function (Env, channels, cb) {
     });
 };

+const answerNoConclusively = function (Env) {
+    const pending = Env.pendingPinInquiries;
+    for (var channel in pending) {
+        answerDeferred(Env, channel, false);
+    }
+};
+
 // inform that the
 Pinning.loadChannelPins = function (Env) {
-    Pins.list(function (err, data) {
-        if (err) {
-            Env.Log.error("LOAD_CHANNEL_PINS", err);
+    const stats = {
+        surplus: 0,
+        pinned: 0,
+        duplicated: 0,
+        users: 0, // XXX useful for admin panel ?
+    };

-            // FIXME not sure what should be done here instead
-            Env.pinnedPads = {};
-            Env.evPinnedPadsReady.fire();
+    const handler = function (ref, safeKey, pinned) {
+        if (ref.surplus) {
+            stats.surplus += ref.surplus;
+        }
+        for (var channel in ref.pins) {
+            if (!pinned.hasOwnProperty(channel)) {
+                answerDeferred(Env, channel, true);
+                stats.pinned++;
+            } else {
+                stats.duplicated++;
+            }
+        }
+        stats.users++;
+        runUserDeferred(Env, safeKey);
+    };
+
+    Pins.list(function (err) {
+        if (err) {
+            Env.pinsLoaded = true;
+            Env.Log.error("LOAD_CHANNEL_PINS", err);
             return;
         }
-
-        Env.pinnedPads = data;
-        Env.evPinnedPadsReady.fire();
+        Env.pinsLoaded = true;
+        answerNoConclusively(Env);
+        runRemainingDeferred(Env);
     }, {
         pinPath: Env.paths.pin,
+        handler: handler,
+        pinned: Env.pinnedPads,
+        workers: Env.pinWorkers,
     });
 };

+const deferResponse = function (Env, channel, cb) {
+    const pending = Env.pendingPinInquiries;
+    (pending[channel] = pending[channel] || []).push(cb);
+};
+
 Pinning.isChannelPinned = function (Env, channel, cb) {
-    Env.evPinnedPadsReady.reg(() => {
-        if (Env.pinnedPads[channel] && Object.keys(Env.pinnedPads[channel]).length) { // FIXME 'Object.keys' here is overkill. We only need to know that it isn't empty
-            cb(void 0, true);
-        } else {
-            delete Env.pinnedPads[channel];
-            cb(void 0, false);
-        }
-    });
+    // if the pins are fully loaded then you can answer yes/no definitively
+    if (Env.pinsLoaded) {
+        return void cb(void 0, !isEmpty(Env.pinnedPads[channel]));
+    }
+
+    // you may already know that a channel is pinned
+    // even if you're still loading. answer immediately if so
+    if (!isEmpty(Env.pinnedPads[channel])) { return cb(void 0, true); }
+
+    // if you're still loading them then can answer 'yes' as soon
+    // as you learn that one account has pinned a file.
+    // negative responses have to wait until the end
+    deferResponse(Env, channel, cb);
 };
-
diff --git a/lib/pins.js b/lib/pins.js
index 3c1bf3967..41e871446 100644
--- a/lib/pins.js
+++ b/lib/pins.js
@@ -94,13 +94,26 @@ Pins.calculateFromLog = function (pinFile, fileName) {

 const getSafeKeyFromPath = function (path) {
     return path.replace(/^.*\//, '').replace(/\.ndjson/, '');
-}
+};
+
+const addUserPinToState = Pins.addUserPinToState = function (state, safeKey, itemId) {
+    (state[itemId] = state[itemId] || {})[safeKey] = 1;
+};

 Pins.list = function (_done, config) {
+    // allow for a configurable pin store location
     const pinPath = config.pinPath || './data/pins';
+
+    // allow for a configurable amount of parallelism
     const plan = Plan(config.workers || 5);
+
+    // run a supplied handler whenever you finish reading a log
+    // or noop if not supplied.
     const handler = config.handler || function () {};

+    // use and mutate a supplied object for state if it's passed
+    const pinned = config.pinned || {};
+
     var isDone = false;
     // ensure that 'done' is only called once
     // that it calls back asynchronously
@@ -110,20 +123,10 @@
         isDone = true;
     }));

-    // TODO externalize this via optional handlers?
-    const stats = {
-        logs: 0,
-        dirs: 0,
-        pinned: 0,
-        lines: 0,
-    };
-
     const errorHandler = function (label, info) { console.log(label, info); };

-    const pinned = {};
-
     // TODO replace this with lib-readline?
     const streamFile = function (path, cb) {
         const id = getSafeKeyFromPath(path);
@@ -133,7 +136,6 @@
         const ref = {};
         const pinHandler = createLineHandler(ref, errorHandler);
         var lines = body.split('\n');
-        stats.lines += lines.length;
         lines.forEach(pinHandler);
         handler(ref, id, pinned);
         cb(void 0, ref);
@@ -156,7 +158,7 @@
     scanDirectory(pinPath, function (err, dirs) {
         if (err) {
-            if (err.code === 'ENOENT') { return void cb(void 0, {}); }
+            if (err.code === 'ENOENT') { return void done(void 0, {}); }
             return void done(err);
         }
         dirs.forEach(function (dir) {
@@ -166,21 +168,16 @@
                     if (nested_err) {
                         return void done(err);
                     }
-                    stats.dirs++;
                     logs.forEach(function (log) {
                         if (!/\.ndjson$/.test(log.path)) { return; }
                         plan.job(0, function (next) {
                             if (isDone) { return void next(); }
                             streamFile(log.path, function (err, ref) {
                                 if (err) { return void done(err); }
-                                stats.logs++;
                                 var set = ref.pins;
                                 for (var item in set) {
-                                    (pinned[item] = pinned[item] || {})[log.id] = 1;
-                                    if (!pinned.hasOwnProperty(item)) {
-                                        stats.pinned++;
-                                    }
+                                    addUserPinToState(pinned, log.id, item);
                                 }
                                 next();
                             });
diff --git a/lib/rpc.js b/lib/rpc.js
index 23597eb5b..eb09ddcd0 100644
--- a/lib/rpc.js
+++ b/lib/rpc.js
@@ -2,7 +2,6 @@
 const nThen = require("nthen");
 const Util = require("./common-util");
-const mkEvent = Util.mkEvent;

 const Core = require("./commands/core");
 const Admin = require("./commands/admin-rpc");
@@ -219,9 +218,14 @@ RPC.create = function (config, cb) {
         Sessions: {},
         paths: {},
         msgStore: config.store,
+        pinStore: undefined,
         pinnedPads: {},
-        evPinnedPadsReady: mkEvent(true),
+        pinsLoaded: false,
+        pendingPinInquiries: {},
+        pendingUnpins: {},
+        pinWorkers: 5,
+
         limits: {},
         admins: [],
         Log: Log,

From 725d10fc6029acb5b7aecd5ef5d4885b48b0715d Mon Sep 17 00:00:00 2001
From: ansuz
Date: Fri, 14 Feb 2020 16:29:30 -0500
Subject: [PATCH 3/6] nest storage directory inside './lib'

---
 lib/historyKeeper.js                   | 4 ++--
 lib/log.js                             | 2 +-
 lib/rpc.js                             | 4 ++--
 {storage => lib/storage}/README.md     | 0
 {storage => lib/storage}/blob.js       | 2 +-
 {storage => lib/storage}/file.js       | 8 ++++----
 {storage => lib/storage}/tasks.js      | 0
 scripts/diagnose-archive-conflicts.js  | 2 +-
 scripts/evict-inactive.js              | 4 ++--
 scripts/expire-channels.js             | 4 ++--
 scripts/migrations/migrate-tasks-v1.js | 4 ++--
 scripts/restore-archived.js            | 2 +-
 12 files changed, 18 insertions(+), 18 deletions(-)
 rename {storage => lib/storage}/README.md (100%)
 rename {storage => lib/storage}/blob.js (99%)
 rename {storage => lib/storage}/file.js (99%)
 rename {storage => lib/storage}/tasks.js (100%)

diff --git a/lib/historyKeeper.js b/lib/historyKeeper.js
index dd80d0b53..03f31383b 100644
--- a/lib/historyKeeper.js
+++ b/lib/historyKeeper.js
@@ -63,12 +63,12 @@ module.exports.create = function (config, cb) {
     Log.verbose('HK_ID', 'History keeper ID: ' + Env.id);

     nThen(function (w) {
-        require('../storage/file').create(config, w(function (_store) {
+        require('./storage/file').create(config, w(function (_store) {
             config.store = _store;
             Env.store = _store;
         }));
     }).nThen(function (w) {
-        require("../storage/tasks").create(config, w(function (e, tasks) {
+        require("./storage/tasks").create(config, w(function (e, tasks) {
             if (e) {
                 throw e;
             }
diff --git a/lib/log.js b/lib/log.js
index 7de6badb8..756da8734 100644
--- a/lib/log.js
+++ b/lib/log.js
@@ -1,5 +1,5 @@
 /*jshint esversion: 6 */
-var Store = require("../storage/file");
+var Store = require("./storage/file");

 var Logger = module.exports;
diff --git a/lib/rpc.js b/lib/rpc.js
index eb09ddcd0..b5c142af5 100644
--- a/lib/rpc.js
+++ b/lib/rpc.js
@@ -14,8 +14,8 @@ const Upload = require("./commands/upload");

 var RPC = module.exports;

-const Store = require("../storage/file");
-const BlobStore = require("../storage/blob");
+const Store = require("./storage/file");
+const BlobStore = require("./storage/blob");

 const UNAUTHENTICATED_CALLS = {
     GET_FILE_SIZE: Pinning.getFileSize,
diff --git a/storage/README.md b/lib/storage/README.md
similarity index 100%
rename from storage/README.md
rename to lib/storage/README.md
diff --git a/storage/blob.js b/lib/storage/blob.js
similarity index 99%
rename from storage/blob.js
rename to lib/storage/blob.js
index c9396d12a..006f5ca80 100644
--- a/storage/blob.js
+++ b/lib/storage/blob.js
@@ -6,7 +6,7 @@ var Path = require("path");
 var BlobStore = module.exports;
 var nThen = require("nthen");
 var Semaphore = require("saferphore");
-var Util = require("../lib/common-util");
+var Util = require("../common-util");

 var isValidSafeKey = function (safeKey) {
     return typeof(safeKey) === 'string' && !/\//.test(safeKey) && safeKey.length === 44;
diff --git a/storage/file.js b/lib/storage/file.js
similarity index 99%
rename from storage/file.js
rename to lib/storage/file.js
index 6162f4a8b..6b1577d80 100644
--- a/storage/file.js
+++ b/lib/storage/file.js
@@ -6,11 +6,11 @@ var Fse = require("fs-extra");
 var Path = require("path");
 var nThen = require("nthen");
 var Semaphore = require("saferphore");
-var Util = require("../lib/common-util");
-var Meta = require("../lib/metadata");
-var Extras = require("../lib/hk-util");
+var Util = require("../common-util");
+var Meta = require("../metadata");
+var Extras = require("../hk-util");

-const Schedule = require("../lib/schedule");
+const Schedule = require("../schedule");
 const Readline = require("readline");
 const ToPull = require('stream-to-pull-stream');
 const Pull = require('pull-stream');
diff --git a/storage/tasks.js b/lib/storage/tasks.js
similarity index 100%
rename from storage/tasks.js
rename to lib/storage/tasks.js
diff --git a/scripts/diagnose-archive-conflicts.js b/scripts/diagnose-archive-conflicts.js
index 8617150fc..0e75f4abe 100644
--- a/scripts/diagnose-archive-conflicts.js
+++ b/scripts/diagnose-archive-conflicts.js
@@ -1,6 +1,6 @@
 var nThen = require("nthen");

-var Store = require("../storage/file");
+var Store = require("../lib/storage/file");
 var config = require("../lib/load-config");

 var store;
diff --git a/scripts/evict-inactive.js b/scripts/evict-inactive.js
index e3419ffad..18730fd1e 100644
--- a/scripts/evict-inactive.js
+++ b/scripts/evict-inactive.js
@@ -1,7 +1,7 @@
 var nThen = require("nthen");

-var Store = require("../storage/file");
-var BlobStore = require("../storage/blob");
+var Store = require("../lib/storage/file");
+var BlobStore = require("../lib/storage/blob");
 var Pins = require("../lib/pins");

 var config = require("../lib/load-config");
diff --git a/scripts/expire-channels.js b/scripts/expire-channels.js
index 2479fc193..9151398d2 100644
--- a/scripts/expire-channels.js
+++ b/scripts/expire-channels.js
@@ -1,9 +1,9 @@
 var nThen = require("nthen");
-var Tasks = require("../storage/tasks");
+var Tasks = require("../lib/storage/tasks");
 var Logger = require("../lib/log");
 var config = require("../lib/load-config");

-var FileStorage = require('../' + config.storage || './storage/file');
+var FileStorage = require('../lib/storage/file');

 nThen(function (w) {
     Logger.create(config, w(function (_log) {
diff --git a/scripts/migrations/migrate-tasks-v1.js b/scripts/migrations/migrate-tasks-v1.js
index 365faeb4c..87807c74d 100644
--- a/scripts/migrations/migrate-tasks-v1.js
+++ b/scripts/migrations/migrate-tasks-v1.js
@@ -1,5 +1,5 @@
 var nThen = require("nthen");
-var Tasks = require("../../storage/tasks");
+var Tasks = require("../../lib/storage/tasks");
 var Logger = require("../../lib/log");
 var config = require("../../lib/load-config");
@@ -7,7 +7,7 @@ var config = require("../../lib/load-config");
 // this isn't strictly necessary for what we want to do
 // but the API requires it, and I don't feel like changing that
 // --ansuz
-var FileStorage = require("../../" + (config.storage || "./storage/file"));
+var FileStorage = require("../../lib/storage/file"));
 var tasks;

 nThen(function (w) {
diff --git a/scripts/restore-archived.js b/scripts/restore-archived.js
index a420e35e5..3f68b607e 100644
--- a/scripts/restore-archived.js
+++ b/scripts/restore-archived.js
@@ -1,6 +1,6 @@
 var nThen = require("nthen");

-var Store = require("../storage/file");
+var Store = require("../lib/storage/file");
 var config = require("../lib/load-config");

 var store;

From d274be6de4ca3e818f317c009a68f89539b7dd95 Mon Sep 17 00:00:00 2001
From: ansuz
Date: Fri, 14 Feb 2020 16:37:00 -0500
Subject: [PATCH 4/6] remove ancient import script

---
 import | 65 ----------------------------------------------------------
 1 file changed, 65 deletions(-)
 delete mode 100755 import

diff --git a/import b/import
deleted file mode 100755
index 1bf1d5de9..000000000
--- a/import
+++ /dev/null
@@ -1,65 +0,0 @@
-#!/usr/bin/env node
-/* globals process */
-
-var Config = require("./config");
-var Fs = require("fs");
-var Storage = require(Config.storage);
-
-var args = process.argv.slice(2);
-
-if (!args.length) {
-    console.log("Insufficient arguments!");
-    console.log("Pass a path to a database backup!");
-    process.exit();
-}
-
-var dump = Fs.readFileSync(args[0], 'utf-8');
-
-var ready = function (store) {
-    var lock = 0;
-    dump.split(/\n/)
-        .filter(function (line) {
-            return line;
-        })
-        .forEach(function (line, i) {
-            lock++;
-            var parts;
-
-            var channel;
-            var msg;
-
-            line.replace(/^(.*?)\|(.*)$/, function (all, c, m) {
-                channel = c;
-                msg = m;
-                return '';
-            });
-
-            if (!channel || !msg) {
-                console.log("BAD LINE on line %s", i);
-                return;
-            }
-
-            try {
-                JSON.parse(msg);
-            } catch (err) {
-                console.log("BAD LINE on line %s", i);
-                console.log(msg);
-                console.log();
-            }
-
-            store.message(channel, msg, function () {
-                console.log(line);
-                lock--;
-                if (!lock) {
-                    console.log("DONE");
-                    process.exit(0);
-                }
-            });
-        });
-};
-
-Storage.create(Config, function (store) {
-    console.log("READY");
-    ready(store);
-});
-

From 3f606d8c75a45a2869527f70e1cd6e1397df3b0d Mon Sep 17 00:00:00 2001
From: ansuz
Date: Fri, 14 Feb 2020 16:45:53 -0500
Subject: [PATCH 5/6] remove some duplicated code

---
 lib/deduplicate.js | 11 -----------
 lib/hk-util.js     |  4 ++--
 lib/metadata.js    |  2 +-
 lib/once.js        |  7 -------
 4 files changed, 3 insertions(+), 21 deletions(-)
 delete mode 100644 lib/deduplicate.js
 delete mode 100644 lib/once.js

diff --git a/lib/deduplicate.js b/lib/deduplicate.js
deleted file mode 100644
index 3ad62e6b0..000000000
--- a/lib/deduplicate.js
+++ /dev/null
@@ -1,11 +0,0 @@
-// remove duplicate elements in an array
-module.exports = function (O) {
-    // make a copy of the original array
-    var A = O.slice();
-    for (var i = 0; i < A.length; i++) {
-        for (var j = i + 1; j < A.length; j++) {
-            if (A[i] === A[j]) { A.splice(j--, 1); }
-        }
-    }
-    return A;
-};
A; -}; diff --git a/lib/hk-util.js b/lib/hk-util.js index 0a661c6c8..3221d50a2 100644 --- a/lib/hk-util.js +++ b/lib/hk-util.js @@ -3,7 +3,7 @@ var HK = module.exports; const nThen = require('nthen'); -const Once = require("./once"); +const Util = require("./common-util"); const Meta = require("./metadata"); const Nacl = require('tweetnacl/nacl-fast'); @@ -182,7 +182,7 @@ const computeIndex = function (Env, channelName, cb) { const ref = {}; - const CB = Once(cb); + const CB = Util.once(cb); const offsetByHash = {}; let size = 0; diff --git a/lib/metadata.js b/lib/metadata.js index 2b3a0b737..220327456 100644 --- a/lib/metadata.js +++ b/lib/metadata.js @@ -1,6 +1,6 @@ var Meta = module.exports; -var deduplicate = require("./deduplicate"); +var deduplicate = require("./common-util").deduplicateString; /* Metadata fields: diff --git a/lib/once.js b/lib/once.js deleted file mode 100644 index a851af259..000000000 --- a/lib/once.js +++ /dev/null @@ -1,7 +0,0 @@ -module.exports = function (f, g) { - return function () { - if (!f) { return; } - f.apply(this, Array.prototype.slice.call(arguments)); - f = g; - }; -}; From d9ab8d3f62576bba4072705f0c2790ecbef33718 Mon Sep 17 00:00:00 2001 From: ansuz Date: Fri, 14 Feb 2020 16:48:14 -0500 Subject: [PATCH 6/6] lint compliance --- scripts/migrations/migrate-tasks-v1.js | 2 +- scripts/tests/test-pins.js | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/scripts/migrations/migrate-tasks-v1.js b/scripts/migrations/migrate-tasks-v1.js index 87807c74d..40b8d7a87 100644 --- a/scripts/migrations/migrate-tasks-v1.js +++ b/scripts/migrations/migrate-tasks-v1.js @@ -7,7 +7,7 @@ var config = require("../../lib/load-config"); // this isn't strictly necessary for what we want to do // but the API requires it, and I don't feel like changing that // --ansuz -var FileStorage = require("../../lib/storage/file")); +var FileStorage = require("../../lib/storage/file"); var tasks; nThen(function (w) { diff --git a/scripts/tests/test-pins.js b/scripts/tests/test-pins.js index eea164230..712fe621b 100644 --- a/scripts/tests/test-pins.js +++ b/scripts/tests/test-pins.js @@ -30,7 +30,8 @@ var handler = function (ref, id /* safeKey */, pinned) { //console.log(ref, id); }; -Pins.list(function (err, pinned) { +Pins.list(function (err) { + if (err) { return void console.error(err); } /* for (var id in pinned) { console.log(id);