Add separate client for dynamic relays (fixes #2368)

Did some manual tests in the playground, such as kicking off two clients in parallel, first connecting,
second one getting a message about already being connected, falling back to the second address.
This commit is contained in:
Audrius Butkevicius
2015-10-16 23:59:24 +01:00
parent 8c26fe44c3
commit 4f0680c3c8
5 changed files with 551 additions and 375 deletions

View File

@@ -8,15 +8,13 @@ package relay
import (
"crypto/tls"
"encoding/json"
"net/http"
"net/url"
"sort"
"time"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/relay/client"
"github.com/syncthing/syncthing/lib/relay/protocol"
"github.com/syncthing/syncthing/lib/sync"
@@ -34,7 +32,7 @@ type Svc struct {
tlsCfg *tls.Config
tokens map[string]suture.ServiceToken
clients map[string]*client.ProtocolClient
clients map[string]client.RelayClient
mut sync.RWMutex
invitations chan protocol.SessionInvitation
conns chan *tls.Conn
@@ -56,7 +54,7 @@ func NewSvc(cfg *config.Wrapper, tlsCfg *tls.Config) *Svc {
tlsCfg: tlsCfg,
tokens: make(map[string]suture.ServiceToken),
clients: make(map[string]*client.ProtocolClient),
clients: make(map[string]client.RelayClient),
mut: sync.NewRWMutex(),
invitations: make(chan protocol.SessionInvitation),
conns: conns,
@@ -106,61 +104,17 @@ func (s *Svc) CommitConfiguration(from, to config.Configuration) bool {
existing[uri.String()] = uri
}
// Query dynamic addresses, and pick the closest relay from the ones they provide.
for key, uri := range existing {
if uri.Scheme != "dynamic+http" && uri.Scheme != "dynamic+https" {
continue
}
delete(existing, key)
// Trim off the `dynamic+` prefix
uri.Scheme = uri.Scheme[8:]
l.Debugln("Looking up dynamic relays from", uri)
data, err := http.Get(uri.String())
if err != nil {
l.Debugln("Failed to lookup dynamic relays", err)
continue
}
var ann dynamicAnnouncement
err = json.NewDecoder(data.Body).Decode(&ann)
data.Body.Close()
if err != nil {
l.Debugln("Failed to lookup dynamic relays", err)
continue
}
var dynRelayAddrs []string
for _, relayAnn := range ann.Relays {
ruri, err := url.Parse(relayAnn.URL)
if err != nil {
l.Debugln("Failed to parse dynamic relay address", relayAnn.URL, err)
continue
}
l.Debugln("Found", ruri, "via", uri)
dynRelayAddrs = append(dynRelayAddrs, ruri.String())
}
if len(dynRelayAddrs) > 0 {
dynRelayAddrs = relayAddressesSortedByLatency(dynRelayAddrs)
closestRelay := dynRelayAddrs[0]
l.Debugln("Picking", closestRelay, "as closest dynamic relay from", uri)
ruri, _ := url.Parse(closestRelay)
existing[closestRelay] = ruri
} else {
l.Debugln("No dynamic relay found on", uri)
}
}
s.mut.Lock()
for key, uri := range existing {
_, ok := s.tokens[key]
if !ok {
l.Debugln("Connecting to relay", uri)
c := client.NewProtocolClient(uri, s.tlsCfg.Certificates, s.invitations)
c, err := client.NewClient(uri, s.tlsCfg.Certificates, s.invitations)
if err != nil {
l.Debugln("Failed to connect to relay", uri, err)
continue
}
s.tokens[key] = s.Add(c)
s.clients[key] = c
}
@@ -197,8 +151,8 @@ func (s *Svc) Relays() []string {
s.mut.RLock()
relays := make([]string, 0, len(s.clients))
for uri := range s.clients {
relays = append(relays, uri)
for _, client := range s.clients {
relays = append(relays, client.URI().String())
}
s.mut.RUnlock()
@@ -216,14 +170,14 @@ func (s *Svc) RelayStatus(uri string) (time.Duration, bool) {
}
s.mut.RLock()
client, ok := s.clients[uri]
for _, client := range s.clients {
if client.URI().String() == uri {
return client.Latency(), client.StatusOK()
}
}
s.mut.RUnlock()
if !ok || !client.StatusOK() {
return time.Hour, false
}
return client.Latency(), true
return time.Hour, false
}
// Accept returns a new *tls.Conn. The connection is already handshaken.
@@ -277,7 +231,7 @@ func (r *invitationReceiver) Stop() {
// The eventBroadcaster sends a RelayStateChanged event when the relay status
// changes. We need this somewhat ugly polling mechanism as there's currently
// no way to get the event feed directly from the relay lib. This may be
// somethign to revisit later, possibly.
// something to revisit later, possibly.
type eventBroadcaster struct {
svc *Svc
stop chan struct{}
@@ -322,51 +276,3 @@ func (e *eventBroadcaster) Serve() {
func (e *eventBroadcaster) Stop() {
close(e.stop)
}
// This is the announcement recieved from the relay server;
// {"relays": [{"url": "relay://10.20.30.40:5060"}, ...]}
type dynamicAnnouncement struct {
Relays []struct {
URL string
}
}
// relayAddressesSortedByLatency adds local latency to the relay, and sorts them
// by sum latency, and returns the addresses.
func relayAddressesSortedByLatency(input []string) []string {
relays := make(relayList, len(input))
for i, relay := range input {
if latency, err := osutil.GetLatencyForURL(relay); err == nil {
relays[i] = relayWithLatency{relay, int(latency / time.Millisecond)}
} else {
relays[i] = relayWithLatency{relay, int(time.Hour / time.Millisecond)}
}
}
sort.Sort(relays)
addresses := make([]string, len(relays))
for i, relay := range relays {
addresses[i] = relay.relay
}
return addresses
}
type relayWithLatency struct {
relay string
latency int
}
type relayList []relayWithLatency
func (l relayList) Len() int {
return len(l)
}
func (l relayList) Less(a, b int) bool {
return l[a].latency < l[b].latency
}
func (l relayList) Swap(a, b int) {
l[a], l[b] = l[b], l[a]
}