Replace the Netflux old client (netflux.js) by the Netflux2 client.

Move the WebRTC peer-to-peer use case in /padrtc, which still uses the old
Netflux client
Use es6-promises.min.js to solve a issue with some browser and the new
Netflux client
This commit is contained in:
Yann Flory
2016-04-07 18:48:01 +02:00
parent cf9f60bd57
commit 0b3d6e15b8
14 changed files with 2628 additions and 160 deletions

9
www/common/es6-promise.min.js vendored Normal file

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,225 @@
/*global: WebSocket */
define(() => {
'use strict';
const MAX_LAG_BEFORE_PING = 15000;
const MAX_LAG_BEFORE_DISCONNECT = 30000;
const PING_CYCLE = 5000;
const REQUEST_TIMEOUT = 5000;
const now = () => new Date().getTime();
const networkSendTo = (ctx, peerId, content) => {
const seq = ctx.seq++;
ctx.ws.send(JSON.stringify([seq, 'MSG', peerId, content]));
return new Promise((res, rej) => {
ctx.requests[seq] = { reject: rej, resolve: res, time: now() };
});
};
const channelBcast = (ctx, chanId, content) => {
const chan = ctx.channels[chanId];
if (!chan) { throw new Error("no such channel " + chanId); }
const seq = ctx.seq++;
ctx.ws.send(JSON.stringify([seq, 'MSG', chanId, content]));
return new Promise((res, rej) => {
ctx.requests[seq] = { reject: rej, resolve: res, time: now() };
});
};
const channelLeave = (ctx, chanId, reason) => {
const chan = ctx.channels[chanId];
if (!chan) { throw new Error("no such channel " + chanId); }
delete ctx.channels[chanId];
ctx.ws.send(JSON.stringify([ctx.seq++, 'LEAVE', chanId, reason]));
};
const makeEventHandlers = (ctx, mappings) => {
return (name, handler) => {
const handlers = mappings[name];
if (!handlers) { throw new Error("no such event " + name); }
handlers.push(handler);
};
};
const mkChannel = (ctx, id) => {
const internal = {
onMessage: [],
onJoin: [],
onLeave: [],
members: [],
jSeq: ctx.seq++
};
const chan = {
_: internal,
id: id,
members: internal.members,
bcast: (msg) => channelBcast(ctx, chan.id, msg),
leave: (reason) => channelLeave(ctx, chan.id, reason),
on: makeEventHandlers(ctx, { message:
internal.onMessage, join: internal.onJoin, leave: internal.onLeave })
};
ctx.requests[internal.jSeq] = chan;
ctx.ws.send(JSON.stringify([internal.jSeq, 'JOIN', id]));
return new Promise((res, rej) => {
chan._.resolve = res;
chan._.reject = rej;
})
};
const mkNetwork = (ctx) => {
const network = {
webChannels: ctx.channels,
getLag: () => (ctx.lag),
sendto: (peerId, content) => (networkSendTo(ctx, peerId, content)),
join: (chanId) => (mkChannel(ctx, chanId)),
on: makeEventHandlers(ctx, { message: ctx.onMessage, disconnect: ctx.onDisconnect })
};
network.__defineGetter__("webChannels", () => {
return Object.keys(ctx.channels).map((k) => (ctx.channels[k]));
});
return network;
};
const onMessage = (ctx, evt) => {
let msg;
try { msg = JSON.parse(evt.data); } catch (e) { console.log(e.stack); return; }
if (msg[0] !== 0) {
const req = ctx.requests[msg[0]];
if (!req) {
console.log("error: " + JSON.stringify(msg));
return;
}
delete ctx.requests[msg[0]];
if (msg[1] === 'ACK') {
if (req.ping) { // ACK of a PING
ctx.lag = now() - Number(req.ping);
return;
}
req.resolve();
} else if (msg[1] === 'JACK') {
if (req._) {
// Channel join request...
if (!msg[2]) { throw new Error("wrong type of ACK for channel join"); }
req.id = msg[2];
ctx.channels[req.id] = req;
return;
}
req.resolve();
} else if (msg[1] === 'ERROR') {
req.reject({ type: msg[2], message: msg[3] });
} else {
req.reject({ type: 'UNKNOWN', message: JSON.stringify(msg) });
}
return;
}
if (msg[2] === 'IDENT') {
ctx.uid = msg[3];
setInterval(() => {
if (now() - ctx.timeOfLastMessage < MAX_LAG_BEFORE_PING) { return; }
let seq = ctx.seq++;
let currentDate = now();
ctx.requests[seq] = {time: now(), ping: currentDate};
ctx.ws.send(JSON.stringify([seq, 'PING', currentDate]));
if (now() - ctx.timeOfLastMessage > MAX_LAG_BEFORE_DISCONNECT) {
ctx.ws.close();
}
}, PING_CYCLE);
return;
} else if (!ctx.uid) {
// extranious message, waiting for an ident.
return;
}
if (msg[2] === 'PING') {
msg[1] = 'PONG';
ctx.ws.send(JSON.stringify(msg));
return;
}
if (msg[2] === 'MSG') {
let handlers;
if (msg[3] === ctx.uid) {
handlers = ctx.onMessage;
} else {
const chan = ctx.channels[msg[3]];
if (!chan) {
console.log("message to non-existant chan " + JSON.stringify(msg));
return;
}
handlers = chan._.onMessage;
}
handlers.forEach((h) => {
try { h(msg[4], msg[1]); } catch (e) { console.log(e.stack); }
});
}
if (msg[2] === 'LEAVE') {
const chan = ctx.channels[msg[3]];
if (!chan) {
console.log("leaving non-existant chan " + JSON.stringify(msg));
return;
}
chan._.onLeave.forEach((h) => {
try { h(msg[1], msg[4]); } catch (e) { console.log(e.stack); }
});
}
if (msg[2] === 'JOIN') {
const chan = ctx.channels[msg[3]];
if (!chan) {
console.log("ERROR: join to non-existant chan " + JSON.stringify(msg));
return;
}
// have we yet fully joined the chan?
const synced = (chan._.members.indexOf(ctx.uid) !== -1);
chan._.members.push(msg[1]);
if (!synced && msg[1] === ctx.uid) {
// sync the channel join event
chan.myID = ctx.uid;
chan._.resolve(chan);
}
if (synced) {
chan._.onJoin.forEach((h) => {
try { h(msg[1]); } catch (e) { console.log(e.stack); }
});
}
}
};
const connect = (websocketURL) => {
let ctx = {
ws: new WebSocket(websocketURL),
seq: 1,
lag: 0,
uid: null,
network: null,
channels: {},
onMessage: [],
onDisconnect: [],
requests: {}
};
setInterval(() => {
for (let id in ctx.requests) {
const req = ctx.requests[id];
if (now() - req.time > REQUEST_TIMEOUT) {
delete ctx.requests[id];
req.reject({ type: 'TIMEOUT', message: 'waited ' + now() - req.time + 'ms' });
}
}
}, 5000);
ctx.network = mkNetwork(ctx);
ctx.ws.onmessage = (msg) => (onMessage(ctx, msg));
ctx.ws.onclose = (evt) => {
ctx.onDisconnect.forEach((h) => {
try { h(evt.reason); } catch (e) { console.log(e.stack); }
});
};
return new Promise((resolve, reject) => {
ctx.ws.onopen = () => resolve(ctx.network);
});
};
return { connect: connect };
});

View File

@@ -1342,6 +1342,7 @@ return /******/ (function(modules) { // webpackBootstrap
if (msg[0] !== 0 && msg[1] !== 'ACK') {
return;
}
if (msg[2] === 'IDENT' && msg[1] === '') {
socket.uid = msg[3];
webChannel.myID = msg[3];
@@ -1401,7 +1402,7 @@ return /******/ (function(modules) { // webpackBootstrap
// Trigger onJoining() when another user is joining the channel
// Register the user in the list of peers in the channel
if (webChannel.peers.length === 0 && msg[1].length === 16) {
// We've just catched the history keeper
// We've just catched the history keeper (16 characters length name)
history_keeper = msg[1];
webChannel.hc = history_keeper;
}

View File

@@ -17,10 +17,11 @@
window.Reflect = { has: (x,y) => { return (y in x); } };
define([
'/common/messages.js',
'/common/netflux.js',
'/common/netflux-client.js',
'/common/crypto.js',
'/common/toolbar.js',
'/_socket/text-patcher.js',
'/common/es6-promise.min.js',
'/common/chainpad.js',
'/bower_components/jquery/dist/jquery.min.js',
], function (Messages, Netflux, Crypto, Toolbar, TextPatcher) {
@@ -75,7 +76,6 @@ define([
function (config)
{
var websocketUrl = config.websocketURL;
var webrtcUrl = config.webrtcURL;
var userName = config.userName;
var channel = config.channel;
var chanKey = config.cryptKey;
@@ -122,25 +122,20 @@ define([
content.length + ':' + content;
};
var onPeerMessage = function(toId, type, wc) {
if(type === 6) {
messagesHistory.forEach(function(msg) {
wc.sendTo(toId, '1:y'+msg);
});
wc.sendTo(toId, '0');
}
};
var whoami = new RegExp(userName.replace(/[\/\+]/g, function (c) {
return '\\' +c;
}));
var onMessage = function(peer, msg, wc) {
var onMessage = function(peer, msg, wc, network) {
if(msg === 0 || msg === '0') {
onReady(wc);
var hc = (wc && wc.history_keeper) ? wc.history_keeper : null;
if(wc && (msg === 0 || msg === '0')) {
onReady(wc, network);
return;
}
else if (peer === hc){
msg = JSON.parse(msg)[4];
}
var message = chainpadAdapter.msgIn(peer, msg);
verbose(message);
@@ -176,8 +171,10 @@ define([
users: []
};
var onJoining = function(peer) {
if(peer.length !== 32) { return; }
var list = userList.users;
if(list.indexOf(peer) === -1) {
var index = list.indexOf(peer);
if(index === -1) {
userList.users.push(peer);
}
userList.onChange();
@@ -216,7 +213,7 @@ define([
if(parsed.content[0] === 4) { // PING message from Chainpad
parsed.content[0] = 5;
onMessage('', '1:y'+mkMessage(parsed.user, parsed.channelId, parsed.content));
wc.sendPing();
// wc.sendPing();
return;
}
return Crypto.encrypt(msg, cryptKey);
@@ -227,20 +224,6 @@ define([
key: ''
};
var rtc = true;
if(!getParameterByName("webrtc") || !webrtcUrl) {
rtc = false;
options.signaling = websocketUrl;
options.topology = 'StarTopologyService';
options.protocol = 'WebSocketProtocolService';
options.connector = 'WebSocketService';
options.openWebChannel = true;
}
else {
options.signaling = webrtcUrl;
}
var createRealtime = function(chan) {
return ChainPad.create(userName,
passwd,
@@ -251,12 +234,12 @@ define([
});
};
var onReady = function(wc) {
var onReady = function(wc, network) {
if(config.onInit) {
config.onInit({
myID: wc.myID,
realtime: realtime,
webChannel: wc,
getLag: network.getLag,
userList: userList
});
}
@@ -274,18 +257,21 @@ define([
}
}
var onOpen = function(wc) {
var onOpen = function(wc, network) {
channel = wc.id;
window.location.hash = channel + '|' + chanKey;
// Add the existing peers in the userList
wc.members.forEach(onJoining);
// Add the handlers to the WebChannel
wc.onmessage = function(peer, msg) { // On receiving message
onMessage(peer, msg, wc);
};
wc.onJoining = onJoining; // On user joining the session
wc.onLeaving = onLeaving; // On user leaving the session
wc.onPeerMessage = function(peerId, type) {
onPeerMessage(peerId, type, wc);
};
wc.on('message', function (msg, sender) { //Channel msg
onMessage(sender, msg, wc, network);
});
wc.on('join', onJoining);
wc.on('leave', onLeaving);
if(config.setMyID) {
config.setMyID({
myID: wc.myID
@@ -299,7 +285,7 @@ define([
// Filter messages sent by Chainpad to make it compatible with Netflux
message = chainpadAdapter.msgOut(message, wc);
if(message) {
wc.send(message).then(function() {
wc.bcast(message).then(function() {
// Send the message back to Chainpad once it is sent to the recipients.
onMessage(wc.myID, message);
}, function(err) {
@@ -311,17 +297,11 @@ define([
// Get the channel history
var hc;
if(rtc) {
wc.channels.forEach(function (c) { if(!hc) { hc = c; } });
if(hc) {
wc.getHistory(hc.peerID);
}
}
else {
// TODO : Improve WebSocket service to use the latest Netflux's API
wc.peers.forEach(function (p) { if (!hc || p.linkQuality > hc.linkQuality) { hc = p; } });
hc.send(JSON.stringify(['GET_HISTORY', wc.id]));
}
wc.members.forEach(function (p) {
if (p.length === 16) { hc = p; }
});
wc.history_keeper = hc;
if (hc) { network.sendto(hc, JSON.stringify(['GET_HISTORY', wc.id])); }
toReturn.patchText = TextPatcher.create({
@@ -331,58 +311,30 @@ define([
realtime.start();
};
var createRTCChannel = function () {
// Check if the WebRTC channel exists and create it if necessary
var webchannel = Netflux.create();
webchannel.openForJoining(options).then(function(data) {
onOpen(webchannel);
onReady(webchannel);
}, function(error) {
warn(error);
});
};
var findChannelById = function(webChannels, channelId) {
var webChannel;
webChannels.forEach(function(chan) {
if(chan.id == channelId) { webChannel = chan; return;}
});
return webChannel;
}
var joinChannel = function() {
// Connect to the WebSocket/WebRTC channel
Netflux.join(channel, options).then(function(wc) {
onOpen(wc);
}, function(error) {
if(rtc && error.code === 1008) {// Unexisting RTC channel
createRTCChannel();
// Connect to the WebSocket channel
Netflux.connect(websocketUrl).then(function(network) {
network.on('message', function (msg, sender) { // Direct message
var wchan = findChannelById(network.webChannels, channel);
if(wchan) {
onMessage(sender, msg, wchan, network);
}
else { warn(error); }
});
};
joinChannel();
var checkConnection = function(wc) {
if(wc.channels && wc.channels.size > 0) {
var channels = Array.from(wc.channels);
var channel = channels[0];
var socketChecker = setInterval(function () {
if (channel.checkSocket(realtime)) {
warn("Socket disconnected!");
recoverableErrorCount += 1;
if (recoverableErrorCount >= MAX_RECOVERABLE_ERRORS) {
warn("Giving up!");
realtime.abort();
try { channel.close(); } catch (e) { warn(e); }
if (config.onAbort) {
config.onAbort({
socket: channel
});
}
if (socketChecker) { clearInterval(socketChecker); }
}
} else {
// it's working as expected, continue
}
}, 200);
}
};
network.join(channel || null).then(function(wc) {
onOpen(wc, network);
}, function(error) {
console.error(error);
})
}, function(error) {
warn(error);
});
return toReturn;
};

View File

@@ -132,7 +132,7 @@ define([
userList.forEach(function(user) {
if(user !== myUserName) {
var data = (userData) ? (userData[user] || null) : null;
var userName = (data) ? data.name : null;
var userName = (data) ? data.name : user;
if(userName) {
if(i === 0) list = ' : ';
list += userName + ', ';
@@ -170,9 +170,9 @@ define([
return $container.find('#'+id)[0];
};
var checkLag = function (webChannel, lagElement) {
if(typeof webChannel.getLag !== "function") { return; }
var lag = webChannel.getLag();
var checkLag = function (getLag, lagElement) {
if(typeof getLag !== "function") { return; }
var lag = getLag();
var lagMsg = Messages.lag + ' ';
if(lag) {
var lagSec = lag/1000;
@@ -214,7 +214,7 @@ define([
localStorage['CryptPad_RECENTPADS'] = JSON.stringify(out);
};
var create = function ($container, myUserName, realtime, webChannel, userList, config) {
var create = function ($container, myUserName, realtime, getLag, userList, config) {
var toolbar = createRealtimeToolbar($container);
createEscape(toolbar.find('.rtwysiwyg-toolbar-leftside'));
var userListElement = createUserList(toolbar.find('.rtwysiwyg-toolbar-leftside'));
@@ -223,7 +223,7 @@ define([
var userData = config.userData;
var changeNameID = config.changeNameID;
// Check if the suer is allowed to change his name
// Check if the user is allowed to change his name
if(changeNameID) {
// Create the button and update the element containing the user list
userListElement = createChangeName($container, userListElement, changeNameID);
@@ -253,7 +253,7 @@ define([
setInterval(function () {
if (!connected) { return; }
checkLag(webChannel, lagElement);
checkLag(getLag, lagElement);
}, 3000);
return {