New global discovery protocol over HTTPS (fixes #628, fixes #1907)

This commit is contained in:
Jakob Borg
2015-09-20 15:30:25 +02:00
parent a7169a6348
commit b0cd7be39b
41 changed files with 1937 additions and 1504 deletions

192
lib/discover/cache.go Normal file
View File

@@ -0,0 +1,192 @@
package discover
import (
stdsync "sync"
"time"
"github.com/syncthing/protocol"
"github.com/syncthing/syncthing/lib/sync"
"github.com/thejerf/suture"
)
// The CachingMux aggregates results from multiple Finders. Each Finder has
// an associated cache time and negative cache time. The cache time sets how
// long we cache and return successfull lookup results, the negative cache
// time sets how long we refrain from asking about the same device ID after
// receiving a negative answer. The value of zero disables caching (positive
// or negative).
type CachingMux struct {
*suture.Supervisor
finders []cachedFinder
caches []*cache
mut sync.Mutex
}
// A cachedFinder is a Finder with associated cache timeouts.
type cachedFinder struct {
Finder
cacheTime time.Duration
negCacheTime time.Duration
}
func NewCachingMux() *CachingMux {
return &CachingMux{
Supervisor: suture.NewSimple("discover.cachingMux"),
mut: sync.NewMutex(),
}
}
// Add registers a new Finder, with associated cache timeouts.
func (m *CachingMux) Add(finder Finder, cacheTime, negCacheTime time.Duration) {
m.mut.Lock()
m.finders = append(m.finders, cachedFinder{finder, cacheTime, negCacheTime})
m.caches = append(m.caches, newCache())
m.mut.Unlock()
if svc, ok := finder.(suture.Service); ok {
m.Supervisor.Add(svc)
}
}
// Lookup attempts to resolve the device ID using any of the added Finders,
// while obeying the cache settings.
func (m *CachingMux) Lookup(deviceID protocol.DeviceID) (direct []string, relays []Relay, err error) {
m.mut.Lock()
for i, finder := range m.finders {
if cacheEntry, ok := m.caches[i].Get(deviceID); ok {
// We have a cache entry. Lets see what it says.
if cacheEntry.found && time.Since(cacheEntry.when) < finder.cacheTime {
// It's a positive, valid entry. Use it.
if debug {
l.Debugln("cached discovery entry for", deviceID, "at", finder.String())
l.Debugln(" ", cacheEntry)
}
direct = append(direct, cacheEntry.Direct...)
relays = append(relays, cacheEntry.Relays...)
continue
}
if !cacheEntry.found && time.Since(cacheEntry.when) < finder.negCacheTime {
// It's a negative, valid entry. We should not make another
// attempt right now.
if debug {
l.Debugln("negative cache entry for", deviceID, "at", finder.String())
}
continue
}
// It's expired. Ignore and continue.
}
// Perform the actual lookup and cache the result.
if td, tr, err := finder.Lookup(deviceID); err == nil {
if debug {
l.Debugln("lookup for", deviceID, "at", finder.String())
l.Debugln(" ", td)
l.Debugln(" ", tr)
}
direct = append(direct, td...)
relays = append(relays, tr...)
m.caches[i].Set(deviceID, CacheEntry{
Direct: td,
Relays: tr,
when: time.Now(),
found: len(td)+len(tr) > 0,
})
}
}
m.mut.Unlock()
if debug {
l.Debugln("lookup results for", deviceID)
l.Debugln(" ", direct)
l.Debugln(" ", relays)
}
return direct, relays, nil
}
func (m *CachingMux) String() string {
return "discovery cache"
}
func (m *CachingMux) Error() error {
return nil
}
func (m *CachingMux) ChildErrors() map[string]error {
m.mut.Lock()
children := make(map[string]error, len(m.finders))
for _, f := range m.finders {
children[f.String()] = f.Error()
}
m.mut.Unlock()
return children
}
func (m *CachingMux) Cache() map[protocol.DeviceID]CacheEntry {
// Res will be the "total" cache, i.e. the union of our cache and all our
// children's caches.
res := make(map[protocol.DeviceID]CacheEntry)
m.mut.Lock()
for i := range m.finders {
// Each finder[i] has a corresponding cache at cache[i]. Go through it
// and populate the total, if it's newer than what's already in there.
// We skip any negative cache entries.
for k, v := range m.caches[i].Cache() {
if v.found && v.when.After(res[k].when) {
res[k] = v
}
}
// Then ask the finder itself for it's cache and do the same. If this
// finder is a global discovery client, it will have no cache. If it's
// a local discovery client, this will be it's current state.
for k, v := range m.finders[i].Cache() {
if v.found && v.when.After(res[k].when) {
res[k] = v
}
}
}
m.mut.Unlock()
return res
}
// A cache can be embedded wherever useful
type cache struct {
entries map[protocol.DeviceID]CacheEntry
mut stdsync.Mutex
}
func newCache() *cache {
return &cache{
entries: make(map[protocol.DeviceID]CacheEntry),
}
}
func (c *cache) Set(id protocol.DeviceID, ce CacheEntry) {
c.mut.Lock()
c.entries[id] = ce
c.mut.Unlock()
}
func (c *cache) Get(id protocol.DeviceID) (CacheEntry, bool) {
c.mut.Lock()
ce, ok := c.entries[id]
c.mut.Unlock()
return ce, ok
}
func (c *cache) Cache() map[protocol.DeviceID]CacheEntry {
c.mut.Lock()
m := make(map[protocol.DeviceID]CacheEntry, len(c.entries))
for k, v := range c.entries {
m[k] = v
}
c.mut.Unlock()
return m
}

View File

@@ -1,54 +0,0 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package discover
import (
"fmt"
"net/url"
"time"
"github.com/syncthing/protocol"
)
type Announcer interface {
Announcement() Announce
}
type Factory func(*url.URL, Announcer) (Client, error)
var (
factories = make(map[string]Factory)
DefaultErrorRetryInternval = 60 * time.Second
DefaultGlobalBroadcastInterval = 1800 * time.Second
)
func Register(proto string, factory Factory) {
factories[proto] = factory
}
func New(addr string, announcer Announcer) (Client, error) {
uri, err := url.Parse(addr)
if err != nil {
return nil, err
}
factory, ok := factories[uri.Scheme]
if !ok {
return nil, fmt.Errorf("Unsupported scheme: %s", uri.Scheme)
}
client, err := factory(uri, announcer)
if err != nil {
return nil, err
}
return client, nil
}
type Client interface {
Lookup(device protocol.DeviceID) (Announce, error)
StatusOK() bool
Address() string
Stop()
}

View File

@@ -1,239 +0,0 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package discover
import (
"fmt"
"net"
"time"
"testing"
"github.com/syncthing/protocol"
"github.com/syncthing/syncthing/lib/sync"
)
var device protocol.DeviceID
func init() {
device, _ = protocol.DeviceIDFromString("P56IOI7-MZJNU2Y-IQGDREY-DM2MGTI-MGL3BXN-PQ6W5BM-TBBZ4TJ-XZWICQ2")
}
type FakeAnnouncer struct {
pkt Announce
}
func (f *FakeAnnouncer) Announcement() Announce {
return f.pkt
}
func TestUDP4Success(t *testing.T) {
conn, err := net.ListenUDP("udp4", nil)
if err != nil {
t.Fatal(err)
}
port := conn.LocalAddr().(*net.UDPAddr).Port
address := fmt.Sprintf("udp4://127.0.0.1:%d", port)
pkt := Announce{
Magic: AnnouncementMagic,
This: Device{
device[:],
[]string{"tcp://123.123.123.123:1234"},
nil,
},
}
ann := &FakeAnnouncer{
pkt: pkt,
}
client, err := New(address, ann)
if err != nil {
t.Fatal(err)
}
udpclient := client.(*UDPClient)
if udpclient.errorRetryInterval != DefaultErrorRetryInternval {
t.Fatal("Incorrect retry interval")
}
if udpclient.listenAddress.IP != nil || udpclient.listenAddress.Port != 0 {
t.Fatal("Wrong listen IP or port", udpclient.listenAddress)
}
if client.Address() != address {
t.Fatal("Incorrect address")
}
buf := make([]byte, 2048)
// First announcement
conn.SetDeadline(time.Now().Add(time.Millisecond * 100))
_, err = conn.Read(buf)
if err != nil {
t.Fatal(err)
}
// Announcement verification
conn.SetDeadline(time.Now().Add(time.Millisecond * 1100))
_, addr, err := conn.ReadFromUDP(buf)
if err != nil {
t.Fatal(err)
}
// Reply to it.
_, err = conn.WriteToUDP(pkt.MustMarshalXDR(), addr)
if err != nil {
t.Fatal(err)
}
// We should get nothing else
conn.SetDeadline(time.Now().Add(time.Millisecond * 100))
_, err = conn.Read(buf)
if err == nil {
t.Fatal("Expected error")
}
// Status should be ok
if !client.StatusOK() {
t.Fatal("Wrong status")
}
// Do a lookup in a separate routine
addrs := []string{}
wg := sync.NewWaitGroup()
wg.Add(1)
go func() {
pkt, err := client.Lookup(device)
if err == nil {
for _, addr := range pkt.This.Addresses {
addrs = append(addrs, addr)
}
}
wg.Done()
}()
// Receive the lookup and reply
conn.SetDeadline(time.Now().Add(time.Millisecond * 100))
_, addr, err = conn.ReadFromUDP(buf)
if err != nil {
t.Fatal(err)
}
conn.WriteToUDP(pkt.MustMarshalXDR(), addr)
// Wait for the lookup to arrive, verify that the number of answers is correct
wg.Wait()
if len(addrs) != 1 || addrs[0] != "tcp://123.123.123.123:1234" {
t.Fatal("Wrong number of answers")
}
client.Stop()
}
func TestUDP4Failure(t *testing.T) {
conn, err := net.ListenUDP("udp4", nil)
if err != nil {
t.Fatal(err)
}
port := conn.LocalAddr().(*net.UDPAddr).Port
address := fmt.Sprintf("udp4://127.0.0.1:%d/?listenaddress=127.0.0.1&retry=5", port)
pkt := Announce{
Magic: AnnouncementMagic,
This: Device{
device[:],
[]string{"tcp://123.123.123.123:1234"},
nil,
},
}
ann := &FakeAnnouncer{
pkt: pkt,
}
client, err := New(address, ann)
if err != nil {
t.Fatal(err)
}
udpclient := client.(*UDPClient)
if udpclient.errorRetryInterval != time.Second*5 {
t.Fatal("Incorrect retry interval")
}
if !udpclient.listenAddress.IP.Equal(net.IPv4(127, 0, 0, 1)) || udpclient.listenAddress.Port != 0 {
t.Fatal("Wrong listen IP or port", udpclient.listenAddress)
}
if client.Address() != address {
t.Fatal("Incorrect address")
}
buf := make([]byte, 2048)
// First announcement
conn.SetDeadline(time.Now().Add(time.Millisecond * 100))
_, err = conn.Read(buf)
if err != nil {
t.Fatal(err)
}
// Announcement verification
conn.SetDeadline(time.Now().Add(time.Millisecond * 1100))
_, _, err = conn.ReadFromUDP(buf)
if err != nil {
t.Fatal(err)
}
// Don't reply
// We should get nothing else
conn.SetDeadline(time.Now().Add(time.Millisecond * 100))
_, err = conn.Read(buf)
if err == nil {
t.Fatal("Expected error")
}
// Status should be failure
if client.StatusOK() {
t.Fatal("Wrong status")
}
// Do a lookup in a separate routine
addrs := []string{}
wg := sync.NewWaitGroup()
wg.Add(1)
go func() {
pkt, err := client.Lookup(device)
if err == nil {
for _, addr := range pkt.This.Addresses {
addrs = append(addrs, addr)
}
}
wg.Done()
}()
// Receive the lookup and don't reply
conn.SetDeadline(time.Now().Add(time.Millisecond * 100))
_, _, err = conn.ReadFromUDP(buf)
if err != nil {
t.Fatal(err)
}
// Wait for the lookup to timeout, verify that the number of answers is none
wg.Wait()
if len(addrs) != 0 {
t.Fatal("Wrong number of answers")
}
client.Stop()
}

View File

@@ -1,261 +0,0 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package discover
import (
"encoding/hex"
"io"
"net"
"net/url"
"strconv"
"time"
"github.com/syncthing/protocol"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/sync"
)
func init() {
for _, proto := range []string{"udp", "udp4", "udp6"} {
Register(proto, func(uri *url.URL, announcer Announcer) (Client, error) {
c := &UDPClient{
announcer: announcer,
wg: sync.NewWaitGroup(),
mut: sync.NewRWMutex(),
}
err := c.Start(uri)
if err != nil {
return nil, err
}
return c, nil
})
}
}
type UDPClient struct {
url *url.URL
stop chan struct{}
wg sync.WaitGroup
listenAddress *net.UDPAddr
globalBroadcastInterval time.Duration
errorRetryInterval time.Duration
announcer Announcer
status bool
mut sync.RWMutex
}
func (d *UDPClient) Start(uri *url.URL) error {
d.url = uri
d.stop = make(chan struct{})
params := uri.Query()
// The address must not have a port, as otherwise both announce and lookup
// sockets would try to bind to the same port.
addr, err := net.ResolveUDPAddr(d.url.Scheme, params.Get("listenaddress")+":0")
if err != nil {
return err
}
d.listenAddress = addr
broadcastSeconds, err := strconv.ParseUint(params.Get("broadcast"), 0, 0)
if err != nil {
d.globalBroadcastInterval = DefaultGlobalBroadcastInterval
} else {
d.globalBroadcastInterval = time.Duration(broadcastSeconds) * time.Second
}
retrySeconds, err := strconv.ParseUint(params.Get("retry"), 0, 0)
if err != nil {
d.errorRetryInterval = DefaultErrorRetryInternval
} else {
d.errorRetryInterval = time.Duration(retrySeconds) * time.Second
}
d.wg.Add(1)
go d.broadcast()
return nil
}
func (d *UDPClient) broadcast() {
defer d.wg.Done()
conn, err := net.ListenUDP(d.url.Scheme, d.listenAddress)
for err != nil {
if debug {
l.Debugf("discover %s: broadcast listen: %v; trying again in %v", d.url, err, d.errorRetryInterval)
}
select {
case <-d.stop:
return
case <-time.After(d.errorRetryInterval):
}
conn, err = net.ListenUDP(d.url.Scheme, d.listenAddress)
}
defer conn.Close()
remote, err := net.ResolveUDPAddr(d.url.Scheme, d.url.Host)
for err != nil {
if debug {
l.Debugf("discover %s: broadcast resolve: %v; trying again in %v", d.url, err, d.errorRetryInterval)
}
select {
case <-d.stop:
return
case <-time.After(d.errorRetryInterval):
}
remote, err = net.ResolveUDPAddr(d.url.Scheme, d.url.Host)
}
timer := time.NewTimer(0)
eventSub := events.Default.Subscribe(events.ExternalPortMappingChanged)
defer events.Default.Unsubscribe(eventSub)
for {
select {
case <-d.stop:
return
case <-eventSub.C():
ok := d.sendAnnouncement(remote, conn)
d.mut.Lock()
d.status = ok
d.mut.Unlock()
case <-timer.C:
ok := d.sendAnnouncement(remote, conn)
d.mut.Lock()
d.status = ok
d.mut.Unlock()
if ok {
timer.Reset(d.globalBroadcastInterval)
} else {
timer.Reset(d.errorRetryInterval)
}
}
}
}
func (d *UDPClient) sendAnnouncement(remote net.Addr, conn *net.UDPConn) bool {
if debug {
l.Debugf("discover %s: broadcast: Sending self announcement to %v", d.url, remote)
}
ann := d.announcer.Announcement()
pkt, err := ann.MarshalXDR()
if err != nil {
return false
}
myID := protocol.DeviceIDFromBytes(ann.This.ID)
_, err = conn.WriteTo(pkt, remote)
if err != nil {
if debug {
l.Debugf("discover %s: broadcast: Failed to send self announcement: %s", d.url, err)
}
return false
}
// Verify that the announce server responds positively for our device ID
time.Sleep(1 * time.Second)
ann, err = d.Lookup(myID)
if err != nil && debug {
l.Debugf("discover %s: broadcast: Self-lookup failed: %v", d.url, err)
} else if debug {
l.Debugf("discover %s: broadcast: Self-lookup returned: %v", d.url, ann.This.Addresses)
}
return len(ann.This.Addresses) > 0
}
func (d *UDPClient) Lookup(device protocol.DeviceID) (Announce, error) {
extIP, err := net.ResolveUDPAddr(d.url.Scheme, d.url.Host)
if err != nil {
if debug {
l.Debugf("discover %s: Lookup(%s): %s", d.url, device, err)
}
return Announce{}, err
}
conn, err := net.DialUDP(d.url.Scheme, d.listenAddress, extIP)
if err != nil {
if debug {
l.Debugf("discover %s: Lookup(%s): %s", d.url, device, err)
}
return Announce{}, err
}
defer conn.Close()
err = conn.SetDeadline(time.Now().Add(5 * time.Second))
if err != nil {
if debug {
l.Debugf("discover %s: Lookup(%s): %s", d.url, device, err)
}
return Announce{}, err
}
buf := Query{QueryMagic, device[:]}.MustMarshalXDR()
_, err = conn.Write(buf)
if err != nil {
if debug {
l.Debugf("discover %s: Lookup(%s): %s", d.url, device, err)
}
return Announce{}, err
}
buf = make([]byte, 2048)
n, err := conn.Read(buf)
if err != nil {
if err, ok := err.(net.Error); ok && err.Timeout() {
// Expected if the server doesn't know about requested device ID
return Announce{}, err
}
if debug {
l.Debugf("discover %s: Lookup(%s): %s", d.url, device, err)
}
return Announce{}, err
}
var pkt Announce
err = pkt.UnmarshalXDR(buf[:n])
if err != nil && err != io.EOF {
if debug {
l.Debugf("discover %s: Lookup(%s): %s\n%s", d.url, device, err, hex.Dump(buf[:n]))
}
return Announce{}, err
}
if debug {
l.Debugf("discover %s: Lookup(%s) result: %v relays: %v", d.url, device, pkt.This.Addresses, pkt.This.Relays)
}
return pkt, nil
}
func (d *UDPClient) Stop() {
if d.stop != nil {
close(d.stop)
d.wg.Wait()
}
}
func (d *UDPClient) StatusOK() bool {
d.mut.RLock()
defer d.mut.RUnlock()
return d.status
}
func (d *UDPClient) Address() string {
return d.url.String()
}

View File

@@ -1,4 +1,4 @@
// Copyright (C) 2014 The Syncthing Authors.
// Copyright (C) 2015 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
@@ -7,539 +7,48 @@
package discover
import (
"bytes"
"encoding/hex"
"errors"
"fmt"
"io"
"net"
"net/url"
"sort"
"time"
"github.com/syncthing/protocol"
"github.com/syncthing/syncthing/lib/beacon"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/sync"
"github.com/thejerf/suture"
)
type Discoverer struct {
myID protocol.DeviceID
listenAddrs []string
relayStatusProvider relayStatusProvider
localBcastIntv time.Duration
localBcastStart time.Time
cacheLifetime time.Duration
negCacheCutoff time.Duration
beacons []beacon.Interface
extAddr externalAddr
localBcastTick <-chan time.Time
forcedBcastTick chan time.Time
registryLock sync.RWMutex
addressRegistry map[protocol.DeviceID][]CacheEntry
relayRegistry map[protocol.DeviceID][]CacheEntry
lastLookup map[protocol.DeviceID]time.Time
clients []Client
mut sync.RWMutex
}
type relayStatusProvider interface {
ClientStatus() map[string]bool
}
type externalAddr interface {
ExternalAddresses() []string
// A Finder provides lookup services of some kind.
type Finder interface {
Lookup(deviceID protocol.DeviceID) (direct []string, relays []Relay, err error)
Error() error
String() string
Cache() map[protocol.DeviceID]CacheEntry
}
type CacheEntry struct {
Address string
Seen time.Time
Direct []string `json:"direct"`
Relays []Relay `json:"relays"`
when time.Time // When did we get the result
found bool // Is it a success (cacheTime applies) or a failure (negCacheTime applies)?
}
var (
ErrIncorrectMagic = errors.New("incorrect magic number")
)
func NewDiscoverer(id protocol.DeviceID, addresses []string, relayStatusProvider relayStatusProvider) *Discoverer {
return &Discoverer{
myID: id,
listenAddrs: addresses,
relayStatusProvider: relayStatusProvider,
localBcastIntv: 30 * time.Second,
cacheLifetime: 5 * time.Minute,
negCacheCutoff: 3 * time.Minute,
addressRegistry: make(map[protocol.DeviceID][]CacheEntry),
relayRegistry: make(map[protocol.DeviceID][]CacheEntry),
lastLookup: make(map[protocol.DeviceID]time.Time),
registryLock: sync.NewRWMutex(),
mut: sync.NewRWMutex(),
}
// A FinderService is a Finder that has background activity and must be run as
// a suture.Service.
type FinderService interface {
Finder
suture.Service
}
func (d *Discoverer) StartLocal(localPort int, localMCAddr string) {
if localPort > 0 {
d.startLocalIPv4Broadcasts(localPort)
}
if len(localMCAddr) > 0 {
d.startLocalIPv6Multicasts(localMCAddr)
}
if len(d.beacons) == 0 {
l.Warnln("Local discovery unavailable")
return
}
d.localBcastTick = time.Tick(d.localBcastIntv)
d.forcedBcastTick = make(chan time.Time)
d.localBcastStart = time.Now()
go d.sendLocalAnnouncements()
type FinderMux interface {
Finder
ChildStatus() map[string]error
}
func (d *Discoverer) startLocalIPv4Broadcasts(localPort int) {
bb := beacon.NewBroadcast(localPort)
d.beacons = append(d.beacons, bb)
go d.recvAnnouncements(bb)
bb.ServeBackground()
// The RelayStatusProvider answers questions about current relay status.
type RelayStatusProvider interface {
Relays() []string
RelayStatus(uri string) (time.Duration, bool)
}
func (d *Discoverer) startLocalIPv6Multicasts(localMCAddr string) {
mb, err := beacon.NewMulticast(localMCAddr)
if err != nil {
if debug {
l.Debugln("beacon.NewMulticast:", err)
}
l.Infoln("Local discovery over IPv6 unavailable")
return
}
d.beacons = append(d.beacons, mb)
go d.recvAnnouncements(mb)
}
func (d *Discoverer) StartGlobal(servers []string, extAddr externalAddr) {
d.mut.Lock()
defer d.mut.Unlock()
if len(d.clients) > 0 {
d.stopGlobal()
}
d.extAddr = extAddr
wg := sync.NewWaitGroup()
clients := make(chan Client, len(servers))
for _, address := range servers {
wg.Add(1)
go func(addr string) {
defer wg.Done()
client, err := New(addr, d)
if err != nil {
l.Infoln("Error creating discovery client", addr, err)
return
}
clients <- client
}(address)
}
wg.Wait()
close(clients)
for client := range clients {
d.clients = append(d.clients, client)
}
}
func (d *Discoverer) StopGlobal() {
d.mut.Lock()
defer d.mut.Unlock()
d.stopGlobal()
}
func (d *Discoverer) stopGlobal() {
for _, client := range d.clients {
client.Stop()
}
d.clients = []Client{}
}
func (d *Discoverer) ExtAnnounceOK() map[string]bool {
d.mut.RLock()
defer d.mut.RUnlock()
ret := make(map[string]bool)
for _, client := range d.clients {
ret[client.Address()] = client.StatusOK()
}
return ret
}
// Lookup returns a list of addresses the device is available at, as well as
// a list of relays the device is supposed to be available on sorted by the
// sum of latencies between this device, and the device in question.
func (d *Discoverer) Lookup(device protocol.DeviceID) ([]string, []string) {
d.registryLock.RLock()
cachedAddresses := d.filterCached(d.addressRegistry[device])
cachedRelays := d.filterCached(d.relayRegistry[device])
lastLookup := d.lastLookup[device]
d.registryLock.RUnlock()
d.mut.RLock()
defer d.mut.RUnlock()
relays := make([]string, len(cachedRelays))
for i := range cachedRelays {
relays[i] = cachedRelays[i].Address
}
if len(cachedAddresses) > 0 {
// There are cached address entries.
addrs := make([]string, len(cachedAddresses))
for i := range cachedAddresses {
addrs[i] = cachedAddresses[i].Address
}
return addrs, relays
}
if time.Since(lastLookup) < d.negCacheCutoff {
// We have recently tried to lookup this address and failed. Lets
// chill for a while.
return nil, relays
}
if len(d.clients) != 0 && time.Since(d.localBcastStart) > d.localBcastIntv {
// Only perform external lookups if we have at least one external
// server client and one local announcement interval has passed. This is
// to avoid finding local peers on their remote address at startup.
results := make(chan Announce, len(d.clients))
wg := sync.NewWaitGroup()
for _, client := range d.clients {
wg.Add(1)
go func(c Client) {
defer wg.Done()
ann, err := c.Lookup(device)
if err == nil {
results <- ann
}
}(client)
}
wg.Wait()
close(results)
cachedAddresses := []CacheEntry{}
availableRelays := []Relay{}
seenAddresses := make(map[string]struct{})
seenRelays := make(map[string]struct{})
now := time.Now()
var addrs []string
for result := range results {
for _, addr := range result.This.Addresses {
_, ok := seenAddresses[addr]
if !ok {
cachedAddresses = append(cachedAddresses, CacheEntry{
Address: addr,
Seen: now,
})
seenAddresses[addr] = struct{}{}
addrs = append(addrs, addr)
}
}
for _, relay := range result.This.Relays {
_, ok := seenRelays[relay.Address]
if !ok {
availableRelays = append(availableRelays, relay)
seenRelays[relay.Address] = struct{}{}
}
}
}
relays = RelayAddressesSortedByLatency(availableRelays)
cachedRelays := make([]CacheEntry, len(relays))
for i := range relays {
cachedRelays[i] = CacheEntry{
Address: relays[i],
Seen: now,
}
}
d.registryLock.Lock()
d.addressRegistry[device] = cachedAddresses
d.relayRegistry[device] = cachedRelays
d.lastLookup[device] = time.Now()
d.registryLock.Unlock()
return addrs, relays
}
return nil, relays
}
func (d *Discoverer) Hint(device string, addrs []string) {
resAddrs := resolveAddrs(addrs)
var id protocol.DeviceID
id.UnmarshalText([]byte(device))
d.registerDevice(nil, Device{
Addresses: resAddrs,
ID: id[:],
})
}
func (d *Discoverer) All() map[protocol.DeviceID][]CacheEntry {
d.registryLock.RLock()
devices := make(map[protocol.DeviceID][]CacheEntry, len(d.addressRegistry))
for device, addrs := range d.addressRegistry {
addrsCopy := make([]CacheEntry, len(addrs))
copy(addrsCopy, addrs)
devices[device] = addrsCopy
}
d.registryLock.RUnlock()
return devices
}
func (d *Discoverer) Announcement() Announce {
return d.announcementPkt(true)
}
func (d *Discoverer) announcementPkt(allowExternal bool) Announce {
var addrs []string
if allowExternal && d.extAddr != nil {
addrs = d.extAddr.ExternalAddresses()
} else {
addrs = resolveAddrs(d.listenAddrs)
}
var relayAddrs []string
if d.relayStatusProvider != nil {
status := d.relayStatusProvider.ClientStatus()
for uri, ok := range status {
if ok {
relayAddrs = append(relayAddrs, uri)
}
}
}
return Announce{
Magic: AnnouncementMagic,
This: Device{d.myID[:], addrs, measureLatency(relayAddrs)},
}
}
func (d *Discoverer) sendLocalAnnouncements() {
var pkt = d.announcementPkt(false)
msg := pkt.MustMarshalXDR()
for {
for _, b := range d.beacons {
b.Send(msg)
}
select {
case <-d.localBcastTick:
case <-d.forcedBcastTick:
}
}
}
func (d *Discoverer) recvAnnouncements(b beacon.Interface) {
for {
buf, addr := b.Recv()
var pkt Announce
err := pkt.UnmarshalXDR(buf)
if err != nil && err != io.EOF {
if debug {
l.Debugf("discover: Failed to unmarshal local announcement from %s:\n%s", addr, hex.Dump(buf))
}
continue
}
if debug {
l.Debugf("discover: Received local announcement from %s for %s", addr, protocol.DeviceIDFromBytes(pkt.This.ID))
}
var newDevice bool
if bytes.Compare(pkt.This.ID, d.myID[:]) != 0 {
newDevice = d.registerDevice(addr, pkt.This)
}
if newDevice {
select {
case d.forcedBcastTick <- time.Now():
}
}
}
}
func (d *Discoverer) registerDevice(addr net.Addr, device Device) bool {
var id protocol.DeviceID
copy(id[:], device.ID)
d.registryLock.Lock()
defer d.registryLock.Unlock()
current := d.filterCached(d.addressRegistry[id])
orig := current
for _, deviceAddr := range device.Addresses {
uri, err := url.Parse(deviceAddr)
if err != nil {
if debug {
l.Debugf("discover: Failed to parse address %s: %s", deviceAddr, err)
}
continue
}
host, port, err := net.SplitHostPort(uri.Host)
if err != nil {
if debug {
l.Debugf("discover: Failed to split address host %s: %s", deviceAddr, err)
}
continue
}
if host == "" {
uri.Host = net.JoinHostPort(addr.(*net.UDPAddr).IP.String(), port)
deviceAddr = uri.String()
}
for i := range current {
if current[i].Address == deviceAddr {
current[i].Seen = time.Now()
goto done
}
}
current = append(current, CacheEntry{
Address: deviceAddr,
Seen: time.Now(),
})
done:
}
if debug {
l.Debugf("discover: Caching %s addresses: %v", id, current)
}
d.addressRegistry[id] = current
if len(current) > len(orig) {
addrs := make([]string, len(current))
for i := range current {
addrs[i] = current[i].Address
}
events.Default.Log(events.DeviceDiscovered, map[string]interface{}{
"device": id.String(),
"addrs": addrs,
})
}
return len(current) > len(orig)
}
func (d *Discoverer) filterCached(c []CacheEntry) []CacheEntry {
for i := 0; i < len(c); {
if ago := time.Since(c[i].Seen); ago > d.cacheLifetime {
if debug {
l.Debugf("discover: Removing cached entry %s - seen %v ago", c[i].Address, ago)
}
c[i] = c[len(c)-1]
c = c[:len(c)-1]
} else {
i++
}
}
return c
}
func addrToAddr(addr *net.TCPAddr) string {
if len(addr.IP) == 0 || addr.IP.IsUnspecified() {
return fmt.Sprintf(":%d", addr.Port)
} else if bs := addr.IP.To4(); bs != nil {
return fmt.Sprintf("%s:%d", bs.String(), addr.Port)
} else if bs := addr.IP.To16(); bs != nil {
return fmt.Sprintf("[%s]:%d", bs.String(), addr.Port)
}
return ""
}
func resolveAddrs(addrs []string) []string {
var raddrs []string
for _, addrStr := range addrs {
uri, err := url.Parse(addrStr)
if err != nil {
continue
}
addrRes, err := net.ResolveTCPAddr("tcp", uri.Host)
if err != nil {
continue
}
addr := addrToAddr(addrRes)
if len(addr) > 0 {
uri.Host = addr
raddrs = append(raddrs, uri.String())
}
}
return raddrs
}
func measureLatency(relayAdresses []string) []Relay {
relays := make([]Relay, 0, len(relayAdresses))
for i, addr := range relayAdresses {
relay := Relay{
Address: addr,
Latency: int32(time.Hour / time.Millisecond),
}
relays = append(relays, relay)
if latency, err := osutil.GetLatencyForURL(addr); err == nil {
if debug {
l.Debugf("Relay %s latency %s", addr, latency)
}
relays[i].Latency = int32(latency / time.Millisecond)
} else {
l.Debugf("Failed to get relay %s latency %s", addr, err)
}
}
return relays
}
// RelayAddressesSortedByLatency adds local latency to the relay, and sorts them
// by sum latency, and returns the addresses.
func RelayAddressesSortedByLatency(input []Relay) []string {
relays := make([]Relay, len(input))
copy(relays, input)
for i, relay := range relays {
if latency, err := osutil.GetLatencyForURL(relay.Address); err == nil {
relays[i].Latency += int32(latency / time.Millisecond)
} else {
relays[i].Latency += int32(time.Hour / time.Millisecond)
}
}
sort.Sort(relayList(relays))
addresses := make([]string, 0, len(relays))
for _, relay := range relays {
addresses = append(addresses, relay.Address)
}
return addresses
}
type relayList []Relay
func (l relayList) Len() int {
return len(l)
}
func (l relayList) Less(a, b int) bool {
return l[a].Latency < l[b].Latency
}
func (l relayList) Swap(a, b int) {
l[a], l[b] = l[b], l[a]
// The AddressLister answers questions about what addresses we are listening
// on.
type AddressLister interface {
ExternalAddresses() []string
AllAddresses() []string
}

View File

@@ -1,163 +0,0 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package discover
import (
"net/url"
"time"
"testing"
"github.com/syncthing/protocol"
)
type DummyClient struct {
url *url.URL
lookups []protocol.DeviceID
lookupRet Announce
stops int
statusRet bool
statusChecks int
}
func (c *DummyClient) Lookup(device protocol.DeviceID) (Announce, error) {
c.lookups = append(c.lookups, device)
return c.lookupRet, nil
}
func (c *DummyClient) StatusOK() bool {
c.statusChecks++
return c.statusRet
}
func (c *DummyClient) Stop() {
c.stops++
}
func (c *DummyClient) Address() string {
return c.url.String()
}
func TestGlobalDiscovery(t *testing.T) {
c1 := &DummyClient{
statusRet: false,
lookupRet: Announce{
Magic: AnnouncementMagic,
This: Device{
ID: protocol.LocalDeviceID[:],
Addresses: []string{"test.com:1234"},
Relays: nil,
},
Extra: nil,
},
}
c2 := &DummyClient{
statusRet: true,
lookupRet: Announce{
Magic: AnnouncementMagic,
This: Device{
ID: protocol.LocalDeviceID[:],
Addresses: nil,
Relays: nil,
},
Extra: nil,
},
}
c3 := &DummyClient{
statusRet: true,
lookupRet: Announce{
Magic: AnnouncementMagic,
This: Device{
ID: protocol.LocalDeviceID[:],
Addresses: []string{"best.com:2345"},
Relays: nil,
},
Extra: nil,
},
}
clients := []*DummyClient{c1, c2}
Register("test1", func(uri *url.URL, ann Announcer) (Client, error) {
c := clients[0]
clients = clients[1:]
c.url = uri
return c, nil
})
Register("test2", func(uri *url.URL, ann Announcer) (Client, error) {
c3.url = uri
return c3, nil
})
d := NewDiscoverer(device, []string{}, nil)
d.localBcastStart = time.Time{}
servers := []string{
"test1://123.123.123.123:1234",
"test1://23.23.23.23:234",
"test2://234.234.234.234.2345",
}
d.StartGlobal(servers, nil)
if len(d.clients) != 3 {
t.Fatal("Wrong number of clients")
}
status := d.ExtAnnounceOK()
for _, c := range []*DummyClient{c1, c2, c3} {
if status[c.url.String()] != c.statusRet || c.statusChecks != 1 {
t.Fatal("Wrong status")
}
}
addrs, _ := d.Lookup(device)
if len(addrs) != 2 {
t.Fatal("Wrong number of addresses", addrs)
}
for _, addr := range []string{"test.com:1234", "best.com:2345"} {
found := false
for _, laddr := range addrs {
if laddr == addr {
found = true
break
}
}
if !found {
t.Fatal("Couldn't find", addr)
}
}
for _, c := range []*DummyClient{c1, c2, c3} {
if len(c.lookups) != 1 || c.lookups[0] != device {
t.Fatal("Wrong lookups")
}
}
addrs, _ = d.Lookup(device)
if len(addrs) != 2 {
t.Fatal("Wrong number of addresses", addrs)
}
// Answer should be cached, so number of lookups should have not increased
for _, c := range []*DummyClient{c1, c2, c3} {
if len(c.lookups) != 1 || c.lookups[0] != device {
t.Fatal("Wrong lookups")
}
}
d.StopGlobal()
for _, c := range []*DummyClient{c1, c2, c3} {
if c.stops != 1 {
t.Fatal("Wrong number of stops")
}
}
}

View File

@@ -1,8 +1,75 @@
// Copyright (C) 2014 The Syncthing Authors.
// Copyright (C) 2015 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
// Package discover implements the device discovery protocol.
/*
Package discover implements the local and global device discovery protocols.
Global Discovery
================
Announcements
-------------
A device should announce itself at startup. It does this by an HTTPS POST to
the announce server URL (with the path usually being "/", but this is of
course up to the discovery server). The POST has a JSON payload listing direct
connection addresses (if any) and relay addresses (if any).
{
direct: ["tcp://192.0.2.45:22000", "tcp://:22202"],
relays: [{"url": "relay://192.0.2.99:22028", "latency": 142}]
}
It's OK for either of the "direct" or "relays" fields to be either the empty
list ([]), null, or missing entirely. An announcment with both fields missing
or empty is however not useful...
Any empty or unspecified IP addresses (i.e. addresses like tcp://:22000,
tcp://0.0.0.0:22000, tcp://[::]:22000) are interpreted as referring to the
source IP address of the announcement.
The device ID of the announcing device is not part of the announcement.
Instead, the server requires that the client perform certificate
authentication. The device ID is deduced from the presented certificate.
The server response is empty, with code 200 (OK) on success. If no certificate
was presented, status 403 (Forbidden) is returned. If the posted data doesn't
conform to the expected format, 400 (Bad Request) is returned.
In successfull responses, the server may return a "Reannounce-After" header
containing the number of seconds after which the client should perform a new
announcement.
In error responses, the server may return a "Retry-After" header containing
the number of seconds after which the client should retry.
Performing announcements significantly more often than indicated by the
Reannounce-After or Retry-After headers may result in the client being
throttled. In such cases the server may respond with status code 429 (Too Many
Requests).
Queries
=======
Queries are performed as HTTPS GET requests to the announce server URL. The
requested device ID is passed as the query parameter "device", in canonical
string form, i.e. https://announce.syncthing.net/?device=ABC12345-....
Successfull responses will have status code 200 (OK) and carry a JSON payload
of the same format as the announcement above. The response will not contain
empty or unspecified addresses.
If the "device" query parameter is missing or malformed, the status code 400
(Bad Request) is returned.
If the device ID is of a valid format but not found in the registry, 404 (Not
Found) is returned.
If the client has exceeded a rate limit, the server may respond with 429 (Too
Many Requests).
*/
package discover

385
lib/discover/global.go Normal file
View File

@@ -0,0 +1,385 @@
package discover
import (
"bytes"
"crypto/tls"
"encoding/json"
"errors"
"io"
"net/http"
"net/url"
"strconv"
stdsync "sync"
"time"
"github.com/syncthing/protocol"
"github.com/syncthing/syncthing/lib/events"
)
type globalClient struct {
server string
addrList AddressLister
relayStat RelayStatusProvider
announceClient httpClient
queryClient httpClient
noAnnounce bool
stop chan struct{}
errorHolder
}
type httpClient interface {
Get(url string) (*http.Response, error)
Post(url, ctype string, data io.Reader) (*http.Response, error)
}
const (
defaultReannounceInterval = 30 * time.Minute
announceErrorRetryInterval = 5 * time.Minute
)
type announcement struct {
Direct []string `json:"direct"`
Relays []Relay `json:"relays"`
}
type serverOptions struct {
insecure bool // don't check certificate
noAnnounce bool // don't announce
id string // expected server device ID
}
func NewGlobal(server string, cert tls.Certificate, addrList AddressLister, relayStat RelayStatusProvider) (FinderService, error) {
server, opts, err := parseOptions(server)
if err != nil {
return nil, err
}
var devID protocol.DeviceID
if opts.id != "" {
devID, err = protocol.DeviceIDFromString(opts.id)
if err != nil {
return nil, err
}
}
// The http.Client used for announcements. It needs to have our
// certificate to prove our identity, and may or may not verify the server
// certificate depending on the insecure setting.
var announceClient httpClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: opts.insecure,
Certificates: []tls.Certificate{cert},
},
},
}
if opts.id != "" {
announceClient = newIDCheckingHTTPClient(announceClient, devID)
}
// The http.Client used for queries. We don't need to present our
// certificate here, so lets not include it. May be insecure if requested.
var queryClient httpClient = &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: opts.insecure,
},
},
}
if opts.id != "" {
queryClient = newIDCheckingHTTPClient(queryClient, devID)
}
cl := &globalClient{
server: server,
addrList: addrList,
relayStat: relayStat,
announceClient: announceClient,
queryClient: queryClient,
noAnnounce: opts.noAnnounce,
stop: make(chan struct{}),
}
cl.setError(errors.New("not announced"))
return cl, nil
}
// Lookup returns the list of addresses where the given device is available;
// direct, and via relays.
func (c *globalClient) Lookup(device protocol.DeviceID) (direct []string, relays []Relay, err error) {
qURL, err := url.Parse(c.server)
if err != nil {
return nil, nil, err
}
q := qURL.Query()
q.Set("device", device.String())
qURL.RawQuery = q.Encode()
resp, err := c.queryClient.Get(qURL.String())
if err != nil {
if debug {
l.Debugln("globalClient.Lookup", qURL.String(), err)
}
return nil, nil, err
}
if resp.StatusCode != 200 {
resp.Body.Close()
if debug {
l.Debugln("globalClient.Lookup", qURL.String(), resp.Status)
}
return nil, nil, errors.New(resp.Status)
}
// TODO: Handle 429 and Retry-After?
var ann announcement
err = json.NewDecoder(resp.Body).Decode(&ann)
resp.Body.Close()
return ann.Direct, ann.Relays, err
}
func (c *globalClient) String() string {
return "global@" + c.server
}
func (c *globalClient) Serve() {
if c.noAnnounce {
// We're configured to not do announcements, only lookups. To maintain
// the same interface, we just pause here if Serve() is run.
<-c.stop
return
}
timer := time.NewTimer(0)
defer timer.Stop()
eventSub := events.Default.Subscribe(events.ExternalPortMappingChanged | events.RelayStateChanged)
defer events.Default.Unsubscribe(eventSub)
for {
select {
case <-eventSub.C():
c.sendAnnouncement(timer)
case <-timer.C:
c.sendAnnouncement(timer)
case <-c.stop:
return
}
}
}
func (c *globalClient) sendAnnouncement(timer *time.Timer) {
var ann announcement
if c.addrList != nil {
ann.Direct = c.addrList.ExternalAddresses()
}
if c.relayStat != nil {
for _, relay := range c.relayStat.Relays() {
latency, ok := c.relayStat.RelayStatus(relay)
if ok {
ann.Relays = append(ann.Relays, Relay{
URL: relay,
Latency: int32(latency / time.Millisecond),
})
}
}
}
if len(ann.Direct)+len(ann.Relays) == 0 {
c.setError(errors.New("nothing to announce"))
if debug {
l.Debugln("Nothing to announce")
}
timer.Reset(announceErrorRetryInterval)
return
}
// The marshal doesn't fail, I promise.
postData, _ := json.Marshal(ann)
if debug {
l.Debugf("Announcement: %s", postData)
}
resp, err := c.announceClient.Post(c.server, "application/json", bytes.NewReader(postData))
if err != nil {
if debug {
l.Debugln("announce POST:", err)
}
c.setError(err)
timer.Reset(announceErrorRetryInterval)
return
}
if debug {
l.Debugln("announce POST:", resp.Status)
}
resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode > 299 {
if debug {
l.Debugln("announce POST:", resp.Status)
}
c.setError(errors.New(resp.Status))
if h := resp.Header.Get("Retry-After"); h != "" {
// The server has a recommendation on when we should
// retry. Follow it.
if secs, err := strconv.Atoi(h); err == nil && secs > 0 {
if debug {
l.Debugln("announce Retry-After:", secs, err)
}
timer.Reset(time.Duration(secs) * time.Second)
return
}
}
timer.Reset(announceErrorRetryInterval)
return
}
c.setError(nil)
if h := resp.Header.Get("Reannounce-After"); h != "" {
// The server has a recommendation on when we should
// reannounce. Follow it.
if secs, err := strconv.Atoi(h); err == nil && secs > 0 {
if debug {
l.Debugln("announce Reannounce-After:", secs, err)
}
timer.Reset(time.Duration(secs) * time.Second)
return
}
}
timer.Reset(defaultReannounceInterval)
}
func (c *globalClient) Stop() {
close(c.stop)
}
func (c *globalClient) Cache() map[protocol.DeviceID]CacheEntry {
// The globalClient doesn't do caching
return nil
}
// parseOptions parses and strips away any ?query=val options, setting the
// corresponding field in the serverOptions struct. Unknown query options are
// ignored and removed.
func parseOptions(dsn string) (server string, opts serverOptions, err error) {
p, err := url.Parse(dsn)
if err != nil {
return "", serverOptions{}, err
}
// Grab known options from the query string
q := p.Query()
opts.id = q.Get("id")
opts.insecure = opts.id != "" || queryBool(q, "insecure")
opts.noAnnounce = queryBool(q, "noannounce")
// Check for disallowed combinations
if p.Scheme == "http" {
if !opts.insecure {
return "", serverOptions{}, errors.New("http without insecure not supported")
}
if !opts.noAnnounce {
return "", serverOptions{}, errors.New("http without noannounce not supported")
}
} else if p.Scheme != "https" {
return "", serverOptions{}, errors.New("unsupported scheme " + p.Scheme)
}
// Remove the query string
p.RawQuery = ""
server = p.String()
return
}
// queryBool returns the query parameter parsed as a boolean. An empty value
// ("?foo") is considered true, as is any value string except false
// ("?foo=false").
func queryBool(q url.Values, key string) bool {
if _, ok := q[key]; !ok {
return false
}
return q.Get(key) != "false"
}
type idCheckingHTTPClient struct {
httpClient
id protocol.DeviceID
}
func newIDCheckingHTTPClient(client httpClient, id protocol.DeviceID) *idCheckingHTTPClient {
return &idCheckingHTTPClient{
httpClient: client,
id: id,
}
}
func (c *idCheckingHTTPClient) check(resp *http.Response) error {
if resp.TLS == nil {
return errors.New("security: not TLS")
}
if len(resp.TLS.PeerCertificates) == 0 {
return errors.New("security: no certificates")
}
id := protocol.NewDeviceID(resp.TLS.PeerCertificates[0].Raw)
if !id.Equals(c.id) {
return errors.New("security: incorrect device id")
}
return nil
}
func (c *idCheckingHTTPClient) Get(url string) (*http.Response, error) {
resp, err := c.httpClient.Get(url)
if err != nil {
return nil, err
}
if err := c.check(resp); err != nil {
return nil, err
}
return resp, nil
}
func (c *idCheckingHTTPClient) Post(url, ctype string, data io.Reader) (*http.Response, error) {
resp, err := c.httpClient.Post(url, ctype, data)
if err != nil {
return nil, err
}
if err := c.check(resp); err != nil {
return nil, err
}
return resp, nil
}
type errorHolder struct {
err error
mut stdsync.Mutex // uses stdlib sync as I want this to be trivially embeddable, and there is no risk of blocking
}
func (e *errorHolder) setError(err error) {
e.mut.Lock()
e.err = err
e.mut.Unlock()
}
func (e *errorHolder) Error() error {
e.mut.Lock()
err := e.err
e.mut.Unlock()
return err
}

253
lib/discover/global_test.go Normal file
View File

@@ -0,0 +1,253 @@
package discover
import (
"crypto/tls"
"io/ioutil"
"net"
"net/http"
"strings"
"testing"
"time"
"github.com/syncthing/protocol"
"github.com/syncthing/syncthing/lib/tlsutil"
)
func TestParseOptions(t *testing.T) {
testcases := []struct {
in string
out string
opts serverOptions
}{
{"https://example.com/", "https://example.com/", serverOptions{}},
{"https://example.com/?insecure", "https://example.com/", serverOptions{insecure: true}},
{"https://example.com/?insecure=true", "https://example.com/", serverOptions{insecure: true}},
{"https://example.com/?insecure=yes", "https://example.com/", serverOptions{insecure: true}},
{"https://example.com/?insecure=false&noannounce", "https://example.com/", serverOptions{noAnnounce: true}},
{"https://example.com/?id=abc", "https://example.com/", serverOptions{id: "abc", insecure: true}},
}
for _, tc := range testcases {
res, opts, err := parseOptions(tc.in)
if err != nil {
t.Errorf("Unexpected err %v for %v", err, tc.in)
continue
}
if res != tc.out {
t.Errorf("Incorrect server, %v!= %v for %v", res, tc.out, tc.in)
}
if opts != tc.opts {
t.Errorf("Incorrect options, %v!= %v for %v", opts, tc.opts, tc.in)
}
}
}
func TestGlobalOverHTTP(t *testing.T) {
// HTTP works for queries, but is obviously insecure and we can't do
// announces over it (as we don't present a certificate). As such, http://
// is only allowed in combination with the "insecure" and "noannounce"
// parameters.
if _, err := NewGlobal("http://192.0.2.42/", tls.Certificate{}, nil, nil); err == nil {
t.Fatal("http is not allowed without insecure and noannounce")
}
if _, err := NewGlobal("http://192.0.2.42/?insecure", tls.Certificate{}, nil, nil); err == nil {
t.Fatal("http is not allowed without noannounce")
}
if _, err := NewGlobal("http://192.0.2.42/?noannounce", tls.Certificate{}, nil, nil); err == nil {
t.Fatal("http is not allowed without insecure")
}
// Now lets check that lookups work over HTTP, given the correct options.
list, err := net.Listen("tcp4", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
defer list.Close()
s := new(fakeDiscoveryServer)
mux := http.NewServeMux()
mux.HandleFunc("/", s.handler)
go http.Serve(list, mux)
direct, relays, err := testLookup("http://" + list.Addr().String() + "?insecure&noannounce")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(direct) != 1 || direct[0] != "tcp://192.0.2.42::22000" {
t.Errorf("incorrect direct list: %+v", direct)
}
if len(relays) != 1 || relays[0] != (Relay{URL: "relay://192.0.2.43:443", Latency: 42}) {
t.Errorf("incorrect relays list: %+v", direct)
}
}
func TestGlobalOverHTTPS(t *testing.T) {
dir, err := ioutil.TempDir("", "syncthing")
if err != nil {
t.Fatal(err)
}
// Generate a server certificate, using fewer bits than usual to hurry the
// process along a bit.
cert, err := tlsutil.NewCertificate(dir+"/cert.pem", dir+"/key.pem", "syncthing", 1024)
if err != nil {
t.Fatal(err)
}
list, err := tls.Listen("tcp4", "127.0.0.1:0", &tls.Config{Certificates: []tls.Certificate{cert}})
if err != nil {
t.Fatal(err)
}
defer list.Close()
s := new(fakeDiscoveryServer)
mux := http.NewServeMux()
mux.HandleFunc("/", s.handler)
go http.Serve(list, mux)
// With default options the lookup code expects the server certificate to
// check out according to the usual CA chains etc. That won't be the case
// here so we expect the lookup to fail.
url := "https://" + list.Addr().String()
if _, _, err := testLookup(url); err == nil {
t.Fatalf("unexpected nil error when we should have got a certificate error")
}
// With "insecure" set, whatever certificate is on the other side should
// be accepted.
url = "https://" + list.Addr().String() + "?insecure"
if direct, relays, err := testLookup(url); err != nil {
t.Fatalf("unexpected error: %v", err)
} else {
if len(direct) != 1 || direct[0] != "tcp://192.0.2.42::22000" {
t.Errorf("incorrect direct list: %+v", direct)
}
if len(relays) != 1 || relays[0] != (Relay{URL: "relay://192.0.2.43:443", Latency: 42}) {
t.Errorf("incorrect relays list: %+v", direct)
}
}
// With "id" set to something incorrect, the checks should fail again.
url = "https://" + list.Addr().String() + "?id=" + protocol.LocalDeviceID.String()
if _, _, err := testLookup(url); err == nil {
t.Fatalf("unexpected nil error for incorrect discovery server ID")
}
// With the correct device ID, the check should pass and we should get a
// lookup response.
id := protocol.NewDeviceID(cert.Certificate[0])
url = "https://" + list.Addr().String() + "?id=" + id.String()
if direct, relays, err := testLookup(url); err != nil {
t.Fatalf("unexpected error: %v", err)
} else {
if len(direct) != 1 || direct[0] != "tcp://192.0.2.42::22000" {
t.Errorf("incorrect direct list: %+v", direct)
}
if len(relays) != 1 || relays[0] != (Relay{URL: "relay://192.0.2.43:443", Latency: 42}) {
t.Errorf("incorrect relays list: %+v", direct)
}
}
}
func TestGlobalAnnounce(t *testing.T) {
dir, err := ioutil.TempDir("", "syncthing")
if err != nil {
t.Fatal(err)
}
// Generate a server certificate, using fewer bits than usual to hurry the
// process along a bit.
cert, err := tlsutil.NewCertificate(dir+"/cert.pem", dir+"/key.pem", "syncthing", 1024)
if err != nil {
t.Fatal(err)
}
list, err := tls.Listen("tcp4", "127.0.0.1:0", &tls.Config{Certificates: []tls.Certificate{cert}})
if err != nil {
t.Fatal(err)
}
defer list.Close()
s := new(fakeDiscoveryServer)
mux := http.NewServeMux()
mux.HandleFunc("/", s.handler)
go http.Serve(list, mux)
url := "https://" + list.Addr().String() + "?insecure"
disco, err := NewGlobal(url, cert, new(fakeAddressLister), new(fakeRelayStatus))
if err != nil {
t.Fatal(err)
}
go disco.Serve()
defer disco.Stop()
// The discovery thing should attempt an announcement immediately. We wait
// for it to succeed, a while.
t0 := time.Now()
for err := disco.Error(); err != nil; err = disco.Error() {
if time.Since(t0) > 10*time.Second {
t.Fatal("announce failed:", err)
}
time.Sleep(100 * time.Millisecond)
}
if !strings.Contains(string(s.announce), "tcp://0.0.0.0:22000") {
t.Errorf("announce missing direct address: %s", s.announce)
}
if !strings.Contains(string(s.announce), "relay://192.0.2.42:443") {
t.Errorf("announce missing relay address: %s", s.announce)
}
}
func testLookup(url string) ([]string, []Relay, error) {
disco, err := NewGlobal(url, tls.Certificate{}, nil, nil)
if err != nil {
return nil, nil, err
}
go disco.Serve()
defer disco.Stop()
return disco.Lookup(protocol.LocalDeviceID)
}
type fakeDiscoveryServer struct {
announce []byte
}
func (s *fakeDiscoveryServer) handler(w http.ResponseWriter, r *http.Request) {
if r.Method == "POST" {
s.announce, _ = ioutil.ReadAll(r.Body)
w.WriteHeader(204)
} else {
w.Header().Set("Content-Type", "application/json")
w.Write([]byte(`{"direct":["tcp://192.0.2.42::22000"], "relays":[{"url": "relay://192.0.2.43:443", "latency": 42}]}`))
}
}
type fakeAddressLister struct{}
func (f *fakeAddressLister) ExternalAddresses() []string {
return []string{"tcp://0.0.0.0:22000"}
}
func (f *fakeAddressLister) AllAddresses() []string {
return []string{"tcp://0.0.0.0:22000", "tcp://192.168.0.1:22000"}
}
type fakeRelayStatus struct{}
func (f *fakeRelayStatus) Relays() []string {
return []string{"relay://192.0.2.42:443"}
}
func (f *fakeRelayStatus) RelayStatus(uri string) (time.Duration, bool) {
return 42 * time.Millisecond, true
}

270
lib/discover/local.go Normal file
View File

@@ -0,0 +1,270 @@
// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package discover
import (
"bytes"
"encoding/hex"
"errors"
"fmt"
"io"
"net"
"net/url"
"strconv"
"time"
"github.com/syncthing/protocol"
"github.com/syncthing/syncthing/lib/beacon"
"github.com/syncthing/syncthing/lib/events"
"github.com/thejerf/suture"
)
type localClient struct {
*suture.Supervisor
myID protocol.DeviceID
addrList AddressLister
relayStat RelayStatusProvider
name string
beacon beacon.Interface
localBcastStart time.Time
localBcastTick <-chan time.Time
forcedBcastTick chan time.Time
*cache
}
const (
BroadcastInterval = 30 * time.Second
CacheLifeTime = 3 * BroadcastInterval
)
var (
ErrIncorrectMagic = errors.New("incorrect magic number")
)
func NewLocal(id protocol.DeviceID, addr string, addrList AddressLister, relayStat RelayStatusProvider) (FinderService, error) {
c := &localClient{
Supervisor: suture.NewSimple("local"),
myID: id,
addrList: addrList,
relayStat: relayStat,
localBcastTick: time.Tick(BroadcastInterval),
forcedBcastTick: make(chan time.Time),
localBcastStart: time.Now(),
cache: newCache(),
}
host, port, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
}
if len(host) == 0 {
// A broadcast client
c.name = "IPv4 local"
bcPort, err := strconv.Atoi(port)
if err != nil {
return nil, err
}
c.startLocalIPv4Broadcasts(bcPort)
} else {
// A multicast client
c.name = "IPv6 local"
c.startLocalIPv6Multicasts(addr)
}
go c.sendLocalAnnouncements()
return c, nil
}
func (c *localClient) startLocalIPv4Broadcasts(localPort int) {
c.beacon = beacon.NewBroadcast(localPort)
c.Add(c.beacon)
go c.recvAnnouncements(c.beacon)
}
func (c *localClient) startLocalIPv6Multicasts(localMCAddr string) {
c.beacon = beacon.NewMulticast(localMCAddr)
c.Add(c.beacon)
go c.recvAnnouncements(c.beacon)
}
// Lookup returns a list of addresses the device is available at. Local
// discovery never returns relays.
func (c *localClient) Lookup(device protocol.DeviceID) (direct []string, relays []Relay, err error) {
if cache, ok := c.Get(device); ok {
if time.Since(cache.when) < CacheLifeTime {
direct = cache.Direct
relays = cache.Relays
}
}
return
}
func (c *localClient) String() string {
return c.name
}
func (c *localClient) Error() error {
return c.beacon.Error()
}
func (c *localClient) announcementPkt() Announce {
addrs := c.addrList.AllAddresses()
var relays []Relay
for _, relay := range c.relayStat.Relays() {
latency, ok := c.relayStat.RelayStatus(relay)
if ok {
relays = append(relays, Relay{
URL: relay,
Latency: int32(latency / time.Millisecond),
})
}
}
return Announce{
Magic: AnnouncementMagic,
This: Device{
ID: c.myID[:],
Addresses: addrs,
Relays: relays,
},
}
}
func (c *localClient) sendLocalAnnouncements() {
var pkt = c.announcementPkt()
msg := pkt.MustMarshalXDR()
for {
c.beacon.Send(msg)
select {
case <-c.localBcastTick:
case <-c.forcedBcastTick:
}
}
}
func (c *localClient) recvAnnouncements(b beacon.Interface) {
for {
buf, addr := b.Recv()
var pkt Announce
err := pkt.UnmarshalXDR(buf)
if err != nil && err != io.EOF {
if debug {
l.Debugf("discover: Failed to unmarshal local announcement from %s:\n%s", addr, hex.Dump(buf))
}
continue
}
if debug {
l.Debugf("discover: Received local announcement from %s for %s", addr, protocol.DeviceIDFromBytes(pkt.This.ID))
}
var newDevice bool
if bytes.Compare(pkt.This.ID, c.myID[:]) != 0 {
newDevice = c.registerDevice(addr, pkt.This)
}
if newDevice {
select {
case c.forcedBcastTick <- time.Now():
}
}
}
}
func (c *localClient) registerDevice(src net.Addr, device Device) bool {
var id protocol.DeviceID
copy(id[:], device.ID)
// Remember whether we already had a valid cache entry for this device.
ce, existsAlready := c.Get(id)
isNewDevice := !existsAlready || time.Since(ce.when) > CacheLifeTime
// Any empty or unspecified addresses should be set to the source address
// of the announcement. We also skip any addresses we can't parse.
var validAddresses []string
for _, addr := range device.Addresses {
u, err := url.Parse(addr)
if err != nil {
continue
}
tcpAddr, err := net.ResolveTCPAddr("tcp", u.Host)
if err != nil {
continue
}
if len(tcpAddr.IP) == 0 || tcpAddr.IP.IsUnspecified() {
host, _, err := net.SplitHostPort(src.String())
if err != nil {
continue
}
u.Host = fmt.Sprintf("%s:%d", host, tcpAddr.Port)
validAddresses = append(validAddresses, u.String())
} else {
validAddresses = append(validAddresses, addr)
}
}
c.Set(id, CacheEntry{
Direct: validAddresses,
Relays: device.Relays,
when: time.Now(),
found: true,
})
if isNewDevice {
events.Default.Log(events.DeviceDiscovered, map[string]interface{}{
"device": id.String(),
"addrs": device.Addresses,
"relays": device.Relays,
})
}
return isNewDevice
}
func addrToAddr(addr *net.TCPAddr) string {
if len(addr.IP) == 0 || addr.IP.IsUnspecified() {
return fmt.Sprintf(":%c", addr.Port)
} else if bs := addr.IP.To4(); bs != nil {
return fmt.Sprintf("%s:%c", bs.String(), addr.Port)
} else if bs := addr.IP.To16(); bs != nil {
return fmt.Sprintf("[%s]:%c", bs.String(), addr.Port)
}
return ""
}
func resolveAddrs(addrs []string) []string {
var raddrs []string
for _, addrStr := range addrs {
uri, err := url.Parse(addrStr)
if err != nil {
continue
}
addrRes, err := net.ResolveTCPAddr("tcp", uri.Host)
if err != nil {
continue
}
addr := addrToAddr(addrRes)
if len(addr) > 0 {
uri.Host = addr
raddrs = append(raddrs, uri.String())
}
}
return raddrs
}

View File

@@ -5,7 +5,7 @@
// You can obtain one at http://mozilla.org/MPL/2.0/.
//go:generate -command genxdr go run ../../Godeps/_workspace/src/github.com/calmh/xdr/cmd/genxdr/main.go
//go:generate genxdr -o packets_xdr.go packets.go
//go:generate genxdr -o localpackets_xdr.go localpackets.go
package discover
@@ -26,8 +26,8 @@ type Announce struct {
}
type Relay struct {
Address string // max:256
Latency int32
URL string `json:"url"` // max:2083
Latency int32 `json:"latency"`
}
type Device struct {

View File

@@ -192,10 +192,10 @@ Relay Structure:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length of Address |
| Length of URL |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Address (variable length) \
\ URL (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Latency |
@@ -203,7 +203,7 @@ Relay Structure:
struct Relay {
string Address<256>;
string URL<256>;
int Latency;
}
@@ -234,10 +234,10 @@ func (o Relay) AppendXDR(bs []byte) ([]byte, error) {
}
func (o Relay) EncodeXDRInto(xw *xdr.Writer) (int, error) {
if l := len(o.Address); l > 256 {
return xw.Tot(), xdr.ElementSizeExceeded("Address", l, 256)
if l := len(o.URL); l > 256 {
return xw.Tot(), xdr.ElementSizeExceeded("URL", l, 256)
}
xw.WriteString(o.Address)
xw.WriteString(o.URL)
xw.WriteUint32(uint32(o.Latency))
return xw.Tot(), xw.Error()
}
@@ -254,7 +254,7 @@ func (o *Relay) UnmarshalXDR(bs []byte) error {
}
func (o *Relay) DecodeXDRFrom(xr *xdr.Reader) error {
o.Address = xr.ReadStringMax(256)
o.URL = xr.ReadStringMax(256)
o.Latency = int32(xr.ReadUint32())
return xr.Error()
}