Browse Source

ipn/ipnlocal,wgengine/magicsock: wait for magicsock to process pending events on authReconfig

Updates #16369

Signed-off-by: Nick Khyl <[email protected]>
Nick Khyl 8 months ago
parent
commit
9e28bfc69c
4 changed files with 95 additions and 1 deletions
  1. 5 0
      ipn/ipnlocal/local.go
  2. 6 0
      ipn/ipnlocal/local_test.go
  3. 50 1
      ipn/ipnlocal/state_test.go
  4. 34 0
      wgengine/magicsock/magicsock.go

+ 5 - 0
ipn/ipnlocal/local.go

@@ -4853,6 +4853,11 @@ func (b *LocalBackend) readvertiseAppConnectorRoutes() {
 // updates are not currently blocked, based on the cached netmap and
 // user prefs.
 func (b *LocalBackend) authReconfig() {
+	// Wait for magicsock to process pending [eventbus] events,
+	// such as netmap updates. This should be completed before
+	// wireguard-go is reconfigured. See tailscale/tailscale#16369.
+	b.MagicConn().Synchronize()
+
 	b.mu.Lock()
 	blocked := b.blocked
 	prefs := b.pm.CurrentPrefs()

+ 6 - 0
ipn/ipnlocal/local_test.go

@@ -85,6 +85,12 @@ func makeNodeKeyFromID(nodeID tailcfg.NodeID) key.NodePublic {
 	return key.NodePublicFromRaw32(memro.B(raw))
 }
 
+func makeDiscoKeyFromID(nodeID tailcfg.NodeID) (ret key.DiscoPublic) {
+	raw := make([]byte, 32)
+	binary.BigEndian.PutUint64(raw[24:], uint64(nodeID)) // nodeID fills the last 8 bytes, big-endian; the first 24 stay zero
+	return key.DiscoPublicFromRaw32(memro.B(raw))
+}
+
 func TestShrinkDefaultRoute(t *testing.T) {
 	tests := []struct {
 		route     string

+ 50 - 1
ipn/ipnlocal/state_test.go

@@ -1114,6 +1114,8 @@ func TestEngineReconfigOnStateChange(t *testing.T) {
 	disconnect := &ipn.MaskedPrefs{Prefs: ipn.Prefs{WantRunning: false}, WantRunningSet: true}
 	node1 := testNetmapForNode(1, "node-1", []netip.Prefix{netip.MustParsePrefix("100.64.1.1/32")})
 	node2 := testNetmapForNode(2, "node-2", []netip.Prefix{netip.MustParsePrefix("100.64.1.2/32")})
+	node3 := testNetmapForNode(3, "node-3", []netip.Prefix{netip.MustParsePrefix("100.64.1.3/32")})
+	node3.Peers = []tailcfg.NodeView{node1.SelfNode, node2.SelfNode}
 	routesWithQuad100 := func(extra ...netip.Prefix) []netip.Prefix {
 		return append(extra, netip.MustParsePrefix("100.100.100.100/32"))
 	}
@@ -1308,6 +1310,40 @@ func TestEngineReconfigOnStateChange(t *testing.T) {
 				Hosts:  hostsFor(node1),
 			},
 		},
+		{
+			name: "Start/Connect/Login/WithPeers",
+			steps: func(t *testing.T, lb *LocalBackend, cc func() *mockControl) {
+				mustDo(t)(lb.Start(ipn.Options{}))
+				mustDo2(t)(lb.EditPrefs(connect))
+				cc().authenticated(node3)
+			},
+			wantState: ipn.Starting,
+			wantCfg: &wgcfg.Config{
+				Name:   "tailscale",
+				NodeID: node3.SelfNode.StableID(),
+				Peers: []wgcfg.Peer{
+					{
+						PublicKey: node1.SelfNode.Key(),
+						DiscoKey:  node1.SelfNode.DiscoKey(),
+					},
+					{
+						PublicKey: node2.SelfNode.Key(),
+						DiscoKey:  node2.SelfNode.DiscoKey(),
+					},
+				},
+				Addresses: node3.SelfNode.Addresses().AsSlice(),
+			},
+			wantRouterCfg: &router.Config{
+				SNATSubnetRoutes: true,
+				NetfilterMode:    preftype.NetfilterOn,
+				LocalAddrs:       node3.SelfNode.Addresses().AsSlice(),
+				Routes:           routesWithQuad100(),
+			},
+			wantDNSCfg: &dns.Config{
+				Routes: map[dnsname.FQDN][]*dnstype.Resolver{},
+				Hosts:  hostsFor(node3),
+			},
+		},
 	}
 
 	for _, tt := range tests {
@@ -1322,8 +1358,18 @@ func TestEngineReconfigOnStateChange(t *testing.T) {
 				t.Errorf("State: got %v; want %v", gotState, tt.wantState)
 			}
 
+			if engine.Config() != nil {
+				for _, p := range engine.Config().Peers {
+					pKey := p.PublicKey.UntypedHexString()
+					_, err := lb.MagicConn().ParseEndpoint(pKey)
+					if err != nil {
+						t.Errorf("ParseEndpoint(%q) failed: %v", pKey, err)
+					}
+				}
+			}
+
 			opts := []cmp.Option{
-				cmpopts.EquateComparable(key.NodePublic{}, netip.Addr{}, netip.Prefix{}),
+				cmpopts.EquateComparable(key.NodePublic{}, key.DiscoPublic{}, netip.Addr{}, netip.Prefix{}),
 			}
 			if diff := cmp.Diff(tt.wantCfg, engine.Config(), opts...); diff != "" {
 				t.Errorf("wgcfg.Config(+got -want): %v", diff)
@@ -1356,6 +1402,8 @@ func testNetmapForNode(userID tailcfg.UserID, name string, addresses []netip.Pre
 		Addresses:         addresses,
 		MachineAuthorized: true,
 	}
+	self.Key = makeNodeKeyFromID(self.ID)
+	self.DiscoKey = makeDiscoKeyFromID(self.ID)
 	return &netmap.NetworkMap{
 		SelfNode: self.View(),
 		Name:     self.Name,
@@ -1403,6 +1451,7 @@ func newLocalBackendWithMockEngineAndControl(t *testing.T, enableLogging bool) (
 
 	magicConn, err := magicsock.NewConn(magicsock.Options{
 		Logf:              logf,
+		EventBus:          sys.Bus.Get(),
 		NetMon:            dialer.NetMon(),
 		Metrics:           sys.UserMetricsRegistry(),
 		HealthTracker:     sys.HealthTracker(),

+ 34 - 0
wgengine/magicsock/magicsock.go

@@ -167,6 +167,8 @@ type Conn struct {
 	filterSub    *eventbus.Subscriber[FilterUpdate]
 	nodeViewsSub *eventbus.Subscriber[NodeViewsUpdate]
 	nodeMutsSub  *eventbus.Subscriber[NodeMutationsUpdate]
+	syncSub      *eventbus.Subscriber[syncPoint]
+	syncPub      *eventbus.Publisher[syncPoint]
 	subsDoneCh   chan struct{} // closed when consumeEventbusTopics returns
 
 	// pconn4 and pconn6 are the underlying UDP sockets used to
@@ -538,6 +540,21 @@ type FilterUpdate struct {
 	*filter.Filter
 }
 
+// syncPoint is an event published over an [eventbus.Bus] by [Conn.Synchronize].
+// It serves as a synchronization point that lets the publisher wait until
+// magicsock has processed all pending events.
+type syncPoint chan struct{}
+
+// Wait blocks until [syncPoint.Signal] is called.
+func (s syncPoint) Wait() {
+	<-s // returns once Signal closes the channel
+}
+
+// Signal signals the sync point, unblocking the [syncPoint.Wait] call.
+func (s syncPoint) Signal() {
+	close(s) // close wakes every waiter; calling Signal twice on one sync point panics
+}
+
 // newConn is the error-free, network-listening-side-effect-free base
 // of NewConn. Mostly for tests.
 func newConn(logf logger.Logf) *Conn {
@@ -593,10 +610,25 @@ func (c *Conn) consumeEventbusTopics() {
 			c.onNodeViewsUpdate(nodeViews)
 		case nodeMuts := <-c.nodeMutsSub.Events():
 			c.onNodeMutationsUpdate(nodeMuts)
+		case syncPoint := <-c.syncSub.Events():
+			c.dlogf("magicsock: received sync point after reconfig")
+			syncPoint.Signal()
 		}
 	}
 }
 
+// Synchronize waits for all [eventbus] events published
+// prior to this call to be processed by the receiver.
+func (c *Conn) Synchronize() {
+	if c.syncPub == nil {
+		// The event bus is not in use (as in certain tests), so there is nothing to wait for.
+		return
+	}
+	sp := syncPoint(make(chan struct{}))
+	c.syncPub.Publish(sp)
+	sp.Wait() // NOTE(review): never returns if no consumer signals sp (e.g. consumeEventbusTopics already exited) — confirm
+}
+
 // NewConn creates a magic Conn listening on opts.Port.
 // As the set of possible endpoints for a Conn changes, the
 // callback opts.EndpointsFunc is called.
@@ -624,6 +656,8 @@ func NewConn(opts Options) (*Conn, error) {
 		c.filterSub = eventbus.Subscribe[FilterUpdate](c.eventClient)
 		c.nodeViewsSub = eventbus.Subscribe[NodeViewsUpdate](c.eventClient)
 		c.nodeMutsSub = eventbus.Subscribe[NodeMutationsUpdate](c.eventClient)
+		c.syncSub = eventbus.Subscribe[syncPoint](c.eventClient)
+		c.syncPub = eventbus.Publish[syncPoint](c.eventClient)
 		c.subsDoneCh = make(chan struct{})
 		go c.consumeEventbusTopics()
 	}