diff --git a/lib/discover/discover.go b/lib/discover/discover.go index f979c398..084049bc 100644 --- a/lib/discover/discover.go +++ b/lib/discover/discover.go @@ -21,22 +21,21 @@ import ( "github.com/syncthing/syncthing/lib/beacon" "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/osutil" - "github.com/syncthing/syncthing/lib/relay" "github.com/syncthing/syncthing/lib/sync" ) type Discoverer struct { - myID protocol.DeviceID - listenAddrs []string - relaySvc *relay.Svc - localBcastIntv time.Duration - localBcastStart time.Time - cacheLifetime time.Duration - negCacheCutoff time.Duration - beacons []beacon.Interface - extPort uint16 - localBcastTick <-chan time.Time - forcedBcastTick chan time.Time + myID protocol.DeviceID + listenAddrs []string + relayStatusProvider relayStatusProvider + localBcastIntv time.Duration + localBcastStart time.Time + cacheLifetime time.Duration + negCacheCutoff time.Duration + beacons []beacon.Interface + extPort uint16 + localBcastTick <-chan time.Time + forcedBcastTick chan time.Time registryLock sync.RWMutex addressRegistry map[protocol.DeviceID][]CacheEntry @@ -47,6 +46,10 @@ type Discoverer struct { mut sync.RWMutex } +type relayStatusProvider interface { + ClientStatus() map[string]bool +} + type CacheEntry struct { Address string Seen time.Time @@ -56,19 +59,19 @@ var ( ErrIncorrectMagic = errors.New("incorrect magic number") ) -func NewDiscoverer(id protocol.DeviceID, addresses []string, relaySvc *relay.Svc) *Discoverer { +func NewDiscoverer(id protocol.DeviceID, addresses []string, relayStatusProvider relayStatusProvider) *Discoverer { return &Discoverer{ - myID: id, - listenAddrs: addresses, - relaySvc: relaySvc, - localBcastIntv: 30 * time.Second, - cacheLifetime: 5 * time.Minute, - negCacheCutoff: 3 * time.Minute, - addressRegistry: make(map[protocol.DeviceID][]CacheEntry), - relayRegistry: make(map[protocol.DeviceID][]CacheEntry), - lastLookup: make(map[protocol.DeviceID]time.Time), - registryLock: sync.NewRWMutex(), - mut: sync.NewRWMutex(), + myID: id, + listenAddrs: addresses, + relayStatusProvider: relayStatusProvider, + localBcastIntv: 30 * time.Second, + cacheLifetime: 5 * time.Minute, + negCacheCutoff: 3 * time.Minute, + addressRegistry: make(map[protocol.DeviceID][]CacheEntry), + relayRegistry: make(map[protocol.DeviceID][]CacheEntry), + lastLookup: make(map[protocol.DeviceID]time.Time), + registryLock: sync.NewRWMutex(), + mut: sync.NewRWMutex(), } } @@ -251,7 +254,7 @@ func (d *Discoverer) Lookup(device protocol.DeviceID) ([]string, []string) { } } - relays = addressesSortedByLatency(availableRelays) + relays = RelayAddressesSortedByLatency(availableRelays) cachedRelays := make([]CacheEntry, len(relays)) for i := range relays { cachedRelays[i] = CacheEntry{ @@ -307,8 +310,8 @@ func (d *Discoverer) announcementPkt(allowExternal bool) Announce { } var relayAddrs []string - if d.relaySvc != nil { - status := d.relaySvc.ClientStatus() + if d.relayStatusProvider != nil { + status := d.relayStatusProvider.ClientStatus() for uri, ok := range status { if ok { relayAddrs = append(relayAddrs, uri) @@ -489,7 +492,7 @@ func measureLatency(relayAdresses []string) []Relay { } relays = append(relays, relay) - if latency, err := getLatencyForURL(addr); err == nil { + if latency, err := osutil.GetLatencyForURL(addr); err == nil { if debug { l.Debugf("Relay %s latency %s", addr, latency) } @@ -501,13 +504,13 @@ func measureLatency(relayAdresses []string) []Relay { return relays } -// addressesSortedByLatency adds local latency to the relay, and sorts them +// RelayAddressesSortedByLatency adds local latency to the relay, and sorts them // by sum latency, and returns the addresses. -func addressesSortedByLatency(input []Relay) []string { +func RelayAddressesSortedByLatency(input []Relay) []string { relays := make([]Relay, len(input)) copy(relays, input) for i, relay := range relays { - if latency, err := getLatencyForURL(relay.Address); err == nil { + if latency, err := osutil.GetLatencyForURL(relay.Address); err == nil { relays[i].Latency += int32(latency / time.Millisecond) } else { relays[i].Latency += int32(time.Hour / time.Millisecond) @@ -523,15 +526,6 @@ func addressesSortedByLatency(input []Relay) []string { return addresses } -func getLatencyForURL(addr string) (time.Duration, error) { - uri, err := url.Parse(addr) - if err != nil { - return 0, err - } - - return osutil.TCPPing(uri.Host) -} - type relayList []Relay func (l relayList) Len() int { diff --git a/lib/osutil/ping.go b/lib/osutil/ping.go index d02bf2d6..743da5f7 100644 --- a/lib/osutil/ping.go +++ b/lib/osutil/ping.go @@ -8,6 +8,7 @@ package osutil import ( "net" + "net/url" "time" ) @@ -25,3 +26,14 @@ func TCPPing(address string) (time.Duration, error) { } return time.Since(start), err } + +// GetLatencyForURL parses the given URL, tries opening a TCP connection to it +// and returns the time it took to establish the connection. +func GetLatencyForURL(addr string) (time.Duration, error) { + uri, err := url.Parse(addr) + if err != nil { + return 0, err + } + + return TCPPing(uri.Host) +} diff --git a/lib/relay/relay.go b/lib/relay/relay.go index 7b7029c3..044d59d2 100644 --- a/lib/relay/relay.go +++ b/lib/relay/relay.go @@ -17,6 +17,7 @@ import ( "github.com/syncthing/relaysrv/client" "github.com/syncthing/relaysrv/protocol" "github.com/syncthing/syncthing/lib/config" + "github.com/syncthing/syncthing/lib/discover" "github.com/syncthing/syncthing/lib/model" "github.com/syncthing/syncthing/lib/osutil" "github.com/syncthing/syncthing/lib/sync" @@ -29,7 +30,7 @@ func NewSvc(cfg *config.Wrapper, tlsCfg *tls.Config, conns chan<- model.Intermed Supervisor: suture.New("Svc", suture.Spec{ Log: func(log string) { if debug { - l.Infoln(log) + l.Debugln(log) } }, FailureBackoff: 5 * time.Minute, @@ -97,15 +98,20 @@ func (s *Svc) CommitConfiguration(from, to config.Configuration) bool { existing[uri.String()] = uri } - // Expand dynamic addresses into a set of relays + // Query dynamic addresses, and pick the closest relay from the ones they provide. for key, uri := range existing { if uri.Scheme != "dynamic+http" && uri.Scheme != "dynamic+https" { continue } delete(existing, key) + // Trim off the `dynamic+` prefix uri.Scheme = uri.Scheme[8:] + if debug { + l.Debugln("Looking up dynamic relays from", uri) + } + data, err := http.Get(uri.String()) if err != nil { if debug { @@ -124,6 +130,7 @@ func (s *Svc) CommitConfiguration(from, to config.Configuration) bool { continue } + dynRelays := make([]discover.Relay, 0, len(ann.Relays)) for _, relayAnn := range ann.Relays { ruri, err := url.Parse(relayAnn.URL) if err != nil { @@ -135,7 +142,21 @@ func (s *Svc) CommitConfiguration(from, to config.Configuration) bool { if debug { l.Debugln("Found", ruri, "via", uri) } - existing[ruri.String()] = ruri + dynRelays = append(dynRelays, discover.Relay{ + Address: ruri.String(), + }) + } + + dynRelayAddrs := discover.RelayAddressesSortedByLatency(dynRelays) + if len(dynRelayAddrs) > 0 { + closestRelay := dynRelayAddrs[0] + if debug { + l.Debugln("Picking", closestRelay, "as closest dynamic relay from", uri) + } + ruri, _ := url.Parse(closestRelay) + existing[closestRelay] = ruri + } else if debug { + l.Debugln("No dynamic relay found on", uri) } }