Merge branch 'netflux' into beta

Conflicts:
	www/assert/main.js
This commit is contained in:
ansuz
2016-04-21 15:47:07 +02:00
5 changed files with 52 additions and 251 deletions

View File

@@ -1,189 +0,0 @@
;(function () { 'use strict';
let Crypto = require('crypto');
let WebSocket = require('ws');
let LogStore = require('./storage/LogStore');
let LAG_MAX_BEFORE_DISCONNECT = 30000;
let LAG_MAX_BEFORE_PING = 15000;
let HISTORY_KEEPER_ID = "_HISTORY_KEEPER_";
let dropUser;
let now = function () { return (new Date()).getTime(); };
let sendMsg = function (ctx, user, msg) {
try {
console.log('<' + JSON.stringify(msg));
user.socket.send(JSON.stringify(msg));
} catch (e) {
console.log(e.stack);
dropUser(ctx, user);
}
};
let sendChannelMessage = function (ctx, channel, msgStruct) {
msgStruct.unshift(0);
channel.forEach(function (user) { sendMsg(ctx, user, msgStruct); });
if (msgStruct[2] === 'MSG') {
ctx.store.message(channel.id, JSON.stringify(msgStruct), function () { });
}
};
dropUser = function (ctx, user) {
if (user.socket.readyState !== WebSocket.CLOSING
&& user.socket.readyState !== WebSocket.CLOSED)
{
try {
user.socket.close();
} catch (e) {
console.log("Failed to disconnect ["+user.id+"], attempting to terminate");
try {
user.socket.terminate();
} catch (ee) {
console.log("Failed to terminate ["+user.id+"] *shrug*");
}
}
}
delete ctx.users[user.id];
Object.keys(ctx.channels).forEach(function (chanName) {
let chan = ctx.channels[chanName];
let idx = chan.indexOf(user);
if (idx < 0) { return; }
console.log("Removing ["+user.id+"] from channel ["+chanName+"]");
chan.splice(idx, 1);
if (chan.length === 0) {
console.log("Removing empty channel ["+chanName+"]");
delete ctx.channels[chanName];
} else {
sendChannelMessage(ctx, chan, [user.id, 'LEAVE', chanName, 'Quit: [ dropUser() ]']);
}
});
};
let getHistory = function (ctx, channelName, handler) {
ctx.store.getMessages(channelName, function (msgStr) { handler(JSON.parse(msgStr)); });
};
let randName = function () { return Crypto.randomBytes(16).toString('hex'); };
let handleMessage = function (ctx, user, msg) {
let json = JSON.parse(msg);
let seq = json.shift();
let cmd = json[0];
let obj = json[1];
user.timeOfLastMessage = now();
user.pingOutstanding = false;
if (cmd === 'JOIN') {
/*if (obj && obj.length !== 32) {
sendMsg(ctx, user, [seq, 'ERROR', 'ENOENT', obj]);
return;
}*/
let chanName = obj || randName();
let chan = ctx.channels[chanName] = ctx.channels[chanName] || [];
chan.id = chanName;
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'JOIN', chanName]);
chan.forEach(function (u) { sendMsg(ctx, user, [0, u.id, 'JOIN', chanName]); });
chan.push(user);
sendChannelMessage(ctx, chan, [user.id, 'JOIN', chanName]);
return;
}
if (cmd === 'MSG') {
if (obj === HISTORY_KEEPER_ID) {
let parsed;
try { parsed = JSON.parse(json[2]); } catch (err) { return; }
if (parsed[0] === 'GET_HISTORY') {
console.log('getHistory ' + parsed[1]);
getHistory(ctx, parsed[1], function (msg) {
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, JSON.stringify(msg)]);
});
sendMsg(ctx, user, [0, HISTORY_KEEPER_ID, 'MSG', user.id, 0]);
}
return;
}
if (obj && !ctx.channels[obj] && !ctx.users[obj]) {
sendMsg(ctx, user, [seq, 'ERROR', 'ENOENT', obj]);
return;
}
let target;
json.unshift(user.id);
if ((target = ctx.channels[obj])) {
sendChannelMessage(ctx, target, json);
return;
}
if ((target = ctx.users[obj])) {
json.unshift(0);
sendMsg(ctx, target, json);
return;
}
}
if (cmd === 'LEAVE') {
let err;
let chan;
let idx;
if (!obj) { err = 'EINVAL'; }
if (!err && !(chan = ctx.channels[obj])) { err = 'ENOENT'; }
if (!err && (idx = chan.indexOf(user)) === -1) { err = 'NOT_IN_CHAN'; }
if (err) {
sendMsg(ctx, user, [seq, 'ERROR', err]);
return;
}
json.unshift(user.id);
sendChannelMessage(ctx, chan, [user.id, 'LEAVE', chan.id]);
chan.splice(idx, 1);
}
if (cmd === 'PING') {
sendMsg(ctx, user, [seq, 'PONG', obj]);
return;
}
};
let run = module.exports.run = function (storage, socketServer) {
let ctx = {
users: {},
channels: {},
store: LogStore.create('messages.log', storage)
};
setInterval(function () {
Object.keys(ctx.users).forEach(function (userId) {
let u = ctx.users[userId];
if (now() - u.timeOfLastMessage > LAG_MAX_BEFORE_DISCONNECT) {
dropUser(ctx, u);
} else if (!u.pingOutstanding && now() - u.timeOfLastMessage > LAG_MAX_BEFORE_PING) {
sendMsg(ctx, u, [0, 'PING', now()]);
u.pingOutstanding = true;
}
});
}, 5000);
socketServer.on('connection', function(socket) {
if(socket.upgradeReq.url !== '/cryptpad_websocket') { return; }
let conn = socket.upgradeReq.connection;
let user = {
addr: conn.remoteAddress + '|' + conn.remotePort,
socket: socket,
id: randName(),
timeOfLastMessage: now(),
pingOutstanding: false
};
ctx.users[user.id] = user;
sendMsg(ctx, user, [0, 'IDENT', user.id]);
socket.on('message', function(message) {
console.log('>'+message);
try {
handleMessage(ctx, user, message);
} catch (e) {
console.log(e.stack);
dropUser(ctx, user);
}
});
socket.on('close', function (evt) {
for (let userId in ctx.users) {
if (ctx.users[userId].socket === socket) {
dropUser(ctx, ctx.users[userId]);
}
}
});
});
};
}());

View File

@@ -222,8 +222,8 @@ define([
verbose(message);
allMessages.push(message);
if (!initializing) {
if (toReturn.onLocal) {
toReturn.onLocal();
if (config.onLocal) {
config.onLocal();
}
}
realtime.message(message);

View File

@@ -18,11 +18,10 @@ define([
'/common/messages.js',
'/common/netflux-client.js',
'/common/crypto.js',
'/common/TextPatcher.js',
'/common/es6-promise.min.js',
'/common/chainpad.js',
'/bower_components/jquery/dist/jquery.min.js',
], function (Messages, Netflux, Crypto, TextPatcher) {
], function (Messages, Netflux, Crypto) {
var $ = window.jQuery;
var ChainPad = window.ChainPad;
var PARANOIA = true;
@@ -117,6 +116,7 @@ define([
initializing = false;
// execute an onReady callback if one was supplied
// FIXME this should be once the chain has synced
if (config.onReady) {
config.onReady({
realtime: realtime
@@ -227,13 +227,6 @@ define([
// Open a Chainpad session
realtime = createRealtime();
toReturn.onEvent = function (newText) {
// assert to show that we're not out of sync
if (realtime.getUserDoc() !== newText) {
warn("realtime.getUserDoc() !== newText");
}
};
// Sending a message...
realtime.onMessage(function(message) {
// Filter messages sent by Chainpad to make it compatible with Netflux
@@ -265,12 +258,6 @@ define([
wc.history_keeper = hc;
if (hc) { network.sendto(hc, JSON.stringify(['GET_HISTORY', wc.id])); }
toReturn.patchText = TextPatcher.create({
realtime: realtime,
logging: true
});
realtime.start();
};
@@ -290,6 +277,16 @@ define([
Netflux.connect(websocketUrl).then(function(network) {
// pass messages that come out of netflux into our local handler
network.on('disconnect', function (evt) {
// TODO also abort if Netflux times out
// that will be managed in Netflux-client.js
if (config.onAbort) {
config.onAbort({
reason: evt.reason
});
}
});
network.on('message', function (msg, sender) { // Direct message
var wchan = findChannelById(network.webChannels, channel);
if(wchan) {

View File

@@ -311,25 +311,14 @@ define([
toolbar.failed();
};
var rti = module.realtimeInput = realtimeInput.start(realtimeOptions);
/* It's incredibly important that you assign 'rti.onLocal'
It's used inside of realtimeInput to make sure that all changes
make it into chainpad.
It's being assigned this way because it can't be passed in, and
and can't be easily returned from realtime input without making
the code less extensible.
*/
var propogate = rti.onLocal = function () {
var onLocal = realtimeOptions.onLocal = function () {
if (initializing) { return; }
var shjson = stringifyDOM(inner);
if (!rti.patchText(shjson)) {
return;
}
rti.onEvent(shjson);
rti.patchText(shjson);
};
var rti = module.realtimeInput = realtimeInput.start(realtimeOptions);
/* hitting enter makes a new line, but places the cursor inside
of the <br> instead of the <p>. This makes it such that you
cannot type until you click, which is rather unnacceptable.
@@ -342,12 +331,13 @@ define([
var easyTest = window.easyTest = function () {
cursor.update();
var start = cursor.Range.start;
var test = TypingTest.testInput(inner, start.el, start.offset, propogate);
propogate();
var test = TypingTest.testInput(inner, start.el, start.offset, onLocal);
// why twice?
onLocale();
return test;
};
editor.on('change', propogate);
editor.on('change', onLocal);
});
};

View File

@@ -11,10 +11,13 @@ define([
'/common/json-ot.js',
'/common/TypingTests.js',
'json.sortify',
'/common/TextPatcher.js',
'/bower_components/diff-dom/diffDOM.js',
'/bower_components/jquery/dist/jquery.min.js',
'/customize/pad.js'
], function (Config, Messages, Crypto, realtimeInput, Hyperjson, Hyperscript, Toolbar, Cursor, JsonOT, TypingTest, JSONSortify) {
], function (Config, Messages, Crypto, realtimeInput, Hyperjson, Hyperscript,
Toolbar, Cursor, JsonOT, TypingTest, JSONSortify, TextPatcher) {
var $ = window.jQuery;
var ifrw = $('#pad-iframe')[0].contentWindow;
var Ckeditor; // to be initialized later...
@@ -288,7 +291,7 @@ define([
var shjson2 = stringifyDOM(inner);
if (shjson2 !== shjson) {
console.error("shjson2 !== shjson");
module.realtimeInput.patchText(shjson2);
module.patchText(shjson2);
}
};
@@ -304,12 +307,21 @@ define([
/* TODO handle disconnects and such*/
};
// this should only ever get called once, when the chain syncs
var onReady = realtimeOptions.onReady = function (info) {
console.log("Unlocking editor");
initializing = false;
setEditable(true);
module.patchText = TextPatcher.create({
realtime: info.realtime,
logging: false,
});
module.realtime = info.realtime;
var shjson = info.realtime.getUserDoc();
applyHjson(shjson);
console.log("Unlocking editor");
setEditable(true);
initializing = false;
};
var onAbort = realtimeOptions.onAbort = function (info) {
@@ -320,21 +332,9 @@ define([
toolbar.failed();
};
var onLocal = realtimeOptions.onLocal = function () {
if (initializing) { return; }
var rti = module.realtimeInput = realtimeInput.start(realtimeOptions);
/* It's incredibly important that you assign 'rti.onLocal'
It's used inside of realtimeInput to make sure that all changes
make it into chainpad.
It's being assigned this way because it can't be passed in, and
and can't be easily returned from realtime input without making
the code less extensible.
*/
var propogate = rti.onLocal = function () {
// serialize your DOM into an object
var hjson = Hyperjson.fromDOM(inner, isNotMagicLine, brFilter);
@@ -344,12 +344,15 @@ define([
}
// stringify the json and send it into chainpad
var shjson = stringify(hjson);
if (!rti.patchText(shjson)) {
return;
module.patchText(shjson);
if (module.realtime.getUserDoc() !== shjson) {
console.error("realtime.getUserDoc() !== shjson");
}
rti.onEvent(shjson);
};
var rti = module.realtimeInput = realtimeInput.start(realtimeOptions);
/* hitting enter makes a new line, but places the cursor inside
of the <br> instead of the <p>. This makes it such that you
cannot type until you click, which is rather unnacceptable.
@@ -359,7 +362,7 @@ define([
the first such keypress will not be inserted into the P. */
inner.addEventListener('keydown', cursor.brFix);
editor.on('change', propogate);
editor.on('change', onLocal);
// export the typing tests to the window.
// call like `test = easyTest()`
@@ -367,8 +370,8 @@ define([
var easyTest = window.easyTest = function () {
cursor.update();
var start = cursor.Range.start;
var test = TypingTest.testInput(inner, start.el, start.offset, propogate);
propogate();
var test = TypingTest.testInput(inner, start.el, start.offset, onLocal);
onLocal();
return test;
};
});