Browse Source

wgengine/netstack: block link writes when full rather than drop

Originally identified by Coder and documented in their blog post, this
implementation differs slightly as our link endpoint was introduced for
a different purpose, but the behavior is the same: apply backpressure
rather than dropping packets. This reduces the negative impact of large
packet count bursts substantially. An alternative would be to swell the
size of the channel buffer substantially, however that's largely just
moving where buffering occurs and may lead to reduced signalling back to
lower layer or upstream congestion controls.

Updates #9707
Updates #10408
Updates #12393
Updates tailscale/corp#24483
Updates tailscale/corp#25169

Signed-off-by: James Tucker <[email protected]>
James Tucker 1 year ago
parent
commit
4903d6c80b
1 changed files with 27 additions and 22 deletions
  1. 27 22
      wgengine/netstack/link_endpoint.go

+ 27 - 22
wgengine/netstack/link_endpoint.go

@@ -16,19 +16,27 @@ import (
 )
 
 type queue struct {
-	// TODO(jwhited): evaluate performance with mu as Mutex and/or alternative
-	//  non-channel buffer.
-	c      chan *stack.PacketBuffer
-	mu     sync.RWMutex // mu guards closed
+	// TODO(jwhited): evaluate performance with a non-channel buffer.
+	c chan *stack.PacketBuffer
+
+	closeOnce sync.Once
+	closedCh  chan struct{}
+
+	mu     sync.RWMutex
 	closed bool
 }
 
 func (q *queue) Close() {
+	q.closeOnce.Do(func() {
+		close(q.closedCh)
+	})
+
 	q.mu.Lock()
 	defer q.mu.Unlock()
-	if !q.closed {
-		close(q.c)
+	if q.closed {
+		return
 	}
+	close(q.c)
 	q.closed = true
 }
 
@@ -51,26 +59,27 @@ func (q *queue) ReadContext(ctx context.Context) *stack.PacketBuffer {
 }
 
 func (q *queue) Write(pkt *stack.PacketBuffer) tcpip.Error {
-	// q holds the PacketBuffer.
 	q.mu.RLock()
 	defer q.mu.RUnlock()
 	if q.closed {
 		return &tcpip.ErrClosedForSend{}
 	}
-
-	wrote := false
 	select {
 	case q.c <- pkt.IncRef():
-		wrote = true
-	default:
-		// TODO(jwhited): reconsider/count
+		return nil
+	case <-q.closedCh:
 		pkt.DecRef()
+		return &tcpip.ErrClosedForSend{}
 	}
+}
 
-	if wrote {
-		return nil
+func (q *queue) Drain() int {
+	c := 0
+	for pkt := range q.c {
+		pkt.DecRef()
+		c++
 	}
-	return &tcpip.ErrNoBufferSpace{}
+	return c
 }
 
 func (q *queue) Num() int {
@@ -107,7 +116,8 @@ func newLinkEndpoint(size int, mtu uint32, linkAddr tcpip.LinkAddress, supported
 	le := &linkEndpoint{
 		supportedGRO: supportedGRO,
 		q: &queue{
-			c: make(chan *stack.PacketBuffer, size),
+			c:        make(chan *stack.PacketBuffer, size),
+			closedCh: make(chan struct{}),
 		},
 		mtu:      mtu,
 		linkAddr: linkAddr,
@@ -164,12 +174,7 @@ func (l *linkEndpoint) ReadContext(ctx context.Context) *stack.PacketBuffer {
 
 // Drain removes all outbound packets from the channel and counts them.
 func (l *linkEndpoint) Drain() int {
-	c := 0
-	for pkt := l.Read(); pkt != nil; pkt = l.Read() {
-		pkt.DecRef()
-		c++
-	}
-	return c
+	return l.q.Drain()
 }
 
 // NumQueued returns the number of packets queued for outbound.