From 9047d56aa06245cc4201cc0640e8c8a5ec773212 Mon Sep 17 00:00:00 2001 From: Audrius Butkevicius Date: Fri, 20 Nov 2015 23:42:49 +0000 Subject: [PATCH 1/2] Add RelayFull message --- lib/relay/client/static.go | 7 ++++ lib/relay/protocol/packets.go | 2 ++ lib/relay/protocol/packets_xdr.go | 57 +++++++++++++++++++++++++++++++ lib/relay/protocol/protocol.go | 7 ++++ 4 files changed, 73 insertions(+) diff --git a/lib/relay/client/static.go b/lib/relay/client/static.go index 5cce7b25..19624e47 100644 --- a/lib/relay/client/static.go +++ b/lib/relay/client/static.go @@ -119,6 +119,10 @@ func (c *staticClient) Serve() { } c.invitations <- msg + case protocol.RelayFull: + l.Infoln("Disconnected from relay due to it becoming full.") + return + default: l.Infoln("Relay: protocol error: unexpected message %v", msg) return @@ -240,6 +244,9 @@ func (c *staticClient) join() error { return fmt.Errorf("Incorrect response code %d: %s", msg.Code, msg.Message) } + case protocol.RelayFull: + return fmt.Errorf("relay full") + default: return fmt.Errorf("protocol error: expecting response got %v", msg) } diff --git a/lib/relay/protocol/packets.go b/lib/relay/protocol/packets.go index cd6ff620..fca3598a 100644 --- a/lib/relay/protocol/packets.go +++ b/lib/relay/protocol/packets.go @@ -20,6 +20,7 @@ const ( messageTypeResponse messageTypeConnectRequest messageTypeSessionInvitation + messageTypeRelayFull ) type header struct { @@ -31,6 +32,7 @@ type header struct { type Ping struct{} type Pong struct{} type JoinRelayRequest struct{} +type RelayFull struct{} type JoinSessionRequest struct { Key []byte // max:32 diff --git a/lib/relay/protocol/packets_xdr.go b/lib/relay/protocol/packets_xdr.go index f18e18c1..51149da2 100644 --- a/lib/relay/protocol/packets_xdr.go +++ b/lib/relay/protocol/packets_xdr.go @@ -256,6 +256,63 @@ func (o *JoinRelayRequest) DecodeXDRFrom(xr *xdr.Reader) error { /* +RelayFull Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct RelayFull { +} + +*/ + +func (o RelayFull) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.EncodeXDRInto(xw) +} + +func (o RelayFull) MarshalXDR() ([]byte, error) { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o RelayFull) MustMarshalXDR() []byte { + bs, err := o.MarshalXDR() + if err != nil { + panic(err) + } + return bs +} + +func (o RelayFull) AppendXDR(bs []byte) ([]byte, error) { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + _, err := o.EncodeXDRInto(xw) + return []byte(aw), err +} + +func (o RelayFull) EncodeXDRInto(xw *xdr.Writer) (int, error) { + return xw.Tot(), xw.Error() +} + +func (o *RelayFull) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.DecodeXDRFrom(xr) +} + +func (o *RelayFull) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.DecodeXDRFrom(xr) +} + +func (o *RelayFull) DecodeXDRFrom(xr *xdr.Reader) error { + return xr.Error() +} + +/* + JoinSessionRequest Structure: 0 1 2 3 diff --git a/lib/relay/protocol/protocol.go b/lib/relay/protocol/protocol.go index 57a967ac..dad76d94 100644 --- a/lib/relay/protocol/protocol.go +++ b/lib/relay/protocol/protocol.go @@ -50,6 +50,9 @@ func WriteMessage(w io.Writer, message interface{}) error { case SessionInvitation: payload, err = msg.MarshalXDR() header.messageType = messageTypeSessionInvitation + case RelayFull: + payload, err = msg.MarshalXDR() + header.messageType = messageTypeRelayFull default: err = fmt.Errorf("Unknown message type") } @@ -108,6 +111,10 @@ func ReadMessage(r io.Reader) (interface{}, error) { var msg SessionInvitation err := msg.DecodeXDR(r) return msg, err + case messageTypeRelayFull: + var msg RelayFull + err := msg.DecodeXDR(r) + return msg, err } return nil, fmt.Errorf("Unknown message type") From eeb5d99942b7f3e0d7d2bd55920779f0b61b8713 Mon Sep 17 00:00:00 2001 From: Audrius Butkevicius Date: Fri, 20 Nov 2015 23:58:29 +0000 Subject: [PATCH 2/2] Sort relays in 50ms latency increments, shuffle relays within the same increment --- lib/relay/client/dynamic.go | 67 +++++++++++++++++++------------------ 1 file changed, 35 insertions(+), 32 deletions(-) diff --git a/lib/relay/client/dynamic.go b/lib/relay/client/dynamic.go index 34de8475..3e3dc4e4 100644 --- a/lib/relay/client/dynamic.go +++ b/lib/relay/client/dynamic.go @@ -6,6 +6,7 @@ import ( "crypto/tls" "encoding/json" "fmt" + "math/rand" "net/http" "net/url" "sort" @@ -82,7 +83,7 @@ func (c *dynamicClient) Serve() { addrs = append(addrs, ruri.String()) } - for _, addr := range relayAddressesSortedByLatency(addrs) { + for _, addr := range relayAddressesOrder(addrs) { select { case <-c.stop: l.Debugln(c, "stopping") @@ -176,42 +177,44 @@ type dynamicAnnouncement struct { } } -// relayAddressesSortedByLatency adds local latency to the relay, and sorts them -// by sum latency, and returns the addresses. -func relayAddressesSortedByLatency(input []string) []string { - relays := make(relayList, len(input)) - for i, relay := range input { - if latency, err := osutil.GetLatencyForURL(relay); err == nil { - relays[i] = relayWithLatency{relay, int(latency / time.Millisecond)} - } else { - relays[i] = relayWithLatency{relay, int(time.Hour / time.Millisecond)} +// relayAddressesOrder checks the latency to each relay, rounds latency down to +// the closest 50ms, and puts them in buckets of 50ms latency ranges. Then +// shuffles each bucket, and returns all addresses starting with the ones from +// the lowest latency bucket, ending with the highest latency buceket. +func relayAddressesOrder(input []string) []string { + buckets := make(map[int][]string) + + for _, relay := range input { + latency, err := osutil.GetLatencyForURL(relay) + if err != nil { + latency = time.Hour } + + id := int(latency/time.Millisecond) / 50 + + buckets[id] = append(buckets[id], relay) } - sort.Sort(relays) - - addresses := make([]string, len(relays)) - for i, relay := range relays { - addresses[i] = relay.relay + var ids []int + for id, bucket := range buckets { + shuffle(bucket) + ids = append(ids, id) } + + sort.Ints(ids) + + addresses := make([]string, len(input)) + + for _, id := range ids { + addresses = append(addresses, buckets[id]...) + } + return addresses } -type relayWithLatency struct { - relay string - latency int -} - -type relayList []relayWithLatency - -func (l relayList) Len() int { - return len(l) -} - -func (l relayList) Less(a, b int) bool { - return l[a].latency < l[b].latency -} - -func (l relayList) Swap(a, b int) { - l[a], l[b] = l[b], l[a] +func shuffle(slice []string) { + for i := len(slice) - 1; i > 0; i-- { + j := rand.Intn(i + 1) + slice[i], slice[j] = slice[j], slice[i] + } }