Perform external queries

This commit is contained in:
Jakob Borg
2013-12-22 17:13:51 -05:00
parent e48222ada0
commit 31ea72dbb3
4 changed files with 282 additions and 111 deletions

View File

@@ -20,6 +20,12 @@ following format:
\ NodeID (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length of IP |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ IP (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
This is the XDR encoding of:
@@ -31,11 +37,15 @@ struct Announcement {
(Hence NodeID is padded to a multiple of 32 bits)
The sending node's address is not encoded -- it is taken to be the source
address of the announcement. Every time such a packet is received, a local
table that maps NodeID to Address is updated. When the local node wants to
connect to another node with the address specification 'dynamic', this table is
consulted.
The sending node's address is not encoded in local announcement -- the Length
of IP field is set to zero and the address is taken to be the source address of
the announcement. In announcement packets sent by a discovery server in
response to a query, the IP is present and the length is either 4 (IPv4) or 16
(IPv6).
Every time such a packet is received, a local table that maps NodeID to Address
is updated. When the local node wants to connect to another node with the
address specification 'dynamic', this table is consulted.
For external discovery, an identical packet is sent every 30 minutes to the
external discovery server. The server keeps information for up to 60 minutes.
@@ -71,8 +81,6 @@ server from being used as an amplifier in a DDoS attack.)
package discover
import (
"encoding/binary"
"errors"
"fmt"
"log"
"net"
@@ -86,11 +94,6 @@ const (
QueryMagic = 0x19760309
)
var (
errBadMagic = errors.New("bad magic")
errFormat = errors.New("incorrect packet format")
)
type Discoverer struct {
MyID string
ListenPort int
@@ -104,13 +107,7 @@ type Discoverer struct {
extServer string
}
type packet struct {
magic uint32 // AnnouncementMagic or QueryMagic
port uint16 // unset if magic == QueryMagic
id string
}
// We tolerate a certain amount of errors because we might be running in
// We tolerate a certain amount of errors because we might be running on
// laptops that sleep and wake, have intermittent network connectivity, etc.
// When we hit this many errors in succession, we stop.
const maxErrors = 30
@@ -149,7 +146,7 @@ func NewDiscoverer(id string, port int, extPort int, extServer string) (*Discove
func (d *Discoverer) sendAnnouncements() {
remote4 := &net.UDPAddr{IP: net.IP{255, 255, 255, 255}, Port: AnnouncementPort}
buf := encodePacket(packet{AnnouncementMagic, uint16(d.ListenPort), d.MyID})
buf := encodePacket(packet{AnnouncementMagic, uint16(d.ListenPort), d.MyID, nil})
go d.writeAnnouncements(buf, remote4, d.BroadcastIntv)
}
@@ -160,7 +157,7 @@ func (d *Discoverer) sendExtAnnouncements() {
return
}
buf := encodePacket(packet{AnnouncementMagic, uint16(d.ExtListenPort), d.MyID})
buf := encodePacket(packet{AnnouncementMagic, uint16(d.ExtListenPort), d.MyID, nil})
for _, extIP := range extIPs {
remote4 := &net.UDPAddr{IP: extIP, Port: AnnouncementPort}
go d.writeAnnouncements(buf, remote4, d.ExtBroadcastIntv)
@@ -215,93 +212,77 @@ func (d *Discoverer) recvAnnouncements() {
log.Println("discover/read: stopping due to too many errors:", err)
}
func (d *Discoverer) externalLookup(node string) (string, bool) {
extIPs, err := net.LookupIP(d.extServer)
if err != nil {
log.Printf("discover/external: %v; no external lookup", err)
return "", false
}
var res = make(chan string, len(extIPs))
var failed = 0
for _, extIP := range extIPs {
remote := &net.UDPAddr{IP: extIP, Port: AnnouncementPort}
conn, err := net.DialUDP("udp", nil, remote)
if err != nil {
log.Printf("discover/external: %v; no external lookup", err)
failed++
continue
}
_, err = conn.Write(encodePacket(packet{QueryMagic, 0, node, nil}))
if err != nil {
log.Printf("discover/external: %v; no external lookup", err)
failed++
continue
}
go func() {
var buf = make([]byte, 1024)
_, err = conn.Read(buf)
if err != nil {
log.Printf("discover/external/read: %v; no external lookup", err)
return
}
pkt, err := decodePacket(buf)
if err != nil {
log.Printf("discover/external/read: %v; no external lookup", err)
return
}
if pkt.magic != AnnouncementMagic {
log.Printf("discover/external/read: bad magic; no external lookup", err)
return
}
res <- fmt.Sprintf("%s:%d", ipStr(pkt.ip), pkt.port)
}()
}
if failed == len(extIPs) {
// no point in waiting
return "", false
}
select {
case r := <-res:
return r, true
case <-time.After(5 * time.Second):
return "", false
}
}
func (d *Discoverer) Lookup(node string) (string, bool) {
d.registryLock.Lock()
defer d.registryLock.Unlock()
addr, ok := d.registry[node]
return addr, ok
}
func encodePacket(pkt packet) []byte {
var idbs = []byte(pkt.id)
var l = len(idbs) + pad(len(idbs)) + 4 + 4
if pkt.magic == AnnouncementMagic {
l += 4
}
var buf = make([]byte, l)
var offset = 0
binary.BigEndian.PutUint32(buf[offset:], pkt.magic)
offset += 4
if pkt.magic == AnnouncementMagic {
binary.BigEndian.PutUint16(buf[offset:], uint16(pkt.port))
offset += 4
}
binary.BigEndian.PutUint32(buf[offset:], uint32(len(idbs)))
offset += 4
copy(buf[offset:], idbs)
return buf
}
func decodePacket(buf []byte) (*packet, error) {
var p packet
var offset int
if len(buf) < 4 {
// short packet
return nil, errFormat
}
p.magic = binary.BigEndian.Uint32(buf[offset:])
offset += 4
if p.magic != AnnouncementMagic && p.magic != QueryMagic {
return nil, errBadMagic
}
if p.magic == AnnouncementMagic {
if len(buf) < offset+4 {
// short packet
return nil, errFormat
}
p.port = binary.BigEndian.Uint16(buf[offset:])
offset += 2
reserved := binary.BigEndian.Uint16(buf[offset:])
if reserved != 0 {
return nil, errFormat
}
offset += 2
}
if len(buf) < offset+4 {
// short packet
return nil, errFormat
}
l := binary.BigEndian.Uint32(buf[offset:])
offset += 4
if len(buf) < offset+int(l)+pad(int(l)) {
// short packet
return nil, errFormat
}
idbs := buf[offset : offset+int(l)]
p.id = string(idbs)
offset += int(l) + pad(int(l))
if len(buf[offset:]) > 0 {
// extra data
return nil, errFormat
}
return &p, nil
}
func pad(l int) int {
d := l % 4
if d == 0 {
return 0
}
return 4 - d
d.registryLock.Unlock()
if ok {
return addr, true
} else if len(d.extServer) != 0 {
// We might want to cache this, but not permanently so it needs some intelligence
return d.externalLookup(node)
}
return "", false
}