Discovery clients now take an announcer, global discovery is delayed

This commit is contained in:
Audrius Butkevicius 2015-07-23 20:01:25 +01:00
parent 8f2db99c86
commit 687fbb0a7e
7 changed files with 62 additions and 32 deletions

View File

@ -250,11 +250,8 @@ func (s *connectionSvc) connect() {
if addr == "dynamic" { if addr == "dynamic" {
if discoverer != nil { if discoverer != nil {
t, r := discoverer.Lookup(deviceID) t, r := discoverer.Lookup(deviceID)
relays = append(relays, r...)
if len(t) == 0 {
continue
}
addrs = append(addrs, t...) addrs = append(addrs, t...)
relays = append(relays, r...)
} }
} else { } else {
addrs = append(addrs, addr) addrs = append(addrs, addr)

View File

@ -920,8 +920,14 @@ func discovery(extPort int, relaySvc *relay.Svc) *discover.Discoverer {
} }
if opts.GlobalAnnEnabled { if opts.GlobalAnnEnabled {
go func() {
// Defer starting global announce server, giving time to connect
// to relay servers.
time.Sleep(5 * time.Second)
l.Infoln("Starting global discovery announcements") l.Infoln("Starting global discovery announcements")
disc.StartGlobal(opts.GlobalAnnServers, uint16(extPort)) disc.StartGlobal(opts.GlobalAnnServers, uint16(extPort))
}()
} }
return disc return disc

View File

@ -14,7 +14,11 @@ import (
"github.com/syncthing/protocol" "github.com/syncthing/protocol"
) )
type Factory func(*url.URL, *Announce) (Client, error) type Announcer interface {
Announcement() Announce
}
type Factory func(*url.URL, Announcer) (Client, error)
var ( var (
factories = make(map[string]Factory) factories = make(map[string]Factory)
@ -26,7 +30,7 @@ func Register(proto string, factory Factory) {
factories[proto] = factory factories[proto] = factory
} }
func New(addr string, pkt *Announce) (Client, error) { func New(addr string, announcer Announcer) (Client, error) {
uri, err := url.Parse(addr) uri, err := url.Parse(addr)
if err != nil { if err != nil {
return nil, err return nil, err
@ -35,7 +39,7 @@ func New(addr string, pkt *Announce) (Client, error) {
if !ok { if !ok {
return nil, fmt.Errorf("Unsupported scheme: %s", uri.Scheme) return nil, fmt.Errorf("Unsupported scheme: %s", uri.Scheme)
} }
client, err := factory(uri, pkt) client, err := factory(uri, announcer)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -24,6 +24,14 @@ func init() {
device, _ = protocol.DeviceIDFromString("P56IOI7-MZJNU2Y-IQGDREY-DM2MGTI-MGL3BXN-PQ6W5BM-TBBZ4TJ-XZWICQ2") device, _ = protocol.DeviceIDFromString("P56IOI7-MZJNU2Y-IQGDREY-DM2MGTI-MGL3BXN-PQ6W5BM-TBBZ4TJ-XZWICQ2")
} }
type FakeAnnouncer struct {
pkt Announce
}
func (f *FakeAnnouncer) Announcement() Announce {
return f.pkt
}
func TestUDP4Success(t *testing.T) { func TestUDP4Success(t *testing.T) {
conn, err := net.ListenUDP("udp4", nil) conn, err := net.ListenUDP("udp4", nil)
if err != nil { if err != nil {
@ -33,7 +41,7 @@ func TestUDP4Success(t *testing.T) {
port := conn.LocalAddr().(*net.UDPAddr).Port port := conn.LocalAddr().(*net.UDPAddr).Port
address := fmt.Sprintf("udp4://127.0.0.1:%d", port) address := fmt.Sprintf("udp4://127.0.0.1:%d", port)
pkt := &Announce{ pkt := Announce{
Magic: AnnouncementMagic, Magic: AnnouncementMagic,
This: Device{ This: Device{
device[:], device[:],
@ -41,8 +49,11 @@ func TestUDP4Success(t *testing.T) {
nil, nil,
}, },
} }
ann := &FakeAnnouncer{
pkt: pkt,
}
client, err := New(address, pkt) client, err := New(address, ann)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -137,7 +148,7 @@ func TestUDP4Failure(t *testing.T) {
address := fmt.Sprintf("udp4://127.0.0.1:%d/?listenaddress=127.0.0.1&retry=5", port) address := fmt.Sprintf("udp4://127.0.0.1:%d/?listenaddress=127.0.0.1&retry=5", port)
pkt := &Announce{ pkt := Announce{
Magic: AnnouncementMagic, Magic: AnnouncementMagic,
This: Device{ This: Device{
device[:], device[:],
@ -145,8 +156,11 @@ func TestUDP4Failure(t *testing.T) {
nil, nil,
}, },
} }
ann := &FakeAnnouncer{
pkt: pkt,
}
client, err := New(address, pkt) client, err := New(address, ann)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -20,12 +20,13 @@ import (
func init() { func init() {
for _, proto := range []string{"udp", "udp4", "udp6"} { for _, proto := range []string{"udp", "udp4", "udp6"} {
Register(proto, func(uri *url.URL, pkt *Announce) (Client, error) { Register(proto, func(uri *url.URL, announcer Announcer) (Client, error) {
c := &UDPClient{ c := &UDPClient{
announcer: announcer,
wg: sync.NewWaitGroup(), wg: sync.NewWaitGroup(),
mut: sync.NewRWMutex(), mut: sync.NewRWMutex(),
} }
err := c.Start(uri, pkt) err := c.Start(uri)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -37,22 +38,20 @@ func init() {
type UDPClient struct { type UDPClient struct {
url *url.URL url *url.URL
id protocol.DeviceID
stop chan struct{} stop chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
listenAddress *net.UDPAddr listenAddress *net.UDPAddr
globalBroadcastInterval time.Duration globalBroadcastInterval time.Duration
errorRetryInterval time.Duration errorRetryInterval time.Duration
announcer Announcer
status bool status bool
mut sync.RWMutex mut sync.RWMutex
} }
func (d *UDPClient) Start(uri *url.URL, pkt *Announce) error { func (d *UDPClient) Start(uri *url.URL) error {
d.url = uri d.url = uri
d.id = protocol.DeviceIDFromBytes(pkt.This.ID)
d.stop = make(chan struct{}) d.stop = make(chan struct{})
params := uri.Query() params := uri.Query()
@ -79,11 +78,11 @@ func (d *UDPClient) Start(uri *url.URL, pkt *Announce) error {
} }
d.wg.Add(1) d.wg.Add(1)
go d.broadcast(pkt.MustMarshalXDR()) go d.broadcast()
return nil return nil
} }
func (d *UDPClient) broadcast(pkt []byte) { func (d *UDPClient) broadcast() {
defer d.wg.Done() defer d.wg.Done()
conn, err := net.ListenUDP(d.url.Scheme, d.listenAddress) conn, err := net.ListenUDP(d.url.Scheme, d.listenAddress)
@ -126,7 +125,14 @@ func (d *UDPClient) broadcast(pkt []byte) {
l.Debugf("discover %s: broadcast: Sending self announcement to %v", d.url, remote) l.Debugf("discover %s: broadcast: Sending self announcement to %v", d.url, remote)
} }
_, err := conn.WriteTo(pkt, remote) ann := d.announcer.Announcement()
pkt, err := ann.MarshalXDR()
if err != nil {
timer.Reset(d.errorRetryInterval)
continue
}
_, err = conn.WriteTo(pkt, remote)
if err != nil { if err != nil {
if debug { if debug {
l.Debugf("discover %s: broadcast: Failed to send self announcement: %s", d.url, err) l.Debugf("discover %s: broadcast: Failed to send self announcement: %s", d.url, err)
@ -137,7 +143,7 @@ func (d *UDPClient) broadcast(pkt []byte) {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
pkt, err := d.Lookup(d.id) pkt, err := d.Lookup(protocol.DeviceIDFromBytes(ann.This.ID))
if err != nil && debug { if err != nil && debug {
l.Debugf("discover %s: broadcast: Self-lookup failed: %v", d.url, err) l.Debugf("discover %s: broadcast: Self-lookup failed: %v", d.url, err)
} else if debug { } else if debug {

View File

@ -144,14 +144,13 @@ func (d *Discoverer) StartGlobal(servers []string, extPort uint16) {
} }
d.extPort = extPort d.extPort = extPort
pkt := d.announcementPkt(true)
wg := sync.NewWaitGroup() wg := sync.NewWaitGroup()
clients := make(chan Client, len(servers)) clients := make(chan Client, len(servers))
for _, address := range servers { for _, address := range servers {
wg.Add(1) wg.Add(1)
go func(addr string) { go func(addr string) {
defer wg.Done() defer wg.Done()
client, err := New(addr, pkt) client, err := New(addr, d)
if err != nil { if err != nil {
l.Infoln("Error creating discovery client", addr, err) l.Infoln("Error creating discovery client", addr, err)
return return
@ -318,7 +317,11 @@ func (d *Discoverer) All() map[protocol.DeviceID][]CacheEntry {
return devices return devices
} }
func (d *Discoverer) announcementPkt(allowExternal bool) *Announce { func (d *Discoverer) Announcement() Announce {
return d.announcementPkt(true)
}
func (d *Discoverer) announcementPkt(allowExternal bool) Announce {
var addrs []string var addrs []string
if d.extPort != 0 && allowExternal { if d.extPort != 0 && allowExternal {
addrs = []string{fmt.Sprintf("tcp://:%d", d.extPort)} addrs = []string{fmt.Sprintf("tcp://:%d", d.extPort)}
@ -326,7 +329,7 @@ func (d *Discoverer) announcementPkt(allowExternal bool) *Announce {
addrs = resolveAddrs(d.listenAddrs) addrs = resolveAddrs(d.listenAddrs)
} }
relayAddrs := make([]string, 0) var relayAddrs []string
if d.relaySvc != nil { if d.relaySvc != nil {
status := d.relaySvc.ClientStatus() status := d.relaySvc.ClientStatus()
for uri, ok := range status { for uri, ok := range status {
@ -336,7 +339,7 @@ func (d *Discoverer) announcementPkt(allowExternal bool) *Announce {
} }
} }
return &Announce{ return Announce{
Magic: AnnouncementMagic, Magic: AnnouncementMagic,
This: Device{d.myID[:], addrs, measureLatency(relayAddrs)}, This: Device{d.myID[:], addrs, measureLatency(relayAddrs)},
} }

View File

@ -84,14 +84,14 @@ func TestGlobalDiscovery(t *testing.T) {
clients := []*DummyClient{c1, c2} clients := []*DummyClient{c1, c2}
Register("test1", func(uri *url.URL, pkt *Announce) (Client, error) { Register("test1", func(uri *url.URL, ann Announcer) (Client, error) {
c := clients[0] c := clients[0]
clients = clients[1:] clients = clients[1:]
c.url = uri c.url = uri
return c, nil return c, nil
}) })
Register("test2", func(uri *url.URL, pkt *Announce) (Client, error) { Register("test2", func(uri *url.URL, ann Announcer) (Client, error) {
c3.url = uri c3.url = uri
return c3, nil return c3, nil
}) })