Convert netflux to es5

This commit is contained in:
Yann Flory
2016-04-25 15:29:39 +02:00
parent c0b8aac792
commit 85d5f5c47f

View File

@@ -1,99 +1,124 @@
/*global: WebSocket */ /*global: WebSocket */
define(() => { define(function () {
'use strict'; 'use strict';
const MAX_LAG_BEFORE_PING = 15000;
const MAX_LAG_BEFORE_DISCONNECT = 30000;
const PING_CYCLE = 5000;
const REQUEST_TIMEOUT = 30000;
const now = () => new Date().getTime(); var MAX_LAG_BEFORE_PING = 15000;
var MAX_LAG_BEFORE_DISCONNECT = 30000;
var PING_CYCLE = 5000;
var REQUEST_TIMEOUT = 30000;
const networkSendTo = (ctx, peerId, content) => { var now = function now() {
const seq = ctx.seq++; return new Date().getTime();
};
var networkSendTo = function networkSendTo(ctx, peerId, content) {
var seq = ctx.seq++;
ctx.ws.send(JSON.stringify([seq, 'MSG', peerId, content])); ctx.ws.send(JSON.stringify([seq, 'MSG', peerId, content]));
return new Promise((res, rej) => { return new Promise(function (res, rej) {
ctx.requests[seq] = { reject: rej, resolve: res, time: now() }; ctx.requests[seq] = { reject: rej, resolve: res, time: now() };
}); });
}; };
const channelBcast = (ctx, chanId, content) => { var channelBcast = function channelBcast(ctx, chanId, content) {
const chan = ctx.channels[chanId]; var chan = ctx.channels[chanId];
if (!chan) { throw new Error("no such channel " + chanId); } if (!chan) {
const seq = ctx.seq++; throw new Error("no such channel " + chanId);
}
var seq = ctx.seq++;
ctx.ws.send(JSON.stringify([seq, 'MSG', chanId, content])); ctx.ws.send(JSON.stringify([seq, 'MSG', chanId, content]));
return new Promise((res, rej) => { return new Promise(function (res, rej) {
ctx.requests[seq] = { reject: rej, resolve: res, time: now() }; ctx.requests[seq] = { reject: rej, resolve: res, time: now() };
}); });
}; };
const channelLeave = (ctx, chanId, reason) => { var channelLeave = function channelLeave(ctx, chanId, reason) {
const chan = ctx.channels[chanId]; var chan = ctx.channels[chanId];
if (!chan) { throw new Error("no such channel " + chanId); } if (!chan) {
throw new Error("no such channel " + chanId);
}
delete ctx.channels[chanId]; delete ctx.channels[chanId];
ctx.ws.send(JSON.stringify([ctx.seq++, 'LEAVE', chanId, reason])); ctx.ws.send(JSON.stringify([ctx.seq++, 'LEAVE', chanId, reason]));
}; };
const makeEventHandlers = (ctx, mappings) => { var makeEventHandlers = function makeEventHandlers(ctx, mappings) {
return (name, handler) => { return function (name, handler) {
const handlers = mappings[name]; var handlers = mappings[name];
if (!handlers) { throw new Error("no such event " + name); } if (!handlers) {
throw new Error("no such event " + name);
}
handlers.push(handler); handlers.push(handler);
}; };
}; };
const mkChannel = (ctx, id) => { var mkChannel = function mkChannel(ctx, id) {
const internal = { var internal = {
onMessage: [], onMessage: [],
onJoin: [], onJoin: [],
onLeave: [], onLeave: [],
members: [], members: [],
jSeq: ctx.seq++ jSeq: ctx.seq++
}; };
const chan = { var chan = {
_: internal, _: internal,
time: now(), time: now(),
id: id, id: id,
members: internal.members, members: internal.members,
bcast: (msg) => channelBcast(ctx, chan.id, msg), bcast: function bcast(msg) {
leave: (reason) => channelLeave(ctx, chan.id, reason), return channelBcast(ctx, chan.id, msg);
on: makeEventHandlers(ctx, { message: },
internal.onMessage, join: internal.onJoin, leave: internal.onLeave }) leave: function leave(reason) {
return channelLeave(ctx, chan.id, reason);
},
on: makeEventHandlers(ctx, { message: internal.onMessage, join: internal.onJoin, leave: internal.onLeave })
}; };
ctx.requests[internal.jSeq] = chan; ctx.requests[internal.jSeq] = chan;
ctx.ws.send(JSON.stringify([internal.jSeq, 'JOIN', id])); ctx.ws.send(JSON.stringify([internal.jSeq, 'JOIN', id]));
return new Promise((res, rej) => { return new Promise(function (res, rej) {
chan._.resolve = res; chan._.resolve = res;
chan._.reject = rej; chan._.reject = rej;
}) });
}; };
const mkNetwork = (ctx) => { var mkNetwork = function mkNetwork(ctx) {
const network = { var network = {
webChannels: ctx.channels, webChannels: ctx.channels,
getLag: () => (ctx.lag), getLag: function getLag() {
sendto: (peerId, content) => (networkSendTo(ctx, peerId, content)), return ctx.lag;
join: (chanId) => (mkChannel(ctx, chanId)), },
sendto: function sendto(peerId, content) {
return networkSendTo(ctx, peerId, content);
},
join: function join(chanId) {
return mkChannel(ctx, chanId);
},
on: makeEventHandlers(ctx, { message: ctx.onMessage, disconnect: ctx.onDisconnect }) on: makeEventHandlers(ctx, { message: ctx.onMessage, disconnect: ctx.onDisconnect })
}; };
network.__defineGetter__("webChannels", () => { network.__defineGetter__("webChannels", function () {
return Object.keys(ctx.channels).map((k) => (ctx.channels[k])); return Object.keys(ctx.channels).map(function (k) {
return ctx.channels[k];
});
}); });
return network; return network;
}; };
const onMessage = (ctx, evt) => { var onMessage = function onMessage(ctx, evt) {
let msg; var msg = void 0;
try { msg = JSON.parse(evt.data); } catch (e) { console.log(e.stack); return; } try {
msg = JSON.parse(evt.data);
} catch (e) {
console.log(e.stack);return;
}
if (msg[0] !== 0) { if (msg[0] !== 0) {
const req = ctx.requests[msg[0]]; var req = ctx.requests[msg[0]];
if (!req) { if (!req) {
console.log("error: " + JSON.stringify(msg)); console.log("error: " + JSON.stringify(msg));
return; return;
} }
delete ctx.requests[msg[0]]; delete ctx.requests[msg[0]];
if (msg[1] === 'ACK') { if (msg[1] === 'ACK') {
if (req.ping) { // ACK of a PING if (req.ping) {
// ACK of a PING
ctx.lag = now() - Number(req.ping); ctx.lag = now() - Number(req.ping);
return; return;
} }
@@ -101,7 +126,9 @@ const onMessage = (ctx, evt) => {
} else if (msg[1] === 'JACK') { } else if (msg[1] === 'JACK') {
if (req._) { if (req._) {
// Channel join request... // Channel join request...
if (!msg[2]) { throw new Error("wrong type of ACK for channel join"); } if (!msg[2]) {
throw new Error("wrong type of ACK for channel join");
}
req.id = msg[2]; req.id = msg[2];
ctx.channels[req.id] = req; ctx.channels[req.id] = req;
return; return;
@@ -114,14 +141,17 @@ const onMessage = (ctx, evt) => {
} }
return; return;
} }
if (msg[2] === 'IDENT') { if (msg[2] === 'IDENT') {
ctx.uid = msg[3]; ctx.uid = msg[3];
setInterval(() => { setInterval(function () {
if (now() - ctx.timeOfLastMessage < MAX_LAG_BEFORE_PING) { return; } if (now() - ctx.timeOfLastMessage < MAX_LAG_BEFORE_PING) {
let seq = ctx.seq++; return;
let currentDate = now(); }
ctx.requests[seq] = {time: now(), ping: currentDate}; var seq = ctx.seq++;
var currentDate = now();
ctx.requests[seq] = { time: now(), ping: currentDate };
ctx.ws.send(JSON.stringify([seq, 'PING', currentDate])); ctx.ws.send(JSON.stringify([seq, 'PING', currentDate]));
if (now() - ctx.timeOfLastMessage > MAX_LAG_BEFORE_DISCONNECT) { if (now() - ctx.timeOfLastMessage > MAX_LAG_BEFORE_DISCONNECT) {
ctx.ws.close(); ctx.ws.close();
@@ -140,57 +170,69 @@ const onMessage = (ctx, evt) => {
} }
if (msg[2] === 'MSG') { if (msg[2] === 'MSG') {
let handlers; var handlers = void 0;
if (msg[3] === ctx.uid) { if (msg[3] === ctx.uid) {
handlers = ctx.onMessage; handlers = ctx.onMessage;
} else { } else {
const chan = ctx.channels[msg[3]]; var chan = ctx.channels[msg[3]];
if (!chan) { if (!chan) {
console.log("message to non-existant chan " + JSON.stringify(msg)); console.log("message to non-existant chan " + JSON.stringify(msg));
return; return;
} }
handlers = chan._.onMessage; handlers = chan._.onMessage;
} }
handlers.forEach((h) => { handlers.forEach(function (h) {
try { h(msg[4], msg[1]); } catch (e) { console.error(e); } try {
h(msg[4], msg[1]);
} catch (e) {
console.error(e);
}
}); });
} }
if (msg[2] === 'LEAVE') { if (msg[2] === 'LEAVE') {
const chan = ctx.channels[msg[3]]; var _chan = ctx.channels[msg[3]];
if (!chan) { if (!_chan) {
console.log("leaving non-existant chan " + JSON.stringify(msg)); console.log("leaving non-existant chan " + JSON.stringify(msg));
return; return;
} }
chan._.onLeave.forEach((h) => { _chan._.onLeave.forEach(function (h) {
try { h(msg[1], msg[4]); } catch (e) { console.log(e.stack); } try {
h(msg[1], msg[4]);
} catch (e) {
console.log(e.stack);
}
}); });
} }
if (msg[2] === 'JOIN') { if (msg[2] === 'JOIN') {
const chan = ctx.channels[msg[3]]; var _chan2 = ctx.channels[msg[3]];
if (!chan) { if (!_chan2) {
console.log("ERROR: join to non-existant chan " + JSON.stringify(msg)); console.log("ERROR: join to non-existant chan " + JSON.stringify(msg));
return; return;
} }
// have we yet fully joined the chan? // have we yet fully joined the chan?
const synced = (chan._.members.indexOf(ctx.uid) !== -1); var synced = _chan2._.members.indexOf(ctx.uid) !== -1;
chan._.members.push(msg[1]); _chan2._.members.push(msg[1]);
if (!synced && msg[1] === ctx.uid) { if (!synced && msg[1] === ctx.uid) {
// sync the channel join event // sync the channel join event
chan.myID = ctx.uid; _chan2.myID = ctx.uid;
chan._.resolve(chan); _chan2._.resolve(_chan2);
} }
if (synced) { if (synced) {
chan._.onJoin.forEach((h) => { _chan2._.onJoin.forEach(function (h) {
try { h(msg[1]); } catch (e) { console.log(e.stack); } try {
h(msg[1]);
} catch (e) {
console.log(e.stack);
}
}); });
} }
} }
}; };
const connect = (websocketURL) => { var connect = function connect(websocketURL) {
let ctx = { var ctx = {
ws: new WebSocket(websocketURL), ws: new WebSocket(websocketURL),
seq: 1, seq: 1,
lag: 0, lag: 0,
@@ -201,26 +243,49 @@ const connect = (websocketURL) => {
onDisconnect: [], onDisconnect: [],
requests: {} requests: {}
}; };
setInterval(() => { setInterval(function () {
for (let id in ctx.requests) { for (var id in ctx.requests) {
const req = ctx.requests[id]; var req = ctx.requests[id];
if (now() - req.time > REQUEST_TIMEOUT) { if (now() - req.time > REQUEST_TIMEOUT) {
delete ctx.requests[id]; delete ctx.requests[id];
if(typeof req.reject === "function") { req.reject({ type: 'TIMEOUT', message: 'waited ' + now() - req.time + 'ms' }); } if (typeof req.reject === "function") {
req.reject({ type: 'TIMEOUT', message: 'waited ' + (now() - req.time) + 'ms' });
}
} }
} }
}, 5000); }, 5000);
ctx.network = mkNetwork(ctx); ctx.network = mkNetwork(ctx);
ctx.ws.onmessage = (msg) => (onMessage(ctx, msg)); ctx.ws.onmessage = function (msg) {
ctx.ws.onclose = (evt) => { return onMessage(ctx, msg);
ctx.onDisconnect.forEach((h) => { };
try { h(evt.reason); } catch (e) { console.log(e.stack); } ctx.ws.onclose = function (evt) {
ctx.onDisconnect.forEach(function (h) {
try {
h(evt.reason);
} catch (e) {
console.log(e.stack);
}
}); });
}; };
return new Promise((resolve, reject) => { return new Promise(function (resolve, reject) {
ctx.ws.onopen = () => resolve(ctx.network); ctx.ws.onopen = function () {
var count = 0;
var interval = 100;
var checkIdent = function() {
if(ctx.uid !== null) {
return resolve(ctx.network);
}
else {
if(count * interval > REQUEST_TIMEOUT) {
return reject({ type: 'TIMEOUT', message: 'waited ' + (count * interval) + 'ms' });
}
setTimeout(checkIdent, 100);
}
}
checkIdent();
};
}); });
}; };
return { connect: connect }; return { connect: connect };
}); });