浏览代码

wgengine/magicsock: send CallMeMaybeVia for relay endpoints (#16360)

If we acted as the allocator we are responsible for signaling it to the
remote peer in a CallMeMaybeVia message over DERP.

Updates tailscale/corp#27502

Signed-off-by: Jordan Whited <[email protected]>
Jordan Whited 8 月之前
父节点
当前提交
31eebdb0f8
共有 1 个文件被更改,包括 34 次插入4 次删除
  1. 34 4
      wgengine/magicsock/relaymanager.go

+ 34 - 4
wgengine/magicsock/relaymanager.go

@@ -30,8 +30,9 @@ import (
 //
 // [relayManager] methods can be called by [Conn] and [endpoint] while their .mu
 // mutexes are held. Therefore, in order to avoid deadlocks, [relayManager] must
-// never attempt to acquire those mutexes, including synchronous calls back
-// towards [Conn] or [endpoint] methods that acquire them.
+// never attempt to acquire those mutexes synchronously from its runLoop(),
+// including synchronous calls back towards [Conn] or [endpoint] methods that
+// acquire them.
 type relayManager struct {
 	initOnce sync.Once
 
@@ -584,9 +585,37 @@ func (r *relayManager) handleNewServerEndpointRunLoop(newServerEndpoint newRelay
 	byServerDisco[newServerEndpoint.se.ServerDisco] = work
 	r.handshakeWorkByServerDiscoVNI[sdv] = work
 
+	if newServerEndpoint.server.IsValid() {
+		// Send CallMeMaybeVia to the remote peer if we allocated this endpoint.
+		go r.sendCallMeMaybeVia(work.ep, work.se)
+	}
+
 	go r.handshakeServerEndpoint(work)
 }
 
+// sendCallMeMaybeVia sends a [disco.CallMeMaybeVia] to ep over DERP. It must be
+// called as part of a goroutine independent from runLoop(), for 2 reasons:
+//  1. it acquires ep.mu (refer to [relayManager] docs for reasoning)
+//  2. it makes a networking syscall, which can introduce unwanted backpressure
+func (r *relayManager) sendCallMeMaybeVia(ep *endpoint, se udprelay.ServerEndpoint) {
+	ep.mu.Lock()
+	derpAddr := ep.derpAddr
+	ep.mu.Unlock()
+	epDisco := ep.disco.Load()
+	if epDisco == nil || !derpAddr.IsValid() {
+		return
+	}
+	callMeMaybeVia := &disco.CallMeMaybeVia{
+		ServerDisco:         se.ServerDisco,
+		LamportID:           se.LamportID,
+		VNI:                 se.VNI,
+		BindLifetime:        se.BindLifetime.Duration,
+		SteadyStateLifetime: se.SteadyStateLifetime.Duration,
+		AddrPorts:           se.AddrPorts,
+	}
+	ep.c.sendDiscoMessage(epAddr{ap: derpAddr}, ep.publicKey, epDisco.key, callMeMaybeVia, discoVerboseLog)
+}
+
 func (r *relayManager) handshakeServerEndpoint(work *relayHandshakeWork) {
 	done := relayEndpointHandshakeWorkDoneEvent{work: work}
 	r.ensureDiscoInfoFor(work)
@@ -779,8 +808,9 @@ func (r *relayManager) allocateSingleServer(ctx context.Context, wg *sync.WaitGr
 		se, err := doAllocate(ctx, server, [2]key.DiscoPublic{ep.c.discoPublic, remoteDisco.key})
 		if err == nil {
 			relayManagerInputEvent(r, ctx, &r.newServerEndpointCh, newRelayServerEndpointEvent{
-				ep: ep,
-				se: se,
+				ep:     ep,
+				se:     se,
+				server: server, // we allocated this endpoint (vs CallMeMaybeVia reception), mark it as such
 			})
 			return
 		}