|
|
@@ -8,6 +8,7 @@ import (
|
|
|
"context"
|
|
|
"crypto/tls"
|
|
|
"encoding/json"
|
|
|
+ "errors"
|
|
|
"flag"
|
|
|
"fmt"
|
|
|
"maps"
|
|
|
@@ -18,11 +19,13 @@ import (
|
|
|
"strings"
|
|
|
"sync"
|
|
|
"testing"
|
|
|
+ "testing/synctest"
|
|
|
"time"
|
|
|
|
|
|
"tailscale.com/derp"
|
|
|
"tailscale.com/derp/derphttp"
|
|
|
"tailscale.com/derp/derpserver"
|
|
|
+ "tailscale.com/net/memnet"
|
|
|
"tailscale.com/net/netmon"
|
|
|
"tailscale.com/net/netx"
|
|
|
"tailscale.com/tailcfg"
|
|
|
@@ -224,24 +227,21 @@ func TestPing(t *testing.T) {
|
|
|
|
|
|
const testMeshKey = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
|
|
|
|
|
|
-func newTestServer(t *testing.T, k key.NodePrivate) (serverURL string, s *derpserver.Server) {
|
|
|
+func newTestServer(t *testing.T, k key.NodePrivate) (serverURL string, s *derpserver.Server, ln *memnet.Listener) {
|
|
|
s = derpserver.New(k, t.Logf)
|
|
|
httpsrv := &http.Server{
|
|
|
TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
|
|
|
Handler: derpserver.Handler(s),
|
|
|
}
|
|
|
|
|
|
- ln, err := net.Listen("tcp4", "localhost:0")
|
|
|
- if err != nil {
|
|
|
- t.Fatal(err)
|
|
|
- }
|
|
|
+ ln = memnet.Listen("localhost:0")
|
|
|
+
|
|
|
serverURL = "http://" + ln.Addr().String()
|
|
|
s.SetMeshKey(testMeshKey)
|
|
|
|
|
|
go func() {
|
|
|
if err := httpsrv.Serve(ln); err != nil {
|
|
|
- if err == http.ErrServerClosed {
|
|
|
- t.Logf("server closed")
|
|
|
+ if errors.Is(err, net.ErrClosed) {
|
|
|
return
|
|
|
}
|
|
|
panic(err)
|
|
|
@@ -250,7 +250,7 @@ func newTestServer(t *testing.T, k key.NodePrivate) (serverURL string, s *derpse
|
|
|
return
|
|
|
}
|
|
|
|
|
|
-func newWatcherClient(t *testing.T, watcherPrivateKey key.NodePrivate, serverToWatchURL string) (c *derphttp.Client) {
|
|
|
+func newWatcherClient(t *testing.T, watcherPrivateKey key.NodePrivate, serverToWatchURL string, ln *memnet.Listener) (c *derphttp.Client) {
|
|
|
c, err := derphttp.NewClient(watcherPrivateKey, serverToWatchURL, t.Logf, netmon.NewStatic())
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
@@ -260,6 +260,7 @@ func newWatcherClient(t *testing.T, watcherPrivateKey key.NodePrivate, serverToW
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
c.MeshKey = k
|
|
|
+ c.SetURLDialer(ln.Dial)
|
|
|
return
|
|
|
}
|
|
|
|
|
|
@@ -267,170 +268,171 @@ func newWatcherClient(t *testing.T, watcherPrivateKey key.NodePrivate, serverToW
|
|
|
// updates after a different thread breaks and reconnects the connection, while
|
|
|
// the watcher is waiting on recv().
|
|
|
func TestBreakWatcherConnRecv(t *testing.T) {
|
|
|
- // TODO(bradfitz): use synctest + memnet instead
|
|
|
-
|
|
|
- // Set the wait time before a retry after connection failure to be much lower.
|
|
|
- // This needs to be early in the test, for defer to run right at the end after
|
|
|
- // the DERP client has finished.
|
|
|
- tstest.Replace(t, derphttp.RetryInterval, 50*time.Millisecond)
|
|
|
-
|
|
|
- var wg sync.WaitGroup
|
|
|
- // Make the watcher server
|
|
|
- serverPrivateKey1 := key.NewNode()
|
|
|
- _, s1 := newTestServer(t, serverPrivateKey1)
|
|
|
- defer s1.Close()
|
|
|
-
|
|
|
- // Make the watched server
|
|
|
- serverPrivateKey2 := key.NewNode()
|
|
|
- serverURL2, s2 := newTestServer(t, serverPrivateKey2)
|
|
|
- defer s2.Close()
|
|
|
-
|
|
|
- // Make the watcher (but it is not connected yet)
|
|
|
- watcher := newWatcherClient(t, serverPrivateKey1, serverURL2)
|
|
|
- defer watcher.Close()
|
|
|
+ synctest.Test(t, func(t *testing.T) {
|
|
|
+ // Set the wait time before a retry after connection failure to be much lower.
|
|
|
+ // This needs to be early in the test, for defer to run right at the end after
|
|
|
+ // the DERP client has finished.
|
|
|
+ tstest.Replace(t, derphttp.RetryInterval, 50*time.Millisecond)
|
|
|
+
|
|
|
+ var wg sync.WaitGroup
|
|
|
+ // Make the watcher server
|
|
|
+ serverPrivateKey1 := key.NewNode()
|
|
|
+ _, s1, ln1 := newTestServer(t, serverPrivateKey1)
|
|
|
+ defer s1.Close()
|
|
|
+ defer ln1.Close()
|
|
|
+
|
|
|
+ // Make the watched server
|
|
|
+ serverPrivateKey2 := key.NewNode()
|
|
|
+ serverURL2, s2, ln2 := newTestServer(t, serverPrivateKey2)
|
|
|
+ defer s2.Close()
|
|
|
+ defer ln2.Close()
|
|
|
+
|
|
|
+ // Make the watcher (but it is not connected yet)
|
|
|
+ watcher := newWatcherClient(t, serverPrivateKey1, serverURL2, ln2)
|
|
|
+ defer watcher.Close()
|
|
|
+
|
|
|
+ ctx, cancel := context.WithCancel(context.Background())
|
|
|
+ defer cancel()
|
|
|
+
|
|
|
+ watcherChan := make(chan int, 1)
|
|
|
+ defer close(watcherChan)
|
|
|
+ errChan := make(chan error, 1)
|
|
|
+
|
|
|
+ // Start the watcher thread (which connects to the watched server)
|
|
|
+ wg.Add(1) // To avoid using t.Logf after the test ends. See https://golang.org/issue/40343
|
|
|
+ go func() {
|
|
|
+ defer wg.Done()
|
|
|
+ var peers int
|
|
|
+ add := func(m derp.PeerPresentMessage) {
|
|
|
+ t.Logf("add: %v", m.Key.ShortString())
|
|
|
+ peers++
|
|
|
+ // Signal that the watcher has run
|
|
|
+ watcherChan <- peers
|
|
|
+ }
|
|
|
+ remove := func(m derp.PeerGoneMessage) { t.Logf("remove: %v", m.Peer.ShortString()); peers-- }
|
|
|
+ notifyErr := func(err error) {
|
|
|
+ select {
|
|
|
+ case errChan <- err:
|
|
|
+ case <-ctx.Done():
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- ctx, cancel := context.WithCancel(context.Background())
|
|
|
- defer cancel()
|
|
|
+ watcher.RunWatchConnectionLoop(ctx, serverPrivateKey1.Public(), t.Logf, add, remove, notifyErr)
|
|
|
+ }()
|
|
|
|
|
|
- watcherChan := make(chan int, 1)
|
|
|
- defer close(watcherChan)
|
|
|
- errChan := make(chan error, 1)
|
|
|
+ synctest.Wait()
|
|
|
|
|
|
- // Start the watcher thread (which connects to the watched server)
|
|
|
- wg.Add(1) // To avoid using t.Logf after the test ends. See https://golang.org/issue/40343
|
|
|
- go func() {
|
|
|
- defer wg.Done()
|
|
|
- var peers int
|
|
|
- add := func(m derp.PeerPresentMessage) {
|
|
|
- t.Logf("add: %v", m.Key.ShortString())
|
|
|
- peers++
|
|
|
- // Signal that the watcher has run
|
|
|
- watcherChan <- peers
|
|
|
- }
|
|
|
- remove := func(m derp.PeerGoneMessage) { t.Logf("remove: %v", m.Peer.ShortString()); peers-- }
|
|
|
- notifyErr := func(err error) {
|
|
|
+ // Wait for the watcher to run, then break the connection and check if it
|
|
|
+ // reconnected and received peer updates.
|
|
|
+ for range 10 {
|
|
|
select {
|
|
|
- case errChan <- err:
|
|
|
- case <-ctx.Done():
|
|
|
+ case peers := <-watcherChan:
|
|
|
+ if peers != 1 {
|
|
|
+ t.Fatalf("wrong number of peers added during watcher connection: have %d, want 1", peers)
|
|
|
+ }
|
|
|
+ case err := <-errChan:
|
|
|
+ if err.Error() != "derp.Recv: EOF" {
|
|
|
+ t.Fatalf("expected notifyError connection error to be EOF, got %v", err)
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- watcher.RunWatchConnectionLoop(ctx, serverPrivateKey1.Public(), t.Logf, add, remove, notifyErr)
|
|
|
- }()
|
|
|
|
|
|
- timer := time.NewTimer(5 * time.Second)
|
|
|
- defer timer.Stop()
|
|
|
+ synctest.Wait()
|
|
|
|
|
|
- // Wait for the watcher to run, then break the connection and check if it
|
|
|
- // reconnected and received peer updates.
|
|
|
- for range 10 {
|
|
|
- select {
|
|
|
- case peers := <-watcherChan:
|
|
|
- if peers != 1 {
|
|
|
- t.Fatalf("wrong number of peers added during watcher connection: have %d, want 1", peers)
|
|
|
- }
|
|
|
- case err := <-errChan:
|
|
|
- if !strings.Contains(err.Error(), "use of closed network connection") {
|
|
|
- t.Fatalf("expected notifyError connection error to contain 'use of closed network connection', got %v", err)
|
|
|
- }
|
|
|
- case <-timer.C:
|
|
|
- t.Fatalf("watcher did not process the peer update")
|
|
|
+ watcher.BreakConnection(watcher)
|
|
|
+ // re-establish connection by sending a packet
|
|
|
+ watcher.ForwardPacket(key.NodePublic{}, key.NodePublic{}, []byte("bogus"))
|
|
|
}
|
|
|
- timer.Reset(5 * time.Second)
|
|
|
- watcher.BreakConnection(watcher)
|
|
|
- // re-establish connection by sending a packet
|
|
|
- watcher.ForwardPacket(key.NodePublic{}, key.NodePublic{}, []byte("bogus"))
|
|
|
- }
|
|
|
- cancel() // Cancel the context to stop the watcher loop.
|
|
|
- wg.Wait()
|
|
|
+ cancel() // Cancel the context to stop the watcher loop.
|
|
|
+ wg.Wait()
|
|
|
+ })
|
|
|
}
|
|
|
|
|
|
// Test that a watcher connection successfully reconnects and processes peer
|
|
|
// updates after a different thread breaks and reconnects the connection, while
|
|
|
// the watcher is not waiting on recv().
|
|
|
func TestBreakWatcherConn(t *testing.T) {
|
|
|
- // TODO(bradfitz): use synctest + memnet instead
|
|
|
-
|
|
|
- // Set the wait time before a retry after connection failure to be much lower.
|
|
|
- // This needs to be early in the test, for defer to run right at the end after
|
|
|
- // the DERP client has finished.
|
|
|
- tstest.Replace(t, derphttp.RetryInterval, 50*time.Millisecond)
|
|
|
-
|
|
|
- var wg sync.WaitGroup
|
|
|
- // Make the watcher server
|
|
|
- serverPrivateKey1 := key.NewNode()
|
|
|
- _, s1 := newTestServer(t, serverPrivateKey1)
|
|
|
- defer s1.Close()
|
|
|
-
|
|
|
- // Make the watched server
|
|
|
- serverPrivateKey2 := key.NewNode()
|
|
|
- serverURL2, s2 := newTestServer(t, serverPrivateKey2)
|
|
|
- defer s2.Close()
|
|
|
-
|
|
|
- // Make the watcher (but it is not connected yet)
|
|
|
- watcher1 := newWatcherClient(t, serverPrivateKey1, serverURL2)
|
|
|
- defer watcher1.Close()
|
|
|
+ synctest.Test(t, func(t *testing.T) {
|
|
|
+ // Set the wait time before a retry after connection failure to be much lower.
|
|
|
+ // This needs to be early in the test, for defer to run right at the end after
|
|
|
+ // the DERP client has finished.
|
|
|
+ tstest.Replace(t, derphttp.RetryInterval, 50*time.Millisecond)
|
|
|
+
|
|
|
+ var wg sync.WaitGroup
|
|
|
+ // Make the watcher server
|
|
|
+ serverPrivateKey1 := key.NewNode()
|
|
|
+ _, s1, ln1 := newTestServer(t, serverPrivateKey1)
|
|
|
+ defer s1.Close()
|
|
|
+ defer ln1.Close()
|
|
|
+
|
|
|
+ // Make the watched server
|
|
|
+ serverPrivateKey2 := key.NewNode()
|
|
|
+ serverURL2, s2, ln2 := newTestServer(t, serverPrivateKey2)
|
|
|
+ defer s2.Close()
|
|
|
+ defer ln2.Close()
|
|
|
+
|
|
|
+ // Make the watcher (but it is not connected yet)
|
|
|
+ watcher1 := newWatcherClient(t, serverPrivateKey1, serverURL2, ln2)
|
|
|
+ defer watcher1.Close()
|
|
|
+
|
|
|
+ ctx, cancel := context.WithCancel(context.Background())
|
|
|
+
|
|
|
+ watcherChan := make(chan int, 1)
|
|
|
+ breakerChan := make(chan bool, 1)
|
|
|
+ errorChan := make(chan error, 1)
|
|
|
+
|
|
|
+ // Start the watcher thread (which connects to the watched server)
|
|
|
+ wg.Add(1) // To avoid using t.Logf after the test ends. See https://golang.org/issue/40343
|
|
|
+ go func() {
|
|
|
+ defer wg.Done()
|
|
|
+ var peers int
|
|
|
+ add := func(m derp.PeerPresentMessage) {
|
|
|
+ t.Logf("add: %v", m.Key.ShortString())
|
|
|
+ peers++
|
|
|
+ // Signal that the watcher has run
|
|
|
+ watcherChan <- peers
|
|
|
+ select {
|
|
|
+ case <-ctx.Done():
|
|
|
+ return
|
|
|
+ // Wait for breaker to run
|
|
|
+ case <-breakerChan:
|
|
|
+ }
|
|
|
+ }
|
|
|
+ remove := func(m derp.PeerGoneMessage) { t.Logf("remove: %v", m.Peer.ShortString()); peers-- }
|
|
|
+ notifyError := func(err error) {
|
|
|
+ errorChan <- err
|
|
|
+ }
|
|
|
|
|
|
- ctx, cancel := context.WithCancel(context.Background())
|
|
|
+ watcher1.RunWatchConnectionLoop(ctx, serverPrivateKey1.Public(), t.Logf, add, remove, notifyError)
|
|
|
+ }()
|
|
|
|
|
|
- watcherChan := make(chan int, 1)
|
|
|
- breakerChan := make(chan bool, 1)
|
|
|
- errorChan := make(chan error, 1)
|
|
|
+ synctest.Wait()
|
|
|
|
|
|
- // Start the watcher thread (which connects to the watched server)
|
|
|
- wg.Add(1) // To avoid using t.Logf after the test ends. See https://golang.org/issue/40343
|
|
|
- go func() {
|
|
|
- defer wg.Done()
|
|
|
- var peers int
|
|
|
- add := func(m derp.PeerPresentMessage) {
|
|
|
- t.Logf("add: %v", m.Key.ShortString())
|
|
|
- peers++
|
|
|
- // Signal that the watcher has run
|
|
|
- watcherChan <- peers
|
|
|
+ // Wait for the watcher to run, then break the connection and check if it
|
|
|
+ // reconnected and received peer updates.
|
|
|
+ for range 10 {
|
|
|
select {
|
|
|
- case <-ctx.Done():
|
|
|
- return
|
|
|
- // Wait for breaker to run
|
|
|
- case <-breakerChan:
|
|
|
+ case peers := <-watcherChan:
|
|
|
+ if peers != 1 {
|
|
|
+ t.Fatalf("wrong number of peers added during watcher connection have %d, want 1", peers)
|
|
|
+ }
|
|
|
+ case err := <-errorChan:
|
|
|
+ if !errors.Is(err, net.ErrClosed) {
|
|
|
+ t.Fatalf("expected notifyError connection error to fail with ErrClosed, got %v", err)
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- remove := func(m derp.PeerGoneMessage) { t.Logf("remove: %v", m.Peer.ShortString()); peers-- }
|
|
|
- notifyError := func(err error) {
|
|
|
- errorChan <- err
|
|
|
- }
|
|
|
-
|
|
|
- watcher1.RunWatchConnectionLoop(ctx, serverPrivateKey1.Public(), t.Logf, add, remove, notifyError)
|
|
|
- }()
|
|
|
|
|
|
- timer := time.NewTimer(5 * time.Second)
|
|
|
- defer timer.Stop()
|
|
|
+ synctest.Wait()
|
|
|
|
|
|
- // Wait for the watcher to run, then break the connection and check if it
|
|
|
- // reconnected and received peer updates.
|
|
|
- for range 10 {
|
|
|
- select {
|
|
|
- case peers := <-watcherChan:
|
|
|
- if peers != 1 {
|
|
|
- t.Fatalf("wrong number of peers added during watcher connection have %d, want 1", peers)
|
|
|
- }
|
|
|
- case err := <-errorChan:
|
|
|
- if !strings.Contains(err.Error(), "use of closed network connection") {
|
|
|
- t.Fatalf("expected notifyError connection error to contain 'use of closed network connection', got %v", err)
|
|
|
- }
|
|
|
- case <-timer.C:
|
|
|
- t.Fatalf("watcher did not process the peer update")
|
|
|
+ watcher1.BreakConnection(watcher1)
|
|
|
+ // re-establish connection by sending a packet
|
|
|
+ watcher1.ForwardPacket(key.NodePublic{}, key.NodePublic{}, []byte("bogus"))
|
|
|
+ // signal that the breaker is done
|
|
|
+ breakerChan <- true
|
|
|
}
|
|
|
- watcher1.BreakConnection(watcher1)
|
|
|
- // re-establish connection by sending a packet
|
|
|
- watcher1.ForwardPacket(key.NodePublic{}, key.NodePublic{}, []byte("bogus"))
|
|
|
- // signal that the breaker is done
|
|
|
- breakerChan <- true
|
|
|
-
|
|
|
- timer.Reset(5 * time.Second)
|
|
|
- }
|
|
|
- watcher1.Close()
|
|
|
- cancel()
|
|
|
- wg.Wait()
|
|
|
+ watcher1.Close()
|
|
|
+ cancel()
|
|
|
+ wg.Wait()
|
|
|
+ })
|
|
|
}
|
|
|
|
|
|
func noopAdd(derp.PeerPresentMessage) {}
|
|
|
@@ -444,12 +446,13 @@ func TestRunWatchConnectionLoopServeConnect(t *testing.T) {
|
|
|
defer cancel()
|
|
|
|
|
|
priv := key.NewNode()
|
|
|
- serverURL, s := newTestServer(t, priv)
|
|
|
+ serverURL, s, ln := newTestServer(t, priv)
|
|
|
defer s.Close()
|
|
|
+ defer ln.Close()
|
|
|
|
|
|
pub := priv.Public()
|
|
|
|
|
|
- watcher := newWatcherClient(t, priv, serverURL)
|
|
|
+ watcher := newWatcherClient(t, priv, serverURL, ln)
|
|
|
defer watcher.Close()
|
|
|
|
|
|
// Test connecting to ourselves, and that we get hung up on.
|
|
|
@@ -518,13 +521,14 @@ func TestNotifyError(t *testing.T) {
|
|
|
defer cancel()
|
|
|
|
|
|
priv := key.NewNode()
|
|
|
- serverURL, s := newTestServer(t, priv)
|
|
|
+ serverURL, s, ln := newTestServer(t, priv)
|
|
|
defer s.Close()
|
|
|
+ defer ln.Close()
|
|
|
|
|
|
pub := priv.Public()
|
|
|
|
|
|
// Test early error notification when c.connect fails.
|
|
|
- watcher := newWatcherClient(t, priv, serverURL)
|
|
|
+ watcher := newWatcherClient(t, priv, serverURL, ln)
|
|
|
watcher.SetURLDialer(netx.DialFunc(func(ctx context.Context, network, addr string) (net.Conn, error) {
|
|
|
t.Helper()
|
|
|
return nil, fmt.Errorf("test error: %s", addr)
|