diff --git a/discover/discover.go b/discover/discover.go index 6e051a4e..e3b4416f 100644 --- a/discover/discover.go +++ b/discover/discover.go @@ -24,12 +24,15 @@ type Discoverer struct { listenAddrs []string localBcastIntv time.Duration globalBcastIntv time.Duration + errorRetryIntv time.Duration beacon *beacon.Beacon registry map[protocol.NodeID][]string registryLock sync.RWMutex extServer string extPort uint16 localBcastTick <-chan time.Time + stopGlobal chan struct{} + globalWG sync.WaitGroup forcedBcastTick chan time.Time extAnnounceOK bool extAnnounceOKmut sync.Mutex @@ -54,6 +57,7 @@ func NewDiscoverer(id protocol.NodeID, addresses []string, localPort int) (*Disc listenAddrs: addresses, localBcastIntv: 30 * time.Second, globalBcastIntv: 1800 * time.Second, + errorRetryIntv: 60 * time.Second, beacon: b, registry: make(map[protocol.NodeID][]string), } @@ -70,11 +74,20 @@ func (d *Discoverer) StartLocal() { } func (d *Discoverer) StartGlobal(server string, extPort uint16) { + // Wait for any previous announcer to stop before starting a new one. + d.globalWG.Wait() d.extServer = server d.extPort = extPort + d.stopGlobal = make(chan struct{}) + d.globalWG.Add(1) go d.sendExternalAnnouncements() } +func (d *Discoverer) StopGlobal() { + close(d.stopGlobal) + d.globalWG.Wait() +} + func (d *Discoverer) ExtAnnounceOK() bool { d.extAnnounceOKmut.Lock() defer d.extAnnounceOKmut.Unlock() @@ -173,20 +186,19 @@ func (d *Discoverer) sendLocalAnnouncements() { } func (d *Discoverer) sendExternalAnnouncements() { - // this should go in the Discoverer struct - errorRetryIntv := 60 * time.Second + defer d.globalWG.Done() remote, err := net.ResolveUDPAddr("udp", d.extServer) for err != nil { - l.Warnf("Global discovery: %v; trying again in %v", err, errorRetryIntv) - time.Sleep(errorRetryIntv) + l.Warnf("Global discovery: %v; trying again in %v", err, d.errorRetryIntv) + time.Sleep(d.errorRetryIntv) remote, err = net.ResolveUDPAddr("udp", d.extServer) } conn, err := net.ListenUDP("udp", nil) for err != nil { - l.Warnf("Global discovery: %v; trying again in %v", err, errorRetryIntv) - time.Sleep(errorRetryIntv) + l.Warnf("Global discovery: %v; trying again in %v", err, d.errorRetryIntv) + time.Sleep(d.errorRetryIntv) conn, err = net.ListenUDP("udp", nil) } @@ -201,7 +213,10 @@ func (d *Discoverer) sendExternalAnnouncements() { buf = d.announcementPkt() } - for { + var bcastTick = time.Tick(d.globalBcastIntv) + var errTick <-chan time.Time + + sendOneAnnouncement := func() { var ok bool if debug { @@ -230,11 +245,32 @@ func (d *Discoverer) sendExternalAnnouncements() { d.extAnnounceOKmut.Unlock() if ok { - time.Sleep(d.globalBcastIntv) - } else { - time.Sleep(errorRetryIntv) + errTick = nil + } else if errTick != nil { + errTick = time.Tick(d.errorRetryIntv) } } + + // Announce once, immediately + sendOneAnnouncement() + +loop: + for { + select { + case <-d.stopGlobal: + break loop + + case <-errTick: + sendOneAnnouncement() + + case <-bcastTick: + sendOneAnnouncement() + } + } + + if debug { + l.Debugln("discover: stopping global") + } } func (d *Discoverer) recvAnnouncements() { @@ -295,7 +331,7 @@ func (d *Discoverer) registerNode(addr net.Addr, node Node) bool { } } if debug { - l.Debugf("discover: register: %s -> %#v", node.ID, addrs) + l.Debugf("discover: register: %v -> %#v", node.ID, addrs) } var id protocol.NodeID copy(id[:], node.ID)