From 69f8ac6b56058c0fa93a6d48c187fe935dc2e32b Mon Sep 17 00:00:00 2001 From: Audrius Butkevicius Date: Thu, 20 Nov 2014 21:24:11 +0000 Subject: [PATCH] Change to URL based announce server addresses (fixes #943) --- internal/config/config.go | 18 +- internal/config/config_test.go | 4 +- internal/config/testdata/v7.xml | 12 ++ internal/discover/client.go | 59 ++++++ internal/discover/client_udp.go | 250 +++++++++++++++++++++++ internal/discover/discover.go | 345 ++++++++++---------------------- 6 files changed, 445 insertions(+), 243 deletions(-) create mode 100644 internal/config/testdata/v7.xml create mode 100644 internal/discover/client.go create mode 100644 internal/discover/client_udp.go diff --git a/internal/config/config.go b/internal/config/config.go index 21fe2da9..07679f09 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -34,7 +34,7 @@ import ( var l = logger.DefaultLogger -const CurrentVersion = 6 +const CurrentVersion = 7 type Configuration struct { Version int `xml:"version,attr"` @@ -160,7 +160,7 @@ type FolderDeviceConfiguration struct { type OptionsConfiguration struct { ListenAddress []string `xml:"listenAddress" default:"0.0.0.0:22000"` - GlobalAnnServers []string `xml:"globalAnnounceServer" default:"announce.syncthing.net:22026"` + GlobalAnnServers []string `xml:"globalAnnounceServer" default:"udp4://announce.syncthing.net:22026"` GlobalAnnEnabled bool `xml:"globalAnnounceEnabled" default:"true"` LocalAnnEnabled bool `xml:"localAnnounceEnabled" default:"true"` LocalAnnPort int `xml:"localAnnouncePort" default:"21025"` @@ -308,6 +308,11 @@ func (cfg *Configuration) prepare(myID protocol.DeviceID) { convertV5V6(cfg) } + // Upgrade to v7 configuration if appropriate + if cfg.Version == 6 { + convertV6V7(cfg) + } + // Hash old cleartext passwords if len(cfg.GUI.Password) > 0 && cfg.GUI.Password[0] != '$' { hash, err := bcrypt.GenerateFromPassword([]byte(cfg.GUI.Password), 0) @@ -397,6 +402,15 @@ func ChangeRequiresRestart(from, to Configuration) bool { return false } +func convertV6V7(cfg *Configuration) { + // Migrate announce server addresses to the new URL based format + for i := range cfg.Options.GlobalAnnServers { + cfg.Options.GlobalAnnServers[i] = "udp4://" + cfg.Options.GlobalAnnServers[i] + } + + cfg.Version = 7 +} + func convertV5V6(cfg *Configuration) { // Added ".stfolder" file at folder roots to identify mount issues // Doesn't affect the config itself, but uses config migrations to identify diff --git a/internal/config/config_test.go b/internal/config/config_test.go index 5de36139..a7d3b6b9 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -36,7 +36,7 @@ func init() { func TestDefaultValues(t *testing.T) { expected := OptionsConfiguration{ ListenAddress: []string{"0.0.0.0:22000"}, - GlobalAnnServers: []string{"announce.syncthing.net:22026"}, + GlobalAnnServers: []string{"udp4://announce.syncthing.net:22026"}, GlobalAnnEnabled: true, LocalAnnEnabled: true, LocalAnnPort: 21025, @@ -139,7 +139,7 @@ func TestNoListenAddress(t *testing.T) { func TestOverriddenValues(t *testing.T) { expected := OptionsConfiguration{ ListenAddress: []string{":23000"}, - GlobalAnnServers: []string{"syncthing.nym.se:22026"}, + GlobalAnnServers: []string{"udp4://syncthing.nym.se:22026"}, GlobalAnnEnabled: false, LocalAnnEnabled: false, LocalAnnPort: 42123, diff --git a/internal/config/testdata/v7.xml b/internal/config/testdata/v7.xml new file mode 100644 index 00000000..ec5bc6c1 --- /dev/null +++ b/internal/config/testdata/v7.xml @@ -0,0 +1,12 @@ + + + + + + +
a
+
+ +
b
+
+
diff --git a/internal/discover/client.go b/internal/discover/client.go new file mode 100644 index 00000000..a506395f --- /dev/null +++ b/internal/discover/client.go @@ -0,0 +1,59 @@ +// Copyright (C) 2014 The Syncthing Authors. +// +// This program is free software: you can redistribute it and/or modify it +// under the terms of the GNU General Public License as published by the Free +// Software Foundation, either version 3 of the License, or (at your option) +// any later version. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +// more details. +// +// You should have received a copy of the GNU General Public License along +// with this program. If not, see . + +package discover + +import ( + "fmt" + "net/url" + "time" + + "github.com/syncthing/syncthing/internal/protocol" +) + +type Factory func(*url.URL, *Announce) (Client, error) + +var ( + factories = make(map[string]Factory) + DefaultErrorRetryInternval = 60 * time.Second + DefaultGlobalBroadcastInterval = 1800 * time.Second +) + +func Register(proto string, factory Factory) { + factories[proto] = factory +} + +func New(addr string, pkt *Announce) (Client, error) { + uri, err := url.Parse(addr) + if err != nil { + return nil, err + } + factory, ok := factories[uri.Scheme] + if !ok { + return nil, fmt.Errorf("Unsupported scheme: %s", uri.Scheme) + } + client, err := factory(uri, pkt) + if err != nil { + return nil, err + } + return client, nil +} + +type Client interface { + Lookup(device protocol.DeviceID) []string + StatusOK() bool + Address() string + Stop() +} diff --git a/internal/discover/client_udp.go b/internal/discover/client_udp.go new file mode 100644 index 00000000..79e5f698 --- /dev/null +++ b/internal/discover/client_udp.go @@ -0,0 +1,250 @@ +// Copyright (C) 2014 The Syncthing Authors. +// +// This program is free software: you can redistribute it and/or modify it +// under the terms of the GNU General Public License as published by the Free +// Software Foundation, either version 3 of the License, or (at your option) +// any later version. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for +// more details. +// +// You should have received a copy of the GNU General Public License along +// with this program. If not, see . + +package discover + +import ( + "encoding/hex" + "io" + "net" + "net/url" + "strconv" + "sync" + "time" + + "github.com/syncthing/syncthing/internal/protocol" +) + +func init() { + for _, proto := range []string{"udp", "udp4", "udp6"} { + Register(proto, func(uri *url.URL, pkt *Announce) (Client, error) { + c := &UDPClient{} + err := c.Start(uri, pkt) + if err != nil { + return nil, err + } + return c, nil + }) + } +} + +type UDPClient struct { + url *url.URL + + id protocol.DeviceID + + stop chan struct{} + wg sync.WaitGroup + listenAddress *net.UDPAddr + + globalBroadcastInterval time.Duration + errorRetryInterval time.Duration + + status bool + mut sync.RWMutex +} + +func (d *UDPClient) Start(uri *url.URL, pkt *Announce) error { + d.url = uri + d.id = protocol.DeviceIDFromBytes(pkt.This.ID) + d.stop = make(chan struct{}) + + params := uri.Query() + // The address must not have a port, as otherwise both announce and lookup + // sockets would try to bind to the same port. + addr, err := net.ResolveUDPAddr(d.url.Scheme, params.Get("listenaddress")+":0") + if err != nil { + return err + } + d.listenAddress = addr + + broadcastSeconds, err := strconv.ParseUint(params.Get("broadcast"), 0, 0) + if err != nil { + d.globalBroadcastInterval = DefaultGlobalBroadcastInterval + } else { + d.globalBroadcastInterval = time.Duration(broadcastSeconds) * time.Second + } + + retrySeconds, err := strconv.ParseUint(params.Get("retry"), 0, 0) + if err != nil { + d.errorRetryInterval = DefaultErrorRetryInternval + } else { + d.errorRetryInterval = time.Duration(retrySeconds) * time.Second + } + + d.wg.Add(1) + go d.broadcast(pkt.MustMarshalXDR()) + return nil +} + +func (d *UDPClient) broadcast(pkt []byte) { + defer d.wg.Done() + timer := time.NewTimer(0) + + conn, err := net.ListenUDP(d.url.Scheme, d.listenAddress) + for err != nil { + timer.Reset(d.errorRetryInterval) + l.Warnf("Global UDP discovery (%s): %v; trying again in %v", d.url, err, d.errorRetryInterval) + select { + case <-d.stop: + return + case <-timer.C: + } + conn, err = net.ListenUDP(d.url.Scheme, d.listenAddress) + } + defer conn.Close() + + remote, err := net.ResolveUDPAddr(d.url.Scheme, d.url.Host) + for err != nil { + timer.Reset(d.errorRetryInterval) + l.Warnf("Global UDP discovery (%s): %v; trying again in %v", d.url, err, d.errorRetryInterval) + select { + case <-d.stop: + return + case <-timer.C: + } + remote, err = net.ResolveUDPAddr(d.url.Scheme, d.url.Host) + } + + timer.Reset(0) + + for { + select { + case <-d.stop: + return + + case <-timer.C: + var ok bool + + if debug { + l.Debugf("Global UDP discovery (%s): send announcement -> %v\n%s", d.url, remote, hex.Dump(pkt)) + } + + _, err := conn.WriteTo(pkt, remote) + if err != nil { + if debug { + l.Debugf("discover %s: warning: %s", d.url, err) + } + ok = false + } else { + // Verify that the announce server responds positively for our device ID + + time.Sleep(1 * time.Second) + + res := d.Lookup(d.id) + if debug { + l.Debugf("discover %s: external lookup check: %v", d.url, res) + } + ok = len(res) > 0 + } + + d.mut.Lock() + d.status = ok + d.mut.Unlock() + + if ok { + timer.Reset(d.globalBroadcastInterval) + } else { + timer.Reset(d.errorRetryInterval) + } + } + } +} + +func (d *UDPClient) Lookup(device protocol.DeviceID) []string { + extIP, err := net.ResolveUDPAddr(d.url.Scheme, d.url.Host) + if err != nil { + if debug { + l.Debugf("discover %s: %v; no external lookup", d.url, err) + } + return nil + } + + conn, err := net.DialUDP(d.url.Scheme, d.listenAddress, extIP) + if err != nil { + if debug { + l.Debugf("discover %s: %v; no external lookup", d.url, err) + } + return nil + } + defer conn.Close() + + err = conn.SetDeadline(time.Now().Add(5 * time.Second)) + if err != nil { + if debug { + l.Debugf("discover %s: %v; no external lookup", d.url, err) + } + return nil + } + + buf := Query{QueryMagic, device[:]}.MustMarshalXDR() + _, err = conn.Write(buf) + if err != nil { + if debug { + l.Debugf("discover %s: %v; no external lookup", d.url, err) + } + return nil + } + + buf = make([]byte, 2048) + n, err := conn.Read(buf) + if err != nil { + if err, ok := err.(net.Error); ok && err.Timeout() { + // Expected if the server doesn't know about requested device ID + return nil + } + if debug { + l.Debugf("discover %s: %v; no external lookup", d.url, err) + } + return nil + } + + if debug { + l.Debugf("discover %s: read external:\n%s", d.url, hex.Dump(buf[:n])) + } + + var pkt Announce + err = pkt.UnmarshalXDR(buf[:n]) + if err != nil && err != io.EOF { + if debug { + l.Debugln("discover %s:", d.url, err) + } + return nil + } + + var addrs []string + for _, a := range pkt.This.Addresses { + deviceAddr := net.JoinHostPort(net.IP(a.IP).String(), strconv.Itoa(int(a.Port))) + addrs = append(addrs, deviceAddr) + } + return addrs +} + +func (d *UDPClient) Stop() { + if d.stop != nil { + close(d.stop) + d.wg.Wait() + } +} + +func (d *UDPClient) StatusOK() bool { + d.mut.RLock() + defer d.mut.RUnlock() + return d.status +} + +func (d *UDPClient) Address() string { + return d.url.String() +} diff --git a/internal/discover/discover.go b/internal/discover/discover.go index 92fd7376..840ae960 100644 --- a/internal/discover/discover.go +++ b/internal/discover/discover.go @@ -31,25 +31,21 @@ import ( ) type Discoverer struct { - myID protocol.DeviceID - listenAddrs []string - localBcastIntv time.Duration - localBcastStart time.Time - globalBcastIntv time.Duration - errorRetryIntv time.Duration - cacheLifetime time.Duration - broadcastBeacon beacon.Interface - multicastBeacon beacon.Interface - registry map[protocol.DeviceID][]CacheEntry - registryLock sync.RWMutex - extServers []string - extPort uint16 - localBcastTick <-chan time.Time - stopGlobal chan struct{} - globalWG sync.WaitGroup - forcedBcastTick chan time.Time - extAnnounceOK map[string]bool - extAnnounceOKmut sync.Mutex + myID protocol.DeviceID + listenAddrs []string + localBcastIntv time.Duration + localBcastStart time.Time + cacheLifetime time.Duration + broadcastBeacon beacon.Interface + multicastBeacon beacon.Interface + registry map[protocol.DeviceID][]CacheEntry + registryLock sync.RWMutex + extPort uint16 + localBcastTick <-chan time.Time + forcedBcastTick chan time.Time + + clients []Client + mut sync.RWMutex } type CacheEntry struct { @@ -63,14 +59,11 @@ var ( func NewDiscoverer(id protocol.DeviceID, addresses []string) *Discoverer { return &Discoverer{ - myID: id, - listenAddrs: addresses, - localBcastIntv: 30 * time.Second, - globalBcastIntv: 1800 * time.Second, - errorRetryIntv: 60 * time.Second, - cacheLifetime: 5 * time.Minute, - registry: make(map[protocol.DeviceID][]CacheEntry), - extAnnounceOK: make(map[string]bool), + myID: id, + listenAddrs: addresses, + localBcastIntv: 30 * time.Second, + cacheLifetime: 5 * time.Minute, + registry: make(map[protocol.DeviceID][]CacheEntry), } } @@ -112,38 +105,60 @@ func (d *Discoverer) StartLocal(localPort int, localMCAddr string) { } func (d *Discoverer) StartGlobal(servers []string, extPort uint16) { - // Wait for any previous announcer to stop before starting a new one. - d.globalWG.Wait() - d.extServers = servers + d.mut.Lock() + defer d.mut.Unlock() + + if len(d.clients) > 0 { + d.stopGlobal() + } + d.extPort = extPort - d.stopGlobal = make(chan struct{}) - d.globalWG.Add(1) - go func() { - defer d.globalWG.Done() + pkt := d.announcementPkt() + wg := sync.WaitGroup{} + clients := make(chan Client, len(servers)) + for _, address := range servers { + wg.Add(1) + go func(addr string) { + defer wg.Done() + client, err := New(addr, pkt) + if err != nil { + l.Infoln("Error creating discovery client", addr, err) + return + } + clients <- client + }(address) + } - buf := d.announcementPkt() + wg.Wait() + close(clients) - for _, extServer := range d.extServers { - d.globalWG.Add(1) - go func(server string) { - d.sendExternalAnnouncements(server, buf) - d.globalWG.Done() - }(extServer) - } - }() -} - -func (d *Discoverer) StopGlobal() { - if d.stopGlobal != nil { - close(d.stopGlobal) - d.globalWG.Wait() + for client := range clients { + d.clients = append(d.clients, client) } } +func (d *Discoverer) StopGlobal() { + d.mut.Lock() + defer d.mut.Unlock() + d.stopGlobal() +} + +func (d *Discoverer) stopGlobal() { + for _, client := range d.clients { + client.Stop() + } + d.clients = []Client{} +} + func (d *Discoverer) ExtAnnounceOK() map[string]bool { - d.extAnnounceOKmut.Lock() - defer d.extAnnounceOKmut.Unlock() - return d.extAnnounceOK + d.mut.RLock() + defer d.mut.RUnlock() + + ret := make(map[string]bool) + for _, client := range d.clients { + ret[client.Address()] = client.StatusOK() + } + return ret } func (d *Discoverer) Lookup(device protocol.DeviceID) []string { @@ -151,22 +166,47 @@ func (d *Discoverer) Lookup(device protocol.DeviceID) []string { cached := d.filterCached(d.registry[device]) d.registryLock.RUnlock() + d.mut.RLock() + defer d.mut.RUnlock() + + var addrs []string if len(cached) > 0 { - addrs := make([]string, len(cached)) + addrs = make([]string, len(cached)) for i := range cached { addrs[i] = cached[i].Address } - return addrs - } else if len(d.extServers) != 0 && time.Since(d.localBcastStart) > d.localBcastIntv { + } else if len(d.clients) != 0 && time.Since(d.localBcastStart) > d.localBcastIntv { // Only perform external lookups if we have at least one external - // server and one local announcement interval has passed. This is to - // avoid finding local peers on their remote address at startup. - addrs := d.externalLookup(device) - cached = make([]CacheEntry, len(addrs)) - for i := range addrs { - cached[i] = CacheEntry{ - Address: addrs[i], - Seen: time.Now(), + // server client and one local announcement interval has passed. This is + // to avoid finding local peers on their remote address at startup. + results := make(chan []string, len(d.clients)) + wg := sync.WaitGroup{} + for _, client := range d.clients { + wg.Add(1) + go func(c Client) { + defer wg.Done() + results <- c.Lookup(device) + }(client) + } + + wg.Wait() + close(results) + + cached := []CacheEntry{} + seen := make(map[string]struct{}) + now := time.Now() + + for result := range results { + for _, addr := range result { + _, ok := seen[addr] + if !ok { + cached = append(cached, CacheEntry{ + Address: addr, + Seen: now, + }) + seen[addr] = struct{}{} + addrs = append(addrs, addr) + } } } @@ -174,7 +214,7 @@ func (d *Discoverer) Lookup(device protocol.DeviceID) []string { d.registry[device] = cached d.registryLock.Unlock() } - return nil + return addrs } func (d *Discoverer) Hint(device string, addrs []string) { @@ -199,7 +239,7 @@ func (d *Discoverer) All() map[protocol.DeviceID][]CacheEntry { return devices } -func (d *Discoverer) announcementPkt() []byte { +func (d *Discoverer) announcementPkt() *Announce { var addrs []Address if d.extPort != 0 { addrs = []Address{{Port: d.extPort}} @@ -221,11 +261,10 @@ func (d *Discoverer) announcementPkt() []byte { } } } - var pkt = Announce{ + return &Announce{ Magic: AnnouncementMagic, This: Device{d.myID[:], addrs}, } - return pkt.MustMarshalXDR() } func (d *Discoverer) sendLocalAnnouncements() { @@ -252,80 +291,6 @@ func (d *Discoverer) sendLocalAnnouncements() { } } -func (d *Discoverer) sendExternalAnnouncements(extServer string, buf []byte) { - timer := time.NewTimer(0) - - conn, err := net.ListenUDP("udp", nil) - for err != nil { - timer.Reset(d.errorRetryIntv) - l.Warnf("Global discovery: %v; trying again in %v", err, d.errorRetryIntv) - select { - case <-d.stopGlobal: - return - case <-timer.C: - } - conn, err = net.ListenUDP("udp", nil) - } - - remote, err := net.ResolveUDPAddr("udp", extServer) - for err != nil { - timer.Reset(d.errorRetryIntv) - l.Warnf("Global discovery: %s: %v; trying again in %v", extServer, err, d.errorRetryIntv) - select { - case <-d.stopGlobal: - return - case <-timer.C: - } - remote, err = net.ResolveUDPAddr("udp", extServer) - } - - // Delay the first announcement until after a full local announcement - // cycle, to increase the chance of other peers finding us locally first. - timer.Reset(d.localBcastIntv) - - for { - select { - case <-d.stopGlobal: - return - - case <-timer.C: - var ok bool - - if debug { - l.Debugf("discover: send announcement -> %v\n%s", remote, hex.Dump(buf)) - } - - _, err := conn.WriteTo(buf, remote) - if err != nil { - if debug { - l.Debugln("discover: %s: warning:", extServer, err) - } - ok = false - } else { - // Verify that the announce server responds positively for our device ID - - time.Sleep(1 * time.Second) - res := d.externalLookupOnServer(extServer, d.myID) - - if debug { - l.Debugln("discover:", extServer, "external lookup check:", res) - } - ok = len(res) > 0 - } - - d.extAnnounceOKmut.Lock() - d.extAnnounceOK[extServer] = ok - d.extAnnounceOKmut.Unlock() - - if ok { - timer.Reset(d.globalBcastIntv) - } else { - timer.Reset(d.errorRetryIntv) - } - } - } -} - func (d *Discoverer) recvAnnouncements(b beacon.Interface) { for { buf, addr := b.Recv() @@ -406,104 +371,6 @@ func (d *Discoverer) registerDevice(addr net.Addr, device Device) bool { return len(current) > len(orig) } -func (d *Discoverer) externalLookup(device protocol.DeviceID) []string { - // Buffer up to as many answers as we have servers to query. - results := make(chan []string, len(d.extServers)) - - // Query all servers. - wg := sync.WaitGroup{} - for _, extServer := range d.extServers { - wg.Add(1) - go func(server string) { - result := d.externalLookupOnServer(server, device) - if debug { - l.Debugln("discover:", result, "from", server, "for", device) - } - results <- result - wg.Done() - }(extServer) - } - - wg.Wait() - close(results) - - addrs := []string{} - for result := range results { - addrs = append(addrs, result...) - } - - return addrs -} - -func (d *Discoverer) externalLookupOnServer(extServer string, device protocol.DeviceID) []string { - extIP, err := net.ResolveUDPAddr("udp", extServer) - if err != nil { - if debug { - l.Debugf("discover: %s: %v; no external lookup", extServer, err) - } - return nil - } - - conn, err := net.DialUDP("udp", nil, extIP) - if err != nil { - if debug { - l.Debugf("discover: %s: %v; no external lookup", extServer, err) - } - return nil - } - defer conn.Close() - - err = conn.SetDeadline(time.Now().Add(5 * time.Second)) - if err != nil { - if debug { - l.Debugf("discover: %s: %v; no external lookup", extServer, err) - } - return nil - } - - buf := Query{QueryMagic, device[:]}.MustMarshalXDR() - _, err = conn.Write(buf) - if err != nil { - if debug { - l.Debugf("discover: %s: %v; no external lookup", extServer, err) - } - return nil - } - - buf = make([]byte, 2048) - n, err := conn.Read(buf) - if err != nil { - if err, ok := err.(net.Error); ok && err.Timeout() { - // Expected if the server doesn't know about requested device ID - return nil - } - if debug { - l.Debugf("discover: %s: %v; no external lookup", extServer, err) - } - return nil - } - - if debug { - l.Debugf("discover: %s: read external:\n%s", extServer, hex.Dump(buf[:n])) - } - - var pkt Announce - err = pkt.UnmarshalXDR(buf[:n]) - if err != nil && err != io.EOF { - if debug { - l.Debugln("discover:", extServer, err) - } - return nil - } - - var addrs []string - for _, a := range pkt.This.Addresses { - deviceAddr := net.JoinHostPort(net.IP(a.IP).String(), strconv.Itoa(int(a.Port))) - addrs = append(addrs, deviceAddr) - } - return addrs -} - func (d *Discoverer) filterCached(c []CacheEntry) []CacheEntry { for i := 0; i < len(c); { if ago := time.Since(c[i].Seen); ago > d.cacheLifetime {