derphttp_test.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. package derphttp_test
  4. import (
  5. "bytes"
  6. "context"
  7. "crypto/tls"
  8. "encoding/json"
  9. "errors"
  10. "flag"
  11. "fmt"
  12. "maps"
  13. "net"
  14. "net/http"
  15. "net/http/httptest"
  16. "slices"
  17. "strings"
  18. "sync"
  19. "testing"
  20. "testing/synctest"
  21. "time"
  22. "tailscale.com/derp"
  23. "tailscale.com/derp/derphttp"
  24. "tailscale.com/derp/derpserver"
  25. "tailscale.com/net/memnet"
  26. "tailscale.com/net/netmon"
  27. "tailscale.com/net/netx"
  28. "tailscale.com/tailcfg"
  29. "tailscale.com/tstest"
  30. "tailscale.com/types/key"
  31. )
  32. func TestSendRecv(t *testing.T) {
  33. serverPrivateKey := key.NewNode()
  34. netMon := netmon.NewStatic()
  35. const numClients = 3
  36. var clientPrivateKeys []key.NodePrivate
  37. var clientKeys []key.NodePublic
  38. for range numClients {
  39. priv := key.NewNode()
  40. clientPrivateKeys = append(clientPrivateKeys, priv)
  41. clientKeys = append(clientKeys, priv.Public())
  42. }
  43. s := derpserver.New(serverPrivateKey, t.Logf)
  44. defer s.Close()
  45. httpsrv := &http.Server{
  46. TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
  47. Handler: derpserver.Handler(s),
  48. }
  49. ln, err := net.Listen("tcp4", "localhost:0")
  50. if err != nil {
  51. t.Fatal(err)
  52. }
  53. serverURL := "http://" + ln.Addr().String()
  54. t.Logf("server URL: %s", serverURL)
  55. go func() {
  56. if err := httpsrv.Serve(ln); err != nil {
  57. if err == http.ErrServerClosed {
  58. return
  59. }
  60. panic(err)
  61. }
  62. }()
  63. var clients []*derphttp.Client
  64. var recvChs []chan []byte
  65. done := make(chan struct{})
  66. var wg sync.WaitGroup
  67. defer func() {
  68. close(done)
  69. for _, c := range clients {
  70. c.Close()
  71. }
  72. wg.Wait()
  73. }()
  74. for i := range numClients {
  75. key := clientPrivateKeys[i]
  76. c, err := derphttp.NewClient(key, serverURL, t.Logf, netMon)
  77. if err != nil {
  78. t.Fatalf("client %d: %v", i, err)
  79. }
  80. if err := c.Connect(context.Background()); err != nil {
  81. t.Fatalf("client %d Connect: %v", i, err)
  82. }
  83. waitConnect(t, c)
  84. clients = append(clients, c)
  85. recvChs = append(recvChs, make(chan []byte))
  86. wg.Add(1)
  87. go func(i int) {
  88. defer wg.Done()
  89. for {
  90. select {
  91. case <-done:
  92. return
  93. default:
  94. }
  95. m, err := c.Recv()
  96. if err != nil {
  97. select {
  98. case <-done:
  99. return
  100. default:
  101. }
  102. t.Logf("client%d: %v", i, err)
  103. break
  104. }
  105. switch m := m.(type) {
  106. default:
  107. t.Errorf("unexpected message type %T", m)
  108. continue
  109. case derp.PeerGoneMessage:
  110. // Ignore.
  111. case derp.ReceivedPacket:
  112. recvChs[i] <- bytes.Clone(m.Data)
  113. }
  114. }
  115. }(i)
  116. }
  117. recv := func(i int, want string) {
  118. t.Helper()
  119. select {
  120. case b := <-recvChs[i]:
  121. if got := string(b); got != want {
  122. t.Errorf("client1.Recv=%q, want %q", got, want)
  123. }
  124. case <-time.After(5 * time.Second):
  125. t.Errorf("client%d.Recv, got nothing, want %q", i, want)
  126. }
  127. }
  128. recvNothing := func(i int) {
  129. t.Helper()
  130. select {
  131. case b := <-recvChs[0]:
  132. t.Errorf("client%d.Recv=%q, want nothing", i, string(b))
  133. default:
  134. }
  135. }
  136. msg1 := []byte("hello 0->1\n")
  137. if err := clients[0].Send(clientKeys[1], msg1); err != nil {
  138. t.Fatal(err)
  139. }
  140. recv(1, string(msg1))
  141. recvNothing(0)
  142. recvNothing(2)
  143. msg2 := []byte("hello 1->2\n")
  144. if err := clients[1].Send(clientKeys[2], msg2); err != nil {
  145. t.Fatal(err)
  146. }
  147. recv(2, string(msg2))
  148. recvNothing(0)
  149. recvNothing(1)
  150. }
  151. func waitConnect(t testing.TB, c *derphttp.Client) {
  152. t.Helper()
  153. if m, err := c.Recv(); err != nil {
  154. t.Fatalf("client first Recv: %v", err)
  155. } else if v, ok := m.(derp.ServerInfoMessage); !ok {
  156. t.Fatalf("client first Recv was unexpected type %T", v)
  157. }
  158. }
  159. func TestPing(t *testing.T) {
  160. serverPrivateKey := key.NewNode()
  161. s := derpserver.New(serverPrivateKey, t.Logf)
  162. defer s.Close()
  163. httpsrv := &http.Server{
  164. TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
  165. Handler: derpserver.Handler(s),
  166. }
  167. ln, err := net.Listen("tcp4", "localhost:0")
  168. if err != nil {
  169. t.Fatal(err)
  170. }
  171. serverURL := "http://" + ln.Addr().String()
  172. t.Logf("server URL: %s", serverURL)
  173. go func() {
  174. if err := httpsrv.Serve(ln); err != nil {
  175. if err == http.ErrServerClosed {
  176. return
  177. }
  178. panic(err)
  179. }
  180. }()
  181. c, err := derphttp.NewClient(key.NewNode(), serverURL, t.Logf, netmon.NewStatic())
  182. if err != nil {
  183. t.Fatalf("NewClient: %v", err)
  184. }
  185. defer c.Close()
  186. if err := c.Connect(context.Background()); err != nil {
  187. t.Fatalf("client Connect: %v", err)
  188. }
  189. errc := make(chan error, 1)
  190. go func() {
  191. for {
  192. m, err := c.Recv()
  193. if err != nil {
  194. errc <- err
  195. return
  196. }
  197. t.Logf("Recv: %T", m)
  198. }
  199. }()
  200. err = c.Ping(context.Background())
  201. if err != nil {
  202. t.Fatalf("Ping: %v", err)
  203. }
  204. }
  205. const testMeshKey = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
  206. func newTestServer(t *testing.T, k key.NodePrivate) (serverURL string, s *derpserver.Server, ln *memnet.Listener) {
  207. s = derpserver.New(k, t.Logf)
  208. httpsrv := &http.Server{
  209. TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
  210. Handler: derpserver.Handler(s),
  211. }
  212. ln = memnet.Listen("localhost:0")
  213. serverURL = "http://" + ln.Addr().String()
  214. s.SetMeshKey(testMeshKey)
  215. go func() {
  216. if err := httpsrv.Serve(ln); err != nil {
  217. if errors.Is(err, net.ErrClosed) {
  218. return
  219. }
  220. panic(err)
  221. }
  222. }()
  223. return
  224. }
  225. func newWatcherClient(t *testing.T, watcherPrivateKey key.NodePrivate, serverToWatchURL string, ln *memnet.Listener) (c *derphttp.Client) {
  226. c, err := derphttp.NewClient(watcherPrivateKey, serverToWatchURL, t.Logf, netmon.NewStatic())
  227. if err != nil {
  228. t.Fatal(err)
  229. }
  230. k, err := key.ParseDERPMesh(testMeshKey)
  231. if err != nil {
  232. t.Fatal(err)
  233. }
  234. c.MeshKey = k
  235. c.SetURLDialer(ln.Dial)
  236. return
  237. }
  238. // Test that a watcher connection successfully reconnects and processes peer
  239. // updates after a different thread breaks and reconnects the connection, while
  240. // the watcher is waiting on recv().
  241. func TestBreakWatcherConnRecv(t *testing.T) {
  242. synctest.Test(t, func(t *testing.T) {
  243. // Set the wait time before a retry after connection failure to be much lower.
  244. // This needs to be early in the test, for defer to run right at the end after
  245. // the DERP client has finished.
  246. tstest.Replace(t, derphttp.RetryInterval, 50*time.Millisecond)
  247. var wg sync.WaitGroup
  248. // Make the watcher server
  249. serverPrivateKey1 := key.NewNode()
  250. _, s1, ln1 := newTestServer(t, serverPrivateKey1)
  251. defer s1.Close()
  252. defer ln1.Close()
  253. // Make the watched server
  254. serverPrivateKey2 := key.NewNode()
  255. serverURL2, s2, ln2 := newTestServer(t, serverPrivateKey2)
  256. defer s2.Close()
  257. defer ln2.Close()
  258. // Make the watcher (but it is not connected yet)
  259. watcher := newWatcherClient(t, serverPrivateKey1, serverURL2, ln2)
  260. defer watcher.Close()
  261. ctx, cancel := context.WithCancel(context.Background())
  262. defer cancel()
  263. watcherChan := make(chan int, 1)
  264. defer close(watcherChan)
  265. errChan := make(chan error, 1)
  266. // Start the watcher thread (which connects to the watched server)
  267. wg.Add(1) // To avoid using t.Logf after the test ends. See https://golang.org/issue/40343
  268. go func() {
  269. defer wg.Done()
  270. var peers int
  271. add := func(m derp.PeerPresentMessage) {
  272. t.Logf("add: %v", m.Key.ShortString())
  273. peers++
  274. // Signal that the watcher has run
  275. watcherChan <- peers
  276. }
  277. remove := func(m derp.PeerGoneMessage) { t.Logf("remove: %v", m.Peer.ShortString()); peers-- }
  278. notifyErr := func(err error) {
  279. select {
  280. case errChan <- err:
  281. case <-ctx.Done():
  282. }
  283. }
  284. watcher.RunWatchConnectionLoop(ctx, serverPrivateKey1.Public(), t.Logf, add, remove, notifyErr)
  285. }()
  286. synctest.Wait()
  287. // Wait for the watcher to run, then break the connection and check if it
  288. // reconnected and received peer updates.
  289. for range 10 {
  290. select {
  291. case peers := <-watcherChan:
  292. if peers != 1 {
  293. t.Fatalf("wrong number of peers added during watcher connection: have %d, want 1", peers)
  294. }
  295. case err := <-errChan:
  296. if err.Error() != "derp.Recv: EOF" {
  297. t.Fatalf("expected notifyError connection error to be EOF, got %v", err)
  298. }
  299. }
  300. synctest.Wait()
  301. watcher.BreakConnection(watcher)
  302. // re-establish connection by sending a packet
  303. watcher.ForwardPacket(key.NodePublic{}, key.NodePublic{}, []byte("bogus"))
  304. }
  305. cancel() // Cancel the context to stop the watcher loop.
  306. wg.Wait()
  307. })
  308. }
  309. // Test that a watcher connection successfully reconnects and processes peer
  310. // updates after a different thread breaks and reconnects the connection, while
  311. // the watcher is not waiting on recv().
  312. func TestBreakWatcherConn(t *testing.T) {
  313. synctest.Test(t, func(t *testing.T) {
  314. // Set the wait time before a retry after connection failure to be much lower.
  315. // This needs to be early in the test, for defer to run right at the end after
  316. // the DERP client has finished.
  317. tstest.Replace(t, derphttp.RetryInterval, 50*time.Millisecond)
  318. var wg sync.WaitGroup
  319. // Make the watcher server
  320. serverPrivateKey1 := key.NewNode()
  321. _, s1, ln1 := newTestServer(t, serverPrivateKey1)
  322. defer s1.Close()
  323. defer ln1.Close()
  324. // Make the watched server
  325. serverPrivateKey2 := key.NewNode()
  326. serverURL2, s2, ln2 := newTestServer(t, serverPrivateKey2)
  327. defer s2.Close()
  328. defer ln2.Close()
  329. // Make the watcher (but it is not connected yet)
  330. watcher1 := newWatcherClient(t, serverPrivateKey1, serverURL2, ln2)
  331. defer watcher1.Close()
  332. ctx, cancel := context.WithCancel(context.Background())
  333. watcherChan := make(chan int, 1)
  334. breakerChan := make(chan bool, 1)
  335. errorChan := make(chan error, 1)
  336. // Start the watcher thread (which connects to the watched server)
  337. wg.Add(1) // To avoid using t.Logf after the test ends. See https://golang.org/issue/40343
  338. go func() {
  339. defer wg.Done()
  340. var peers int
  341. add := func(m derp.PeerPresentMessage) {
  342. t.Logf("add: %v", m.Key.ShortString())
  343. peers++
  344. // Signal that the watcher has run
  345. watcherChan <- peers
  346. select {
  347. case <-ctx.Done():
  348. return
  349. // Wait for breaker to run
  350. case <-breakerChan:
  351. }
  352. }
  353. remove := func(m derp.PeerGoneMessage) { t.Logf("remove: %v", m.Peer.ShortString()); peers-- }
  354. notifyError := func(err error) {
  355. errorChan <- err
  356. }
  357. watcher1.RunWatchConnectionLoop(ctx, serverPrivateKey1.Public(), t.Logf, add, remove, notifyError)
  358. }()
  359. synctest.Wait()
  360. // Wait for the watcher to run, then break the connection and check if it
  361. // reconnected and received peer updates.
  362. for range 10 {
  363. select {
  364. case peers := <-watcherChan:
  365. if peers != 1 {
  366. t.Fatalf("wrong number of peers added during watcher connection have %d, want 1", peers)
  367. }
  368. case err := <-errorChan:
  369. if !errors.Is(err, net.ErrClosed) {
  370. t.Fatalf("expected notifyError connection error to fail with ErrClosed, got %v", err)
  371. }
  372. }
  373. synctest.Wait()
  374. watcher1.BreakConnection(watcher1)
  375. // re-establish connection by sending a packet
  376. watcher1.ForwardPacket(key.NodePublic{}, key.NodePublic{}, []byte("bogus"))
  377. // signal that the breaker is done
  378. breakerChan <- true
  379. }
  380. watcher1.Close()
  381. cancel()
  382. wg.Wait()
  383. })
  384. }
  385. func noopAdd(derp.PeerPresentMessage) {}
  386. func noopRemove(derp.PeerGoneMessage) {}
  387. func noopNotifyError(error) {}
  388. func TestRunWatchConnectionLoopServeConnect(t *testing.T) {
  389. defer derphttp.SetTestHookWatchLookConnectResult(nil)
  390. ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
  391. defer cancel()
  392. priv := key.NewNode()
  393. serverURL, s, ln := newTestServer(t, priv)
  394. defer s.Close()
  395. defer ln.Close()
  396. pub := priv.Public()
  397. watcher := newWatcherClient(t, priv, serverURL, ln)
  398. defer watcher.Close()
  399. // Test connecting to ourselves, and that we get hung up on.
  400. derphttp.SetTestHookWatchLookConnectResult(func(err error, wasSelfConnect bool) bool {
  401. t.Helper()
  402. if err != nil {
  403. t.Fatalf("error connecting to server: %v", err)
  404. }
  405. if !wasSelfConnect {
  406. t.Error("wanted self-connect; wasn't")
  407. }
  408. return false
  409. })
  410. watcher.RunWatchConnectionLoop(ctx, pub, t.Logf, noopAdd, noopRemove, noopNotifyError)
  411. // Test connecting to the server with a zero value for ignoreServerKey,
  412. // so we should always connect.
  413. derphttp.SetTestHookWatchLookConnectResult(func(err error, wasSelfConnect bool) bool {
  414. t.Helper()
  415. if err != nil {
  416. t.Fatalf("error connecting to server: %v", err)
  417. }
  418. if wasSelfConnect {
  419. t.Error("wanted normal connect; got self connect")
  420. }
  421. return false
  422. })
  423. watcher.RunWatchConnectionLoop(ctx, key.NodePublic{}, t.Logf, noopAdd, noopRemove, noopNotifyError)
  424. }
  425. // verify that the LocalAddr method doesn't acquire the mutex.
  426. // See https://github.com/tailscale/tailscale/issues/11519
  427. func TestLocalAddrNoMutex(t *testing.T) {
  428. var c derphttp.Client
  429. _, err := c.LocalAddr()
  430. if got, want := fmt.Sprint(err), "client not connected"; got != want {
  431. t.Errorf("got error %q; want %q", got, want)
  432. }
  433. }
  434. func TestProbe(t *testing.T) {
  435. h := derpserver.Handler(nil)
  436. tests := []struct {
  437. path string
  438. want int
  439. }{
  440. {"/derp/probe", 200},
  441. {"/derp/latency-check", 200},
  442. {"/derp/sdf", http.StatusUpgradeRequired},
  443. }
  444. for _, tt := range tests {
  445. rec := httptest.NewRecorder()
  446. h.ServeHTTP(rec, httptest.NewRequest("GET", tt.path, nil))
  447. if got := rec.Result().StatusCode; got != tt.want {
  448. t.Errorf("for path %q got HTTP status %v; want %v", tt.path, got, tt.want)
  449. }
  450. }
  451. }
  452. func TestNotifyError(t *testing.T) {
  453. defer derphttp.SetTestHookWatchLookConnectResult(nil)
  454. ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
  455. defer cancel()
  456. priv := key.NewNode()
  457. serverURL, s, ln := newTestServer(t, priv)
  458. defer s.Close()
  459. defer ln.Close()
  460. pub := priv.Public()
  461. // Test early error notification when c.connect fails.
  462. watcher := newWatcherClient(t, priv, serverURL, ln)
  463. watcher.SetURLDialer(netx.DialFunc(func(ctx context.Context, network, addr string) (net.Conn, error) {
  464. t.Helper()
  465. return nil, fmt.Errorf("test error: %s", addr)
  466. }))
  467. defer watcher.Close()
  468. derphttp.SetTestHookWatchLookConnectResult(func(err error, wasSelfConnect bool) bool {
  469. t.Helper()
  470. if err == nil {
  471. t.Fatal("expected error connecting to server, got nil")
  472. }
  473. if wasSelfConnect {
  474. t.Error("wanted normal connect; got self connect")
  475. }
  476. return false
  477. })
  478. errChan := make(chan error, 1)
  479. notifyError := func(err error) {
  480. errChan <- err
  481. }
  482. watcher.RunWatchConnectionLoop(ctx, pub, t.Logf, noopAdd, noopRemove, notifyError)
  483. select {
  484. case err := <-errChan:
  485. if !strings.Contains(err.Error(), "test") {
  486. t.Errorf("expected test error, got %v", err)
  487. }
  488. case <-ctx.Done():
  489. t.Fatalf("context done before receiving error: %v", ctx.Err())
  490. }
  491. }
  492. var liveNetworkTest = flag.Bool("live-net-tests", false, "run live network tests")
  493. func TestManualDial(t *testing.T) {
  494. if !*liveNetworkTest {
  495. t.Skip("skipping live network test without --live-net-tests")
  496. }
  497. dm := &tailcfg.DERPMap{}
  498. res, err := http.Get("https://controlplane.tailscale.com/derpmap/default")
  499. if err != nil {
  500. t.Fatalf("fetching DERPMap: %v", err)
  501. }
  502. defer res.Body.Close()
  503. if err := json.NewDecoder(res.Body).Decode(dm); err != nil {
  504. t.Fatalf("decoding DERPMap: %v", err)
  505. }
  506. region := slices.Sorted(maps.Keys(dm.Regions))[0]
  507. netMon := netmon.NewStatic()
  508. rc := derphttp.NewRegionClient(key.NewNode(), t.Logf, netMon, func() *tailcfg.DERPRegion {
  509. return dm.Regions[region]
  510. })
  511. defer rc.Close()
  512. if err := rc.Connect(context.Background()); err != nil {
  513. t.Fatalf("rc.Connect: %v", err)
  514. }
  515. }
  516. func TestURLDial(t *testing.T) {
  517. if !*liveNetworkTest {
  518. t.Skip("skipping live network test without --live-net-tests")
  519. }
  520. dm := &tailcfg.DERPMap{}
  521. res, err := http.Get("https://controlplane.tailscale.com/derpmap/default")
  522. if err != nil {
  523. t.Fatalf("fetching DERPMap: %v", err)
  524. }
  525. defer res.Body.Close()
  526. if err := json.NewDecoder(res.Body).Decode(dm); err != nil {
  527. t.Fatalf("decoding DERPMap: %v", err)
  528. }
  529. // find a valid target DERP host to test against
  530. var hostname string
  531. for _, reg := range dm.Regions {
  532. for _, node := range reg.Nodes {
  533. if !node.STUNOnly && node.CanPort80 && node.CertName == "" || node.CertName == node.HostName {
  534. hostname = node.HostName
  535. break
  536. }
  537. }
  538. if hostname != "" {
  539. break
  540. }
  541. }
  542. netMon := netmon.NewStatic()
  543. c, err := derphttp.NewClient(key.NewNode(), "https://"+hostname+"/", t.Logf, netMon)
  544. if err != nil {
  545. t.Errorf("NewClient: %v", err)
  546. }
  547. defer c.Close()
  548. if err := c.Connect(context.Background()); err != nil {
  549. t.Fatalf("rc.Connect: %v", err)
  550. }
  551. }