diff --git a/discover/discover.go b/discover/discover.go index d620b632..e0d2e593 100644 --- a/discover/discover.go +++ b/discover/discover.go @@ -25,8 +25,9 @@ type Discoverer struct { localBcastIntv time.Duration globalBcastIntv time.Duration errorRetryIntv time.Duration + cacheLifetime time.Duration beacon *beacon.Beacon - registry map[protocol.NodeID][]string + registry map[protocol.NodeID][]cacheEntry registryLock sync.RWMutex extServer string extPort uint16 @@ -39,6 +40,11 @@ type Discoverer struct { globalBcastStop chan bool } +type cacheEntry struct { + addr string + seen time.Time +} + var ( ErrIncorrectMagic = errors.New("incorrect magic number") ) @@ -59,8 +65,9 @@ func NewDiscoverer(id protocol.NodeID, addresses []string, localPort int) (*Disc localBcastIntv: 30 * time.Second, globalBcastIntv: 1800 * time.Second, errorRetryIntv: 60 * time.Second, + cacheLifetime: 5 * time.Minute, beacon: b, - registry: make(map[protocol.NodeID][]string), + registry: make(map[protocol.NodeID][]cacheEntry), } go disc.recvAnnouncements() @@ -97,14 +104,28 @@ func (d *Discoverer) ExtAnnounceOK() bool { func (d *Discoverer) Lookup(node protocol.NodeID) []string { d.registryLock.Lock() - addr, ok := d.registry[node] + cached := d.filterCached(d.registry[node]) d.registryLock.Unlock() - if ok { - return addr + if len(cached) > 0 { + addrs := make([]string, len(cached)) + for i := range cached { + addrs[i] = cached[i].addr + } + return addrs } else if len(d.extServer) != 0 { - // We might want to cache this, but not permanently so it needs some intelligence - return d.externalLookup(node) + addrs := d.externalLookup(node) + cached = make([]cacheEntry, len(addrs)) + for i := range addrs { + cached[i] = cacheEntry{ + addr: addrs[i], + seen: time.Now(), + } + } + + d.registryLock.Lock() + d.registry[node] = cached + d.registryLock.Unlock() } return nil } @@ -119,11 +140,11 @@ func (d *Discoverer) Hint(node string, addrs []string) { }) } -func (d *Discoverer) All() map[protocol.NodeID][]string { +func (d *Discoverer) All() map[protocol.NodeID][]cacheEntry { d.registryLock.RLock() - nodes := make(map[protocol.NodeID][]string, len(d.registry)) + nodes := make(map[protocol.NodeID][]cacheEntry, len(d.registry)) for node, addrs := range d.registry { - addrsCopy := make([]string, len(addrs)) + addrsCopy := make([]cacheEntry, len(addrs)) copy(addrsCopy, addrs) nodes[node] = addrsCopy } @@ -163,21 +184,10 @@ func (d *Discoverer) sendLocalAnnouncements() { Magic: AnnouncementMagic, This: Node{d.myID[:], addrs}, } + msg := pkt.MarshalXDR() for { - pkt.Extra = nil - d.registryLock.RLock() - for node, addrs := range d.registry { - if len(pkt.Extra) == 16 { - break - } - - anode := Node{node[:], resolveAddrs(addrs)} - pkt.Extra = append(pkt.Extra, anode) - } - d.registryLock.RUnlock() - - d.beacon.Send(pkt.MarshalXDR()) + d.beacon.Send(msg) select { case <-d.localBcastTick: @@ -279,7 +289,7 @@ func (d *Discoverer) recvAnnouncements() { buf, addr := d.beacon.Recv() if debug { - l.Debugf("discover: read announcement:\n%s", hex.Dump(buf)) + l.Debugf("discover: read announcement from %s:\n%s", addr, hex.Dump(buf)) } var pkt Announce @@ -288,20 +298,9 @@ func (d *Discoverer) recvAnnouncements() { continue } - if debug { - l.Debugf("discover: parsed announcement: %#v", pkt) - } - var newNode bool if bytes.Compare(pkt.This.ID, d.myID[:]) != 0 { newNode = d.registerNode(addr, pkt.This) - for _, node := range pkt.Extra { - if bytes.Compare(node.ID, d.myID[:]) != 0 { - if d.registerNode(nil, node) { - newNode = true - } - } - } } if newNode { @@ -313,41 +312,57 @@ func (d *Discoverer) recvAnnouncements() { } func (d *Discoverer) registerNode(addr net.Addr, node Node) bool { - var addrs []string + var id protocol.NodeID + copy(id[:], node.ID) + + d.registryLock.RLock() + current := d.filterCached(d.registry[id]) + d.registryLock.RUnlock() + + orig := current + for _, a := range node.Addresses { var nodeAddr string if len(a.IP) > 0 { nodeAddr = fmt.Sprintf("%s:%d", net.IP(a.IP), a.Port) - addrs = append(addrs, nodeAddr) } else if addr != nil { ua := addr.(*net.UDPAddr) ua.Port = int(a.Port) nodeAddr = ua.String() - addrs = append(addrs, nodeAddr) } - } - if len(addrs) == 0 { - if debug { - l.Debugln("discover: no valid address for", node.ID) + for i := range current { + if current[i].addr == nodeAddr { + current[i].seen = time.Now() + goto done + } } + current = append(current, cacheEntry{ + addr: nodeAddr, + seen: time.Now(), + }) + done: } + if debug { - l.Debugf("discover: register: %v -> %#v", node.ID, addrs) + l.Debugf("discover: register: %v -> %v", id, current) } - var id protocol.NodeID - copy(id[:], node.ID) + d.registryLock.Lock() - _, seen := d.registry[id] - d.registry[id] = addrs + d.registry[id] = current d.registryLock.Unlock() - if !seen { + if len(current) > len(orig) { + addrs := make([]string, len(current)) + for i := range current { + addrs[i] = current[i].addr + } events.Default.Log(events.NodeDiscovered, map[string]interface{}{ "node": id.String(), "addrs": addrs, }) } - return !seen + + return len(current) > len(orig) } func (d *Discoverer) externalLookup(node protocol.NodeID) []string { @@ -411,10 +426,6 @@ func (d *Discoverer) externalLookup(node protocol.NodeID) []string { return nil } - if debug { - l.Debugf("discover: parsed external: %#v", pkt) - } - var addrs []string for _, a := range pkt.This.Addresses { nodeAddr := fmt.Sprintf("%s:%d", net.IP(a.IP), a.Port) @@ -423,6 +434,21 @@ func (d *Discoverer) externalLookup(node protocol.NodeID) []string { return addrs } +func (d *Discoverer) filterCached(c []cacheEntry) []cacheEntry { + for i := 0; i < len(c); { + if ago := time.Since(c[i].seen); ago > d.cacheLifetime { + if debug { + l.Debugf("removing cached address %s: seen %v ago", c[i].addr, ago) + } + c[i] = c[len(c)-1] + c = c[:len(c)-1] + } else { + i++ + } + } + return c +} + func addrToAddr(addr *net.TCPAddr) Address { if len(addr.IP) == 0 || addr.IP.IsUnspecified() { return Address{Port: uint16(addr.Port)}