@@ -7,6 +7,7 @@ package testcontrol

 import (
 	"bytes"
+	"context"
 	crand "crypto/rand"
 	"encoding/binary"
 	"encoding/json"
@@ -46,6 +47,7 @@ type Server struct {
 	mux *http.ServeMux

 	mu sync.Mutex
+	cond *sync.Cond // lazily initialized by condLocked
 	pubKey wgkey.Key
 	privKey wgkey.Private
 	nodes map[tailcfg.NodeKey]*tailcfg.Node
@@ -68,6 +70,47 @@ func (s *Server) NumNodes() int {
 	return len(s.nodes)
 }

+// condLocked lazily initializes and returns s.cond.
+// s.mu must be held.
+func (s *Server) condLocked() *sync.Cond {
+	if s.cond == nil {
+		s.cond = sync.NewCond(&s.mu)
+	}
+	return s.cond
+}
+
+// AwaitNodeInMapRequest waits for node k to be stuck in a map poll.
+// It returns an error if node k is not known or if the context is done first.
+func (s *Server) AwaitNodeInMapRequest(ctx context.Context, k tailcfg.NodeKey) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	cond := s.condLocked()
+
+	done := make(chan struct{})
+	defer close(done)
+	go func() {
+		select {
+		case <-done:
+		case <-ctx.Done():
+			cond.Broadcast()
+		}
+	}()
+
+	for {
+		node := s.nodeLocked(k)
+		if node == nil {
+			return errors.New("unknown node key")
+		}
+		if _, ok := s.updates[node.ID]; ok {
+			return nil
+		}
+		cond.Wait()
+		if err := ctx.Err(); err != nil {
+			return err
+		}
+	}
+}
+
 // AddPingRequest sends the ping pr to nodeKeyDst. It reports whether it did so. That is,
 // it reports whether nodeKeyDst was connected.
 func (s *Server) AddPingRequest(nodeKeyDst tailcfg.NodeKey, pr *tailcfg.PingRequest) bool {
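A note on the AwaitNodeInMapRequest addition above: sync.Cond.Wait cannot be
interrupted by a context, so the patch pairs the wait with a helper goroutine
that calls Broadcast once the context is done, and the wait loop re-checks
ctx.Err() after every wakeup. The same idiom in isolation (a minimal sketch,
not code from this patch; mu, cond, ctx, and ready are placeholders, with
cond built over mu via sync.NewCond):

	mu.Lock()
	defer mu.Unlock()
	done := make(chan struct{})
	defer close(done) // stops the helper once we return
	go func() {
		select {
		case <-done: // the waiter finished on its own
		case <-ctx.Done():
			cond.Broadcast() // wake Wait so the loop can observe ctx.Err()
		}
	}()
	for !ready() {
		cond.Wait()
		if err := ctx.Err(); err != nil {
			return err
		}
	}
	return nil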
@@ -85,8 +128,7 @@ func (s *Server) AddPingRequest(nodeKeyDst tailcfg.NodeKey, pr *tailcfg.PingRequ
 	s.pingReqsToAdd[nodeKeyDst] = pr
 	nodeID := node.ID
 	oldUpdatesCh := s.updates[nodeID]
-	sendUpdate(oldUpdatesCh, updateDebugInjection)
-	return true
+	return sendUpdate(oldUpdatesCh, updateDebugInjection)
 }

 type AuthPath struct {
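Note that the AddPingRequest change above depends on sendUpdate's new boolean
result, which the next hunk introduces.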
@@ -414,17 +456,19 @@ func (s *Server) updateLocked(source string, peers []tailcfg.NodeID) {
 }

 // sendUpdate sends updateType to dst if dst is non-nil and
-// has capacity.
-func sendUpdate(dst chan<- updateType, updateType updateType) {
+// has capacity. It reports whether a value was sent.
+func sendUpdate(dst chan<- updateType, updateType updateType) bool {
 	if dst == nil {
-		return
+		return false
 	}
 	// The dst channel has a buffer size of 1.
 	// If we fail to insert an update into the buffer that
 	// means there is already an update pending.
 	select {
 	case dst <- updateType:
+		return true
 	default:
+		return false
 	}
 }

@@ -489,6 +533,7 @@ func (s *Server) serveMap(w http.ResponseWriter, r *http.Request, mkey tailcfg.M
 		sendUpdate(oldUpdatesCh, updateSelfChanged)
 	}
 	s.updateLocked("serveMap", peersToUpdate)
+	s.condLocked().Broadcast()
 	s.mu.Unlock()

 	// ReadOnly implies no streaming, as it doesn't
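Tying it together: the Broadcast added to serveMap wakes any
AwaitNodeInMapRequest callers, whose wait condition is the presence of the
node's entry in s.updates. A test might use it like this (a hypothetical
sketch; server and nodeKey stand in for whatever the real harness provides):

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := server.AwaitNodeInMapRequest(ctx, nodeKey); err != nil {
		t.Fatalf("node never entered a map poll: %v", err)
	}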