Browse Source

More resilient broadcast handling (fixes #1907)

My theory is that some error condition on the socket results in it
blocking for writes, which maybe also blocks reads... This separates the
two into separate services with their own socket, with restarts and
retries as appropriates on write timeouts and read/write errors. It
should be more robust, hopefully, but I have a hard time testing the
actual error conditions...
Jakob Borg 10 years ago
parent
commit
297769ef57
2 changed files with 149 additions and 28 deletions
  1. 147 19
      internal/beacon/broadcast.go
  2. 2 9
      internal/discover/discover.go

+ 147 - 19
internal/beacon/broadcast.go

@@ -6,31 +6,51 @@
 
 package beacon
 
-import "net"
+import (
+	"fmt"
+	"net"
+	"time"
+
+	"github.com/thejerf/suture"
+)
 
 type Broadcast struct {
-	conn   *net.UDPConn
+	*suture.Supervisor
 	port   int
 	inbox  chan []byte
 	outbox chan recv
 }
 
-func NewBroadcast(port int) (*Broadcast, error) {
-	conn, err := net.ListenUDP("udp4", &net.UDPAddr{Port: port})
-	if err != nil {
-		return nil, err
-	}
+func NewBroadcast(port int) *Broadcast {
 	b := &Broadcast{
-		conn:   conn,
+		Supervisor: suture.New("broadcastBeacon", suture.Spec{
+			// Don't retry too frenetically: an error to open a socket or
+			// whatever is usually something that is either permanent or takes
+			// a while to get solved...
+			FailureThreshold: 2,
+			FailureBackoff:   60 * time.Second,
+			// Only log restarts in debug mode.
+			Log: func(line string) {
+				if debug {
+					l.Debugln(line)
+				}
+			},
+		}),
 		port:   port,
 		inbox:  make(chan []byte),
 		outbox: make(chan recv, 16),
 	}
 
-	go genericReader(b.conn, b.outbox)
-	go b.writer()
-
-	return b, nil
+	b.Add(&broadcastReader{
+		port:   port,
+		outbox: b.outbox,
+	})
+	b.Add(&broadcastWriter{
+		port:  port,
+		inbox: b.inbox,
+	})
+
+	return b
 }
 
 func (b *Broadcast) Send(data []byte) {
@@ -42,13 +62,37 @@ func (b *Broadcast) Recv() ([]byte, net.Addr) {
 	return recv.data, recv.src
 }
 
-func (b *Broadcast) writer() {
-	for bs := range b.inbox {
+type broadcastWriter struct {
+	port   int
+	inbox  chan []byte
+	conn   *net.UDPConn
+	failed bool // Have we already logged a failure reason?
+}
+
+func (w *broadcastWriter) Serve() {
+	if debug {
+		l.Debugln(w, "starting")
+		defer l.Debugln(w, "stopping")
+	}
 
+	var err error
+	w.conn, err = net.ListenUDP("udp4", nil)
+	if err != nil {
+		if !w.failed {
+			l.Warnln("Local discovery over IPv4 unavailable:", err)
+			w.failed = true
+		}
+		return
+	}
+	defer w.conn.Close()
+
+	w.failed = false
+
+	for bs := range w.inbox {
 		addrs, err := net.InterfaceAddrs()
 		if err != nil {
 			if debug {
-				l.Debugln("Broadcast: interface addresses:", err)
+				l.Debugln("Local discovery (broadcast writer):", err)
 			}
 			continue
 		}
@@ -71,13 +115,27 @@ func (b *Broadcast) writer() {
 		}
 
 		for _, ip := range dsts {
-			dst := &net.UDPAddr{IP: ip, Port: b.port}
-
-			_, err := b.conn.WriteTo(bs, dst)
-			if err != nil {
+			dst := &net.UDPAddr{IP: ip, Port: w.port}
+
+			w.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
+			_, err := w.conn.WriteTo(bs, dst)
+			if err, ok := err.(net.Error); ok && err.Timeout() {
+				// Write timeouts should not happen. We treat it as a fatal
+				// error on the socket.
+				l.Infoln("Local discovery (broadcast writer):", err)
+				w.failed = true
+				return
+			} else if err, ok := err.(net.Error); ok && err.Temporary() {
+				// A transient error. Lets hope for better luck in the future.
 				if debug {
 					l.Debugln(err)
 				}
+				continue
+			} else if err != nil {
+				// Some other error that we don't expect. Bail and retry.
+				l.Infoln("Local discovery (broadcast writer):", err)
+				w.failed = true
+				return
 			} else if debug {
 				l.Debugf("sent %d bytes to %s", len(bs), dst)
 			}
@@ -85,6 +143,76 @@ func (b *Broadcast) writer() {
 	}
 }
 
+func (w *broadcastWriter) Stop() {
+	w.conn.Close()
+}
+
+func (w *broadcastWriter) String() string {
+	return fmt.Sprintf("broadcastWriter@%p", w)
+}
+
+type broadcastReader struct {
+	port   int
+	outbox chan recv
+	conn   *net.UDPConn
+	failed bool
+}
+
+func (r *broadcastReader) Serve() {
+	if debug {
+		l.Debugln(r, "starting")
+		defer l.Debugln(r, "stopping")
+	}
+
+	var err error
+	r.conn, err = net.ListenUDP("udp4", &net.UDPAddr{Port: r.port})
+	if err != nil {
+		if !r.failed {
+			l.Warnln("Local discovery over IPv4 unavailable:", err)
+			r.failed = true
+		}
+		return
+	}
+	defer r.conn.Close()
+
+	bs := make([]byte, 65536)
+	for {
+		n, addr, err := r.conn.ReadFrom(bs)
+		if err != nil {
+			if !r.failed {
+				l.Infoln("Local discovery (broadcast reader):", err)
+				r.failed = true
+			}
+			return
+		}
+
+		r.failed = false
+
+		if debug {
+			l.Debugf("recv %d bytes from %s", n, addr)
+		}
+
+		c := make([]byte, n)
+		copy(c, bs)
+		select {
+		case r.outbox <- recv{c, addr}:
+		default:
+			if debug {
+				l.Debugln("dropping message")
+			}
+		}
+	}
+
+}
+
+func (r *broadcastReader) Stop() {
+	r.conn.Close()
+}
+
+func (r *broadcastReader) String() string {
+	return fmt.Sprintf("broadcastReader@%p", r)
+}
+
 func bcast(ip *net.IPNet) *net.IPNet {
 	var bc = &net.IPNet{}
 	bc.IP = make([]byte, len(ip.IP))

+ 2 - 9
internal/discover/discover.go

@@ -86,17 +86,10 @@ func (d *Discoverer) StartLocal(localPort int, localMCAddr string) {
 }
 
 func (d *Discoverer) startLocalIPv4Broadcasts(localPort int) {
-	bb, err := beacon.NewBroadcast(localPort)
-	if err != nil {
-		if debug {
-			l.Debugln("discover: Start local v4:", err)
-		}
-		l.Infoln("Local discovery over IPv4 unavailable")
-		return
-	}
-
+	bb := beacon.NewBroadcast(localPort)
 	d.beacons = append(d.beacons, bb)
 	go d.recvAnnouncements(bb)
+	bb.ServeBackground()
 }
 
 func (d *Discoverer) startLocalIPv6Multicasts(localMCAddr string) {