Browse Source

Merge pull request #1933 from calmh/fix-1907

More resilient broadcast handling (fixes #1907)
Audrius Butkevicius 10 years ago
parent
commit
983d7ec265
2 changed files with 149 additions and 28 deletions
  1. 147 19
      internal/beacon/broadcast.go
  2. 2 9
      internal/discover/discover.go

+ 147 - 19
internal/beacon/broadcast.go

@@ -6,31 +6,51 @@
 
 package beacon
 
-import "net"
+import (
+	"fmt"
+	"net"
+	"time"
+
+	"github.com/thejerf/suture"
+)
 
 type Broadcast struct {
-	conn   *net.UDPConn
+	*suture.Supervisor
 	port   int
 	inbox  chan []byte
 	outbox chan recv
 }
 
-func NewBroadcast(port int) (*Broadcast, error) {
-	conn, err := net.ListenUDP("udp4", &net.UDPAddr{Port: port})
-	if err != nil {
-		return nil, err
-	}
+func NewBroadcast(port int) *Broadcast {
 	b := &Broadcast{
-		conn:   conn,
+		Supervisor: suture.New("broadcastBeacon", suture.Spec{
+			// Don't retry too frenetically: an error to open a socket or
+			// whatever is usually something that is either permanent or takes
+			// a while to get solved...
+			FailureThreshold: 2,
+			FailureBackoff:   60 * time.Second,
+			// Only log restarts in debug mode.
+			Log: func(line string) {
+				if debug {
+					l.Debugln(line)
+				}
+			},
+		}),
 		port:   port,
 		inbox:  make(chan []byte),
 		outbox: make(chan recv, 16),
 	}
 
-	go genericReader(b.conn, b.outbox)
-	go b.writer()
-
-	return b, nil
+	b.Add(&broadcastReader{
+		port:   port,
+		outbox: b.outbox,
+	})
+	b.Add(&broadcastWriter{
+		port:  port,
+		inbox: b.inbox,
+	})
+
+	return b
 }
 
 func (b *Broadcast) Send(data []byte) {
@@ -42,13 +62,37 @@ func (b *Broadcast) Recv() ([]byte, net.Addr) {
 	return recv.data, recv.src
 }
 
-func (b *Broadcast) writer() {
-	for bs := range b.inbox {
+type broadcastWriter struct {
+	port   int
+	inbox  chan []byte
+	conn   *net.UDPConn
+	failed bool // Have we already logged a failure reason?
+}
+
+func (w *broadcastWriter) Serve() {
+	if debug {
+		l.Debugln(w, "starting")
+		defer l.Debugln(w, "stopping")
+	}
 
+	var err error
+	w.conn, err = net.ListenUDP("udp4", nil)
+	if err != nil {
+		if !w.failed {
+			l.Warnln("Local discovery over IPv4 unavailable:", err)
+			w.failed = true
+		}
+		return
+	}
+	defer w.conn.Close()
+
+	w.failed = false
+
+	for bs := range w.inbox {
 		addrs, err := net.InterfaceAddrs()
 		if err != nil {
 			if debug {
-				l.Debugln("Broadcast: interface addresses:", err)
+				l.Debugln("Local discovery (broadcast writer):", err)
 			}
 			continue
 		}
@@ -71,13 +115,27 @@ func (b *Broadcast) writer() {
 		}
 
 		for _, ip := range dsts {
-			dst := &net.UDPAddr{IP: ip, Port: b.port}
-
-			_, err := b.conn.WriteTo(bs, dst)
-			if err != nil {
+			dst := &net.UDPAddr{IP: ip, Port: w.port}
+
+			w.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
+			_, err := w.conn.WriteTo(bs, dst)
+			if err, ok := err.(net.Error); ok && err.Timeout() {
+				// Write timeouts should not happen. We treat it as a fatal
+				// error on the socket.
+				l.Infoln("Local discovery (broadcast writer):", err)
+				w.failed = true
+				return
+			} else if err, ok := err.(net.Error); ok && err.Temporary() {
+				// A transient error. Lets hope for better luck in the future.
 				if debug {
 					l.Debugln(err)
 				}
+				continue
+			} else if err != nil {
+				// Some other error that we don't expect. Bail and retry.
+				l.Infoln("Local discovery (broadcast writer):", err)
+				w.failed = true
+				return
 			} else if debug {
 				l.Debugf("sent %d bytes to %s", len(bs), dst)
 			}
@@ -85,6 +143,76 @@ func (b *Broadcast) writer() {
 	}
 }
 
+func (w *broadcastWriter) Stop() {
+	w.conn.Close()
+}
+
+func (w *broadcastWriter) String() string {
+	return fmt.Sprintf("broadcastWriter@%p", w)
+}
+
+type broadcastReader struct {
+	port   int
+	outbox chan recv
+	conn   *net.UDPConn
+	failed bool
+}
+
+func (r *broadcastReader) Serve() {
+	if debug {
+		l.Debugln(r, "starting")
+		defer l.Debugln(r, "stopping")
+	}
+
+	var err error
+	r.conn, err = net.ListenUDP("udp4", &net.UDPAddr{Port: r.port})
+	if err != nil {
+		if !r.failed {
+			l.Warnln("Local discovery over IPv4 unavailable:", err)
+			r.failed = true
+		}
+		return
+	}
+	defer r.conn.Close()
+
+	bs := make([]byte, 65536)
+	for {
+		n, addr, err := r.conn.ReadFrom(bs)
+		if err != nil {
+			if !r.failed {
+				l.Infoln("Local discovery (broadcast reader):", err)
+				r.failed = true
+			}
+			return
+		}
+
+		r.failed = false
+
+		if debug {
+			l.Debugf("recv %d bytes from %s", n, addr)
+		}
+
+		c := make([]byte, n)
+		copy(c, bs)
+		select {
+		case r.outbox <- recv{c, addr}:
+		default:
+			if debug {
+				l.Debugln("dropping message")
+			}
+		}
+	}
+
+}
+
+func (r *broadcastReader) Stop() {
+	r.conn.Close()
+}
+
+func (r *broadcastReader) String() string {
+	return fmt.Sprintf("broadcastReader@%p", r)
+}
+
 func bcast(ip *net.IPNet) *net.IPNet {
 	var bc = &net.IPNet{}
 	bc.IP = make([]byte, len(ip.IP))

+ 2 - 9
internal/discover/discover.go

@@ -86,17 +86,10 @@ func (d *Discoverer) StartLocal(localPort int, localMCAddr string) {
 }
 
 func (d *Discoverer) startLocalIPv4Broadcasts(localPort int) {
-	bb, err := beacon.NewBroadcast(localPort)
-	if err != nil {
-		if debug {
-			l.Debugln("discover: Start local v4:", err)
-		}
-		l.Infoln("Local discovery over IPv4 unavailable")
-		return
-	}
-
+	bb := beacon.NewBroadcast(localPort)
 	d.beacons = append(d.beacons, bb)
 	go d.recvAnnouncements(bb)
+	bb.ServeBackground()
 }
 
 func (d *Discoverer) startLocalIPv6Multicasts(localMCAddr string) {