diff --git a/lib/beacon/beacon.go b/lib/beacon/beacon.go index b7436e17..b1296b21 100644 --- a/lib/beacon/beacon.go +++ b/lib/beacon/beacon.go @@ -7,9 +7,13 @@ package beacon import ( + "fmt" "net" + "time" "github.com/thejerf/suture" + + "github.com/syncthing/syncthing/lib/util" ) type recv struct { @@ -19,7 +23,93 @@ type recv struct { type Interface interface { suture.Service + fmt.Stringer Send(data []byte) Recv() ([]byte, net.Addr) Error() error } + +type cast struct { + *suture.Supervisor + name string + reader util.ServiceWithError + writer util.ServiceWithError + outbox chan recv + inbox chan []byte + stopped chan struct{} +} + +// newCast creates a base object for multi- or broadcasting. Afterwards the +// caller needs to set reader and writer with the addReader and addWriter +// methods to get a functional implementation of Interface. +func newCast(name string) *cast { + return &cast{ + Supervisor: suture.New(name, suture.Spec{ + // Don't retry too frenetically: an error to open a socket or + // whatever is usually something that is either permanent or takes + // a while to get solved... + FailureThreshold: 2, + FailureBackoff: 60 * time.Second, + // Only log restarts in debug mode. + Log: func(line string) { + l.Debugln(line) + }, + PassThroughPanics: true, + }), + name: name, + inbox: make(chan []byte), + outbox: make(chan recv, 16), + stopped: make(chan struct{}), + } +} + +func (c *cast) addReader(svc func(chan struct{}) error) { + c.reader = c.createService(svc, "reader") + c.Add(c.reader) +} + +func (c *cast) addWriter(svc func(stop chan struct{}) error) { + c.writer = c.createService(svc, "writer") + c.Add(c.writer) +} + +func (c *cast) createService(svc func(chan struct{}) error, suffix string) util.ServiceWithError { + return util.AsServiceWithError(func(stop chan struct{}) error { + l.Debugln("Starting", c.name, suffix) + err := svc(stop) + l.Debugf("Stopped %v %v: %v", c.name, suffix, err) + return err + }) +} + +func (c *cast) Stop() { + c.Supervisor.Stop() + close(c.stopped) +} + +func (c *cast) String() string { + return fmt.Sprintf("%s@%p", c.name, c) +} + +func (c *cast) Send(data []byte) { + select { + case c.inbox <- data: + case <-c.stopped: + } +} + +func (c *cast) Recv() ([]byte, net.Addr) { + select { + case recv := <-c.outbox: + return recv.data, recv.src + case <-c.stopped: + } + return nil, nil +} + +func (c *cast) Error() error { + if err := c.reader.Error(); err != nil { + return err + } + return c.writer.Error() +} diff --git a/lib/beacon/broadcast.go b/lib/beacon/broadcast.go index c5580f31..969fd63b 100644 --- a/lib/beacon/broadcast.go +++ b/lib/beacon/broadcast.go @@ -7,85 +7,22 @@ package beacon import ( - "fmt" "net" "time" - - "github.com/thejerf/suture" - - "github.com/syncthing/syncthing/lib/util" ) -type Broadcast struct { - *suture.Supervisor - port int - inbox chan []byte - outbox chan recv - br *broadcastReader - bw *broadcastWriter +func NewBroadcast(port int) Interface { + c := newCast("broadcastBeacon") + c.addReader(func(stop chan struct{}) error { + return readBroadcasts(c.outbox, port, stop) + }) + c.addWriter(func(stop chan struct{}) error { + return writeBroadcasts(c.inbox, port, stop) + }) + return c } -func NewBroadcast(port int) *Broadcast { - b := &Broadcast{ - Supervisor: suture.New("broadcastBeacon", suture.Spec{ - // Don't retry too frenetically: an error to open a socket or - // whatever is usually something that is either permanent or takes - // a while to get solved... - FailureThreshold: 2, - FailureBackoff: 60 * time.Second, - // Only log restarts in debug mode. - Log: func(line string) { - l.Debugln(line) - }, - PassThroughPanics: true, - }), - port: port, - inbox: make(chan []byte), - outbox: make(chan recv, 16), - } - - b.br = &broadcastReader{ - port: port, - outbox: b.outbox, - } - b.br.ServiceWithError = util.AsServiceWithError(b.br.serve) - b.Add(b.br) - b.bw = &broadcastWriter{ - port: port, - inbox: b.inbox, - } - b.bw.ServiceWithError = util.AsServiceWithError(b.bw.serve) - b.Add(b.bw) - - return b -} - -func (b *Broadcast) Send(data []byte) { - b.inbox <- data -} - -func (b *Broadcast) Recv() ([]byte, net.Addr) { - recv := <-b.outbox - return recv.data, recv.src -} - -func (b *Broadcast) Error() error { - if err := b.br.Error(); err != nil { - return err - } - return b.bw.Error() -} - -type broadcastWriter struct { - util.ServiceWithError - port int - inbox chan []byte -} - -func (w *broadcastWriter) serve(stop chan struct{}) error { - l.Debugln(w, "starting") - defer l.Debugln(w, "stopping") - +func writeBroadcasts(inbox <-chan []byte, port int, stop chan struct{}) error { conn, err := net.ListenUDP("udp4", nil) if err != nil { l.Debugln(err) @@ -104,7 +41,7 @@ func (w *broadcastWriter) serve(stop chan struct{}) error { for { var bs []byte select { - case bs = <-w.inbox: + case bs = <-inbox: case <-stop: return nil } @@ -112,8 +49,7 @@ func (w *broadcastWriter) serve(stop chan struct{}) error { addrs, err := net.InterfaceAddrs() if err != nil { l.Debugln(err) - w.SetError(err) - continue + return err } var dsts []net.IP @@ -133,13 +69,13 @@ func (w *broadcastWriter) serve(stop chan struct{}) error { success := 0 for _, ip := range dsts { - dst := &net.UDPAddr{IP: ip, Port: w.port} + dst := &net.UDPAddr{IP: ip, Port: port} conn.SetWriteDeadline(time.Now().Add(time.Second)) - _, err := conn.WriteTo(bs, dst) + _, err = conn.WriteTo(bs, dst) conn.SetWriteDeadline(time.Time{}) - if err, ok := err.(net.Error); ok && err.Timeout() { + if nerr, ok := err.(net.Error); ok && nerr.Timeout() { // Write timeouts should not happen. We treat it as a fatal // error on the socket. l.Debugln(err) @@ -149,7 +85,6 @@ func (w *broadcastWriter) serve(stop chan struct{}) error { if err != nil { // Some other error that we don't expect. Debug and continue. l.Debugln(err) - w.SetError(err) continue } @@ -157,27 +92,15 @@ func (w *broadcastWriter) serve(stop chan struct{}) error { success++ } - if success > 0 { - w.SetError(nil) + if success == 0 { + l.Debugln("couldn't send any braodcasts") + return err } } } -func (w *broadcastWriter) String() string { - return fmt.Sprintf("broadcastWriter@%p", w) -} - -type broadcastReader struct { - util.ServiceWithError - port int - outbox chan recv -} - -func (r *broadcastReader) serve(stop chan struct{}) error { - l.Debugln(r, "starting") - defer l.Debugln(r, "stopping") - - conn, err := net.ListenUDP("udp4", &net.UDPAddr{Port: r.port}) +func readBroadcasts(outbox chan<- recv, port int, stop chan struct{}) error { + conn, err := net.ListenUDP("udp4", &net.UDPAddr{Port: port}) if err != nil { l.Debugln(err) return err @@ -200,14 +123,12 @@ func (r *broadcastReader) serve(stop chan struct{}) error { return err } - r.SetError(nil) - l.Debugf("recv %d bytes from %s", n, addr) c := make([]byte, n) copy(c, bs) select { - case r.outbox <- recv{c, addr}: + case outbox <- recv{c, addr}: case <-stop: return nil default: @@ -216,10 +137,6 @@ func (r *broadcastReader) serve(stop chan struct{}) error { } } -func (r *broadcastReader) String() string { - return fmt.Sprintf("broadcastReader@%p", r) -} - func bcast(ip *net.IPNet) *net.IPNet { var bc = &net.IPNet{} bc.IP = make([]byte, len(ip.IP)) diff --git a/lib/beacon/multicast.go b/lib/beacon/multicast.go index fe592f85..f3c1e7fd 100644 --- a/lib/beacon/multicast.go +++ b/lib/beacon/multicast.go @@ -8,86 +8,25 @@ package beacon import ( "errors" - "fmt" "net" "time" - "github.com/thejerf/suture" "golang.org/x/net/ipv6" - - "github.com/syncthing/syncthing/lib/util" ) -type Multicast struct { - *suture.Supervisor - inbox chan []byte - outbox chan recv - mr *multicastReader - mw *multicastWriter +func NewMulticast(addr string) Interface { + c := newCast("multicastBeacon") + c.addReader(func(stop chan struct{}) error { + return readMulticasts(c.outbox, addr, stop) + }) + c.addWriter(func(stop chan struct{}) error { + return writeMulticasts(c.inbox, addr, stop) + }) + return c } -func NewMulticast(addr string) *Multicast { - m := &Multicast{ - Supervisor: suture.New("multicastBeacon", suture.Spec{ - // Don't retry too frenetically: an error to open a socket or - // whatever is usually something that is either permanent or takes - // a while to get solved... - FailureThreshold: 2, - FailureBackoff: 60 * time.Second, - // Only log restarts in debug mode. - Log: func(line string) { - l.Debugln(line) - }, - PassThroughPanics: true, - }), - inbox: make(chan []byte), - outbox: make(chan recv, 16), - } - - m.mr = &multicastReader{ - addr: addr, - outbox: m.outbox, - } - m.mr.ServiceWithError = util.AsServiceWithError(m.mr.serve) - m.Add(m.mr) - - m.mw = &multicastWriter{ - addr: addr, - inbox: m.inbox, - } - m.mw.ServiceWithError = util.AsServiceWithError(m.mw.serve) - m.Add(m.mw) - - return m -} - -func (m *Multicast) Send(data []byte) { - m.inbox <- data -} - -func (m *Multicast) Recv() ([]byte, net.Addr) { - recv := <-m.outbox - return recv.data, recv.src -} - -func (m *Multicast) Error() error { - if err := m.mr.Error(); err != nil { - return err - } - return m.mw.Error() -} - -type multicastWriter struct { - util.ServiceWithError - addr string - inbox <-chan []byte -} - -func (w *multicastWriter) serve(stop chan struct{}) error { - l.Debugln(w, "starting") - defer l.Debugln(w, "stopping") - - gaddr, err := net.ResolveUDPAddr("udp6", w.addr) +func writeMulticasts(inbox <-chan []byte, addr string, stop chan struct{}) error { + gaddr, err := net.ResolveUDPAddr("udp6", addr) if err != nil { l.Debugln(err) return err @@ -117,7 +56,7 @@ func (w *multicastWriter) serve(stop chan struct{}) error { for { var bs []byte select { - case bs = <-w.inbox: + case bs = <-inbox: case <-stop: return nil } @@ -137,7 +76,6 @@ func (w *multicastWriter) serve(stop chan struct{}) error { if err != nil { l.Debugln(err, "on write to", gaddr, intf.Name) - w.SetError(err) continue } @@ -152,33 +90,20 @@ func (w *multicastWriter) serve(stop chan struct{}) error { } } - if success > 0 { - w.SetError(nil) + if success == 0 { + return err } } } -func (w *multicastWriter) String() string { - return fmt.Sprintf("multicastWriter@%p", w) -} - -type multicastReader struct { - util.ServiceWithError - addr string - outbox chan<- recv -} - -func (r *multicastReader) serve(stop chan struct{}) error { - l.Debugln(r, "starting") - defer l.Debugln(r, "stopping") - - gaddr, err := net.ResolveUDPAddr("udp6", r.addr) +func readMulticasts(outbox chan<- recv, addr string, stop chan struct{}) error { + gaddr, err := net.ResolveUDPAddr("udp6", addr) if err != nil { l.Debugln(err) return err } - conn, err := net.ListenPacket("udp6", r.addr) + conn, err := net.ListenPacket("udp6", addr) if err != nil { l.Debugln(err) return err @@ -226,21 +151,16 @@ func (r *multicastReader) serve(stop chan struct{}) error { n, _, addr, err := pconn.ReadFrom(bs) if err != nil { l.Debugln(err) - r.SetError(err) - continue + return err } l.Debugf("recv %d bytes from %s", n, addr) c := make([]byte, n) copy(c, bs) select { - case r.outbox <- recv{c, addr}: + case outbox <- recv{c, addr}: default: l.Debugln("dropping message") } } } - -func (r *multicastReader) String() string { - return fmt.Sprintf("multicastReader@%p", r) -} diff --git a/lib/discover/local.go b/lib/discover/local.go index fa560b1d..0b2472f3 100644 --- a/lib/discover/local.go +++ b/lib/discover/local.go @@ -22,6 +22,7 @@ import ( "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/rand" + "github.com/syncthing/syncthing/lib/util" "github.com/thejerf/suture" ) @@ -73,30 +74,20 @@ func NewLocal(id protocol.DeviceID, addr string, addrList AddressLister, evLogge if err != nil { return nil, err } - c.startLocalIPv4Broadcasts(bcPort) + c.beacon = beacon.NewBroadcast(bcPort) } else { // A multicast client c.name = "IPv6 local" - c.startLocalIPv6Multicasts(addr) + c.beacon = beacon.NewMulticast(addr) } + c.Add(c.beacon) + c.Add(util.AsService(c.recvAnnouncements)) - go c.sendLocalAnnouncements() + c.Add(util.AsService(c.sendLocalAnnouncements)) return c, nil } -func (c *localClient) startLocalIPv4Broadcasts(localPort int) { - c.beacon = beacon.NewBroadcast(localPort) - c.Add(c.beacon) - go c.recvAnnouncements(c.beacon) -} - -func (c *localClient) startLocalIPv6Multicasts(localMCAddr string) { - c.beacon = beacon.NewMulticast(localMCAddr) - c.Add(c.beacon) - go c.recvAnnouncements(c.beacon) -} - // Lookup returns a list of addresses the device is available at. func (c *localClient) Lookup(device protocol.DeviceID) (addresses []string, err error) { if cache, ok := c.Get(device); ok { @@ -144,7 +135,7 @@ func (c *localClient) announcementPkt(instanceID int64, msg []byte) ([]byte, boo return msg, true } -func (c *localClient) sendLocalAnnouncements() { +func (c *localClient) sendLocalAnnouncements(stop chan struct{}) { var msg []byte var ok bool instanceID := rand.Int63() @@ -156,13 +147,22 @@ func (c *localClient) sendLocalAnnouncements() { select { case <-c.localBcastTick: case <-c.forcedBcastTick: + case <-stop: + return } } } -func (c *localClient) recvAnnouncements(b beacon.Interface) { +func (c *localClient) recvAnnouncements(stop chan struct{}) { + b := c.beacon warnedAbout := make(map[string]bool) for { + select { + case <-stop: + return + default: + } + buf, addr := b.Recv() if len(buf) < 4 { l.Debugf("discover: short packet from %s")