implement flat file storage
This commit is contained in:
parent
217e664d4c
commit
be69e2828c
145
storage/file.js
145
storage/file.js
@ -1,22 +1,149 @@
|
|||||||
var Fs = require("fs");
|
var Fs = require("fs");
|
||||||
|
var Path = require("path");
|
||||||
|
|
||||||
var insert = function (env, channel, content, cb) {
|
//function will check if a directory exists, and create it if it doesn't
|
||||||
|
var checkDir = function (dir, cb) {
|
||||||
|
Fs.stat(dir, function(err, stats) {
|
||||||
|
//Check if error defined and the error code is "not exists"
|
||||||
|
if (err) {
|
||||||
|
//Create the directory, call the callback.
|
||||||
|
Fs.mkdir(dir, cb);
|
||||||
|
} else {
|
||||||
|
//just in case there was a different error:
|
||||||
|
cb(err);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
var checkFile = function (path, cb) {
|
||||||
|
Fs.stat(path, function (err, stats) {
|
||||||
|
if (err) {
|
||||||
|
if (err.code === 'ENOENT') {
|
||||||
|
return cb(null, false);
|
||||||
|
} else {
|
||||||
|
return cb(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return cb(null, stats.isFile());
|
||||||
|
});
|
||||||
|
};
|
||||||
|
|
||||||
|
var separate = function (channel) {
|
||||||
|
return {
|
||||||
|
first: channel.slice(0, 2),
|
||||||
|
rest: channel.slice(2),
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
var Channel = function (env, id, filepath, cb) {
|
||||||
|
if (!env.channels[id]) {
|
||||||
|
return (env.channels[id] = {
|
||||||
|
atime: +new Date(),
|
||||||
|
queue: [],
|
||||||
|
stream: Fs.createWriteStream(filepath, {
|
||||||
|
flags: 'a'
|
||||||
|
}).on('open', function () {
|
||||||
|
cb(null, env.channels[id]);
|
||||||
|
}).on('error', function (err) {
|
||||||
|
cb(err);
|
||||||
|
})
|
||||||
|
});
|
||||||
|
}
|
||||||
|
cb(null, env.channels[id]);
|
||||||
|
};
|
||||||
|
|
||||||
|
var insert = function (env, channelName, content, cb) {
|
||||||
|
var parts = separate(channelName);
|
||||||
|
|
||||||
|
var dirpath = Path.join(env.root, parts.first);
|
||||||
|
checkDir(dirpath, function (e) {
|
||||||
|
if (e) { throw new Error(e); }
|
||||||
|
|
||||||
|
var filepath = Path.join(env.root, parts.first, parts.rest);
|
||||||
|
checkFile(filepath, function (err, isFile) {
|
||||||
|
Channel(env, channelName, filepath, function (err, channel) {
|
||||||
|
if (err) {
|
||||||
|
console.error(err);
|
||||||
|
return cb();
|
||||||
|
}
|
||||||
|
|
||||||
|
var doIt = function () {
|
||||||
|
channel.locked = true;
|
||||||
|
channel.atime = +new Date();
|
||||||
|
channel.stream.write(JSON.stringify(content) + '\n');
|
||||||
|
|
||||||
|
if (!channel.queue.length) {
|
||||||
|
channel.locked = false;
|
||||||
|
cb();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
channel.queue.shift()();
|
||||||
|
cb();
|
||||||
|
};
|
||||||
|
|
||||||
|
if (channel.locked) {
|
||||||
|
channel.queue.push(doIt);
|
||||||
|
} else {
|
||||||
|
doIt();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
var getMessages = function (env, channelName, msgHandler, cb) {
|
var getMessages = function (env, channelName, msgHandler, cb) {
|
||||||
|
var parts = separate(channelName);
|
||||||
|
|
||||||
|
var filepath = Path.join(env.root, parts.first, parts.rest);
|
||||||
|
|
||||||
|
var remainder = '';
|
||||||
|
var newlines = /[\n\r]+/;
|
||||||
|
|
||||||
|
var stream = Fs.createReadStream(filepath, 'utf-8')
|
||||||
|
.on('data', function (chunk) {
|
||||||
|
var lines = chunk.split(newlines);
|
||||||
|
lines[0] = remainder + lines[0];
|
||||||
|
remainder = lines.pop();
|
||||||
|
lines.forEach(function (line) {
|
||||||
|
msgHandler(JSON.parse(line));
|
||||||
|
});
|
||||||
|
})
|
||||||
|
.on('end', function () { cb(); })
|
||||||
|
.on('error', function (e) { cb(); });
|
||||||
};
|
};
|
||||||
|
|
||||||
module.exports.create = function (conf, cb) {
|
module.exports.create = function (conf, cb) {
|
||||||
var env = {};
|
var env = {
|
||||||
|
root: conf.filePath,
|
||||||
|
channels: { },
|
||||||
|
};
|
||||||
|
|
||||||
cb({
|
checkDir(env.root, function (e, data) {
|
||||||
message: function (channelName, content, cb) {
|
cb({
|
||||||
insert(env, channelName, content, cb);
|
message: function (channelName, content, cb) {
|
||||||
},
|
insert(env, channelName, content, cb);
|
||||||
getMessages: function (channelName, msgHandler, cb) {
|
},
|
||||||
getMessages(env, channelName, msgHandler, cb);
|
getMessages: function (channelName, msgHandler, cb) {
|
||||||
},
|
getMessages(env, channelName, msgHandler, cb);
|
||||||
|
},
|
||||||
|
removeChannel: function (channelName, cb) {
|
||||||
|
console.log("[storage/file.removeChannel()] Not implemented");
|
||||||
|
cb();
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
setInterval(function () {
|
||||||
|
var now = +new Date();
|
||||||
|
Object.keys(env.channels).forEach(function (id) {
|
||||||
|
var channel = env.channels[id];
|
||||||
|
if (now - channel.atime > (1000 * 60)) {
|
||||||
|
console.log("Cleaning up channel [%s]", id);
|
||||||
|
|
||||||
|
channel.stream.close();
|
||||||
|
delete env.channels[id];
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}, 60 * 1000);
|
||||||
});
|
});
|
||||||
};
|
};
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user