Browse Source

Enable discovery gossiping

Jakob Borg 11 years ago
parent
commit
1d602b9efa
5 changed files with 203 additions and 59 deletions
  1. 35 11
      discover/PROTOCOL.md
  2. 97 35
      discover/discover.go
  3. 7 2
      discover/packets.go
  4. 55 7
      discover/packets_xdr.go
  5. 9 4
      xdr/reader.go

+ 35 - 11
discover/PROTOCOL.md

@@ -36,12 +36,28 @@ The Announcement packet has the following structure:
      0                   1                   2                   3
      0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-    |                   Magic Number (0x029E4C77)                   |
+    |                      Magic (0x029E4C77)                       |
     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-    |                       Length of Node ID                       |
+    /                                                               /
+    \                        Node Structure                         \
+    /                                                               /
+    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+    |                     Number of Extra Nodes                     |
     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
     /                                                               /
-    \                   Node ID (variable length)                   \
+    \                 Zero or more Node Structures                  \
+    /                                                               /
+    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+    Node Structure:
+
+     0                   1                   2                   3
+     0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+    |                         Length of ID                          |
+    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+    /                                                               /
+    \                     ID (variable length)                      \
     /                                                               /
     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
     |                      Number of Addresses                      |
@@ -62,29 +78,37 @@ The Announcement packet has the following structure:
     \                     IP (variable length)                      \
     /                                                               /
     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-    |          Port Number          |            0x0000             |
+    |             Port              |            0x0000             |
     +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
 
 This is the XDR encoding of:
 
     struct Announcement {
-        unsigned int MagicNumber;
-        string NodeID<>;
+        unsigned int Magic;
+        Node This;
+        Node Extra<>;
+    }
+
+    struct Node {
+        string ID<>;
         Address Addresses<>;
     }
 
     struct Address {
         opaque IP<>;
-        unsigned short PortNumber;
+        unsigned short Port;
     }
 
-NodeID is padded to a multiple of 32 bits and all fields are in sent in
-network (big endian) byte order. In the Address structure, the IP field
-can be of three differnt kinds;
+The first Node structure contains information about the sending node.
+The following zero or more Extra nodes contain information about other
+nodes known to the sending node.
+
+In the Address structure, the IP field can be of three differnt kinds;
 
  - A zero length indicates that the IP address should be taken from the
    source address of the announcement packet, be it IPv4 or IPv6. The
-   source address must be a valid unicast address.
+   source address must be a valid unicast address. This is only valid
+   in the first node structure, not in the list of extras.
 
  - A four byte length indicates that the address is an IPv4 unicast
    address.

+ 97 - 35
discover/discover.go

@@ -4,6 +4,7 @@ import (
 	"encoding/hex"
 	"errors"
 	"fmt"
+	"io"
 	"log"
 	"net"
 	"sync"
@@ -108,18 +109,34 @@ func (d *Discoverer) announcementPkt() []byte {
 		}
 	}
 	var pkt = AnnounceV2{
-		Magic:     AnnouncementMagicV2,
-		NodeID:    d.myID,
-		Addresses: addrs,
+		Magic: AnnouncementMagicV2,
+		This:  Node{d.myID, addrs},
 	}
 	return pkt.MarshalXDR()
 }
 
 func (d *Discoverer) sendLocalAnnouncements() {
-	var buf = d.announcementPkt()
+	var addrs = resolveAddrs(d.listenAddrs)
+
+	var pkt = AnnounceV2{
+		Magic: AnnouncementMagicV2,
+		This:  Node{d.myID, addrs},
+	}
 
 	for {
-		d.beacon.Send(buf)
+		pkt.Extra = nil
+		d.registryLock.RLock()
+		for node, addrs := range d.registry {
+			if len(pkt.Extra) == 16 {
+				break
+			}
+
+			anode := Node{node, resolveAddrs(addrs)}
+			pkt.Extra = append(pkt.Extra, anode)
+		}
+		d.registryLock.RUnlock()
+
+		d.beacon.Send(pkt.MarshalXDR())
 
 		select {
 		case <-d.localBcastTick:
@@ -144,9 +161,8 @@ func (d *Discoverer) sendExternalAnnouncements() {
 	var buf []byte
 	if d.extPort != 0 {
 		var pkt = AnnounceV2{
-			Magic:     AnnouncementMagicV2,
-			NodeID:    d.myID,
-			Addresses: []Address{{Port: d.extPort}},
+			Magic: AnnouncementMagicV2,
+			This:  Node{d.myID, []Address{{Port: d.extPort}}},
 		}
 		buf = pkt.MarshalXDR()
 	} else {
@@ -203,7 +219,7 @@ func (d *Discoverer) recvAnnouncements() {
 
 		var pkt AnnounceV2
 		err := pkt.UnmarshalXDR(buf)
-		if err != nil {
+		if err != nil && err != io.EOF {
 			continue
 		}
 
@@ -211,35 +227,55 @@ func (d *Discoverer) recvAnnouncements() {
 			dlog.Printf("parsed announcement: %#v", pkt)
 		}
 
-		if pkt.NodeID != d.myID {
-			var addrs []string
-			for _, a := range pkt.Addresses {
-				var nodeAddr string
-				if len(a.IP) > 0 {
-					nodeAddr = fmt.Sprintf("%s:%d", net.IP(a.IP), a.Port)
-				} else {
-					ua := addr.(*net.UDPAddr)
-					ua.Port = int(a.Port)
-					nodeAddr = ua.String()
-				}
-				addrs = append(addrs, nodeAddr)
-			}
-			if debug {
-				dlog.Printf("register: %#v", addrs)
+		var newNode bool
+		if pkt.This.ID != d.myID {
+			n := d.registerNode(addr, pkt.This)
+			newNode = newNode || n
+		}
+		for _, node := range pkt.Extra {
+			if node.ID != d.myID {
+				n := d.registerNode(nil, node)
+				newNode = newNode || n
 			}
-			d.registryLock.Lock()
-			_, seen := d.registry[pkt.NodeID]
-			if !seen {
-				select {
-				case d.forcedBcastTick <- time.Now():
-				}
+		}
+
+		if newNode {
+			select {
+			case d.forcedBcastTick <- time.Now():
 			}
-			d.registry[pkt.NodeID] = addrs
-			d.registryLock.Unlock()
 		}
 	}
 }
 
+func (d *Discoverer) registerNode(addr net.Addr, node Node) bool {
+	var addrs []string
+	for _, a := range node.Addresses {
+		var nodeAddr string
+		if len(a.IP) > 0 {
+			nodeAddr = fmt.Sprintf("%s:%d", net.IP(a.IP), a.Port)
+			addrs = append(addrs, nodeAddr)
+		} else if addr != nil {
+			ua := addr.(*net.UDPAddr)
+			ua.Port = int(a.Port)
+			nodeAddr = ua.String()
+			addrs = append(addrs, nodeAddr)
+		}
+	}
+	if len(addrs) == 0 {
+		if debug {
+			dlog.Println("no valid address for", node.ID)
+		}
+	}
+	if debug {
+		dlog.Printf("register: %s -> %#v", node.ID, addrs)
+	}
+	d.registryLock.Lock()
+	_, seen := d.registry[node.ID]
+	d.registry[node.ID] = addrs
+	d.registryLock.Unlock()
+	return !seen
+}
+
 func (d *Discoverer) externalLookup(node string) []string {
 	extIP, err := net.ResolveUDPAddr("udp", d.extServer)
 	if err != nil {
@@ -268,7 +304,7 @@ func (d *Discoverer) externalLookup(node string) []string {
 	}
 	buffers.Put(buf)
 
-	buf = buffers.Get(256)
+	buf = buffers.Get(2048)
 	defer buffers.Put(buf)
 
 	n, err := conn.Read(buf)
@@ -287,7 +323,7 @@ func (d *Discoverer) externalLookup(node string) []string {
 
 	var pkt AnnounceV2
 	err = pkt.UnmarshalXDR(buf[:n])
-	if err != nil {
+	if err != nil && err != io.EOF {
 		log.Println("discover/external/decode:", err)
 		return nil
 	}
@@ -297,9 +333,35 @@ func (d *Discoverer) externalLookup(node string) []string {
 	}
 
 	var addrs []string
-	for _, a := range pkt.Addresses {
+	for _, a := range pkt.This.Addresses {
 		nodeAddr := fmt.Sprintf("%s:%d", net.IP(a.IP), a.Port)
 		addrs = append(addrs, nodeAddr)
 	}
 	return addrs
 }
+
+func addrToAddr(addr *net.TCPAddr) Address {
+	if len(addr.IP) == 0 || addr.IP.IsUnspecified() {
+		return Address{Port: uint16(addr.Port)}
+	} else if bs := addr.IP.To4(); bs != nil {
+		return Address{IP: bs, Port: uint16(addr.Port)}
+	} else if bs := addr.IP.To16(); bs != nil {
+		return Address{IP: bs, Port: uint16(addr.Port)}
+	}
+	return Address{}
+}
+
+func resolveAddrs(addrs []string) []Address {
+	var raddrs []Address
+	for _, addrStr := range addrs {
+		addrRes, err := net.ResolveTCPAddr("tcp", addrStr)
+		if err != nil {
+			continue
+		}
+		addr := addrToAddr(addrRes)
+		if len(addr.IP) > 0 {
+			raddrs = append(raddrs, addr)
+		}
+	}
+	return raddrs
+}

+ 7 - 2
discover/packets.go

@@ -11,8 +11,13 @@ type QueryV2 struct {
 }
 
 type AnnounceV2 struct {
-	Magic     uint32
-	NodeID    string    // max:64
+	Magic uint32
+	This  Node
+	Extra []Node // max:16
+}
+
+type Node struct {
+	ID        string    // max:64
 	Addresses []Address // max:16
 }
 

+ 55 - 7
discover/packets_xdr.go

@@ -59,10 +59,59 @@ func (o AnnounceV2) MarshalXDR() []byte {
 
 func (o AnnounceV2) encodeXDR(xw *xdr.Writer) (int, error) {
 	xw.WriteUint32(o.Magic)
-	if len(o.NodeID) > 64 {
+	o.This.encodeXDR(xw)
+	if len(o.Extra) > 16 {
 		return xw.Tot(), xdr.ErrElementSizeExceeded
 	}
-	xw.WriteString(o.NodeID)
+	xw.WriteUint32(uint32(len(o.Extra)))
+	for i := range o.Extra {
+		o.Extra[i].encodeXDR(xw)
+	}
+	return xw.Tot(), xw.Error()
+}
+
+func (o *AnnounceV2) DecodeXDR(r io.Reader) error {
+	xr := xdr.NewReader(r)
+	return o.decodeXDR(xr)
+}
+
+func (o *AnnounceV2) UnmarshalXDR(bs []byte) error {
+	var buf = bytes.NewBuffer(bs)
+	var xr = xdr.NewReader(buf)
+	return o.decodeXDR(xr)
+}
+
+func (o *AnnounceV2) decodeXDR(xr *xdr.Reader) error {
+	o.Magic = xr.ReadUint32()
+	(&o.This).decodeXDR(xr)
+	_ExtraSize := int(xr.ReadUint32())
+	if _ExtraSize > 16 {
+		return xdr.ErrElementSizeExceeded
+	}
+	o.Extra = make([]Node, _ExtraSize)
+	for i := range o.Extra {
+		(&o.Extra[i]).decodeXDR(xr)
+	}
+	return xr.Error()
+}
+
+func (o Node) EncodeXDR(w io.Writer) (int, error) {
+	var xw = xdr.NewWriter(w)
+	return o.encodeXDR(xw)
+}
+
+func (o Node) MarshalXDR() []byte {
+	var buf bytes.Buffer
+	var xw = xdr.NewWriter(&buf)
+	o.encodeXDR(xw)
+	return buf.Bytes()
+}
+
+func (o Node) encodeXDR(xw *xdr.Writer) (int, error) {
+	if len(o.ID) > 64 {
+		return xw.Tot(), xdr.ErrElementSizeExceeded
+	}
+	xw.WriteString(o.ID)
 	if len(o.Addresses) > 16 {
 		return xw.Tot(), xdr.ErrElementSizeExceeded
 	}
@@ -73,20 +122,19 @@ func (o AnnounceV2) encodeXDR(xw *xdr.Writer) (int, error) {
 	return xw.Tot(), xw.Error()
 }
 
-func (o *AnnounceV2) DecodeXDR(r io.Reader) error {
+func (o *Node) DecodeXDR(r io.Reader) error {
 	xr := xdr.NewReader(r)
 	return o.decodeXDR(xr)
 }
 
-func (o *AnnounceV2) UnmarshalXDR(bs []byte) error {
+func (o *Node) UnmarshalXDR(bs []byte) error {
 	var buf = bytes.NewBuffer(bs)
 	var xr = xdr.NewReader(buf)
 	return o.decodeXDR(xr)
 }
 
-func (o *AnnounceV2) decodeXDR(xr *xdr.Reader) error {
-	o.Magic = xr.ReadUint32()
-	o.NodeID = xr.ReadStringMax(64)
+func (o *Node) decodeXDR(xr *xdr.Reader) error {
+	o.ID = xr.ReadStringMax(64)
 	_AddressesSize := int(xr.ReadUint32())
 	if _AddressesSize > 16 {
 		return xdr.ErrElementSizeExceeded

+ 9 - 4
xdr/reader.go

@@ -72,20 +72,25 @@ func (r *Reader) ReadUint16() uint16 {
 }
 
 func (r *Reader) ReadUint32() uint32 {
+	var n int
 	if r.err != nil {
 		return 0
 	}
-	_, r.err = io.ReadFull(r.r, r.b[:4])
-	r.tot += 4
+	n, r.err = io.ReadFull(r.r, r.b[:4])
+	if n < 4 {
+		return 0
+	}
+	r.tot += n
 	return uint32(r.b[3]) | uint32(r.b[2])<<8 | uint32(r.b[1])<<16 | uint32(r.b[0])<<24
 }
 
 func (r *Reader) ReadUint64() uint64 {
+	var n int
 	if r.err != nil {
 		return 0
 	}
-	_, r.err = io.ReadFull(r.r, r.b[:8])
-	r.tot += 8
+	n, r.err = io.ReadFull(r.r, r.b[:8])
+	r.tot += n
 	return uint64(r.b[7]) | uint64(r.b[6])<<8 | uint64(r.b[5])<<16 | uint64(r.b[4])<<24 |
 		uint64(r.b[3])<<32 | uint64(r.b[2])<<40 | uint64(r.b[1])<<48 | uint64(r.b[0])<<56
 }