derp_client.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. package derp
  4. import (
  5. "bufio"
  6. "encoding/binary"
  7. "encoding/json"
  8. "errors"
  9. "fmt"
  10. "io"
  11. "net/netip"
  12. "sync"
  13. "time"
  14. "go4.org/mem"
  15. "golang.org/x/time/rate"
  16. "tailscale.com/syncs"
  17. "tailscale.com/tstime"
  18. "tailscale.com/types/key"
  19. "tailscale.com/types/logger"
  20. )
  21. // Client is a DERP client.
  22. type Client struct {
  23. serverKey key.NodePublic // of the DERP server; not a machine or node key
  24. privateKey key.NodePrivate
  25. publicKey key.NodePublic // of privateKey
  26. logf logger.Logf
  27. nc Conn
  28. br *bufio.Reader
  29. meshKey key.DERPMesh
  30. canAckPings bool
  31. isProber bool
  32. wmu sync.Mutex // hold while writing to bw
  33. bw *bufio.Writer
  34. rate *rate.Limiter // if non-nil, rate limiter to use
  35. // Owned by Recv:
  36. peeked int // bytes to discard on next Recv
  37. readErr syncs.AtomicValue[error] // sticky (set by Recv)
  38. clock tstime.Clock
  39. }
  40. // ClientOpt is an option passed to NewClient.
  41. type ClientOpt interface {
  42. update(*clientOpt)
  43. }
  44. type clientOptFunc func(*clientOpt)
  45. func (f clientOptFunc) update(o *clientOpt) { f(o) }
  46. // clientOpt are the options passed to newClient.
  47. type clientOpt struct {
  48. MeshKey key.DERPMesh
  49. ServerPub key.NodePublic
  50. CanAckPings bool
  51. IsProber bool
  52. }
  53. // MeshKey returns a ClientOpt to pass to the DERP server during connect to get
  54. // access to join the mesh.
  55. //
  56. // An empty key means to not use a mesh key.
  57. func MeshKey(k key.DERPMesh) ClientOpt { return clientOptFunc(func(o *clientOpt) { o.MeshKey = k }) }
  58. // IsProber returns a ClientOpt to pass to the DERP server during connect to
  59. // declare that this client is a a prober.
  60. func IsProber(v bool) ClientOpt { return clientOptFunc(func(o *clientOpt) { o.IsProber = v }) }
  61. // ServerPublicKey returns a ClientOpt to declare that the server's DERP public key is known.
  62. // If key is the zero value, the returned ClientOpt is a no-op.
  63. func ServerPublicKey(key key.NodePublic) ClientOpt {
  64. return clientOptFunc(func(o *clientOpt) { o.ServerPub = key })
  65. }
  66. // CanAckPings returns a ClientOpt to set whether it advertises to the
  67. // server that it's capable of acknowledging ping requests.
  68. func CanAckPings(v bool) ClientOpt {
  69. return clientOptFunc(func(o *clientOpt) { o.CanAckPings = v })
  70. }
  71. func NewClient(privateKey key.NodePrivate, nc Conn, brw *bufio.ReadWriter, logf logger.Logf, opts ...ClientOpt) (*Client, error) {
  72. var opt clientOpt
  73. for _, o := range opts {
  74. if o == nil {
  75. return nil, errors.New("nil ClientOpt")
  76. }
  77. o.update(&opt)
  78. }
  79. return newClient(privateKey, nc, brw, logf, opt)
  80. }
  81. func newClient(privateKey key.NodePrivate, nc Conn, brw *bufio.ReadWriter, logf logger.Logf, opt clientOpt) (*Client, error) {
  82. c := &Client{
  83. privateKey: privateKey,
  84. publicKey: privateKey.Public(),
  85. logf: logf,
  86. nc: nc,
  87. br: brw.Reader,
  88. bw: brw.Writer,
  89. meshKey: opt.MeshKey,
  90. canAckPings: opt.CanAckPings,
  91. isProber: opt.IsProber,
  92. clock: tstime.StdClock{},
  93. }
  94. if opt.ServerPub.IsZero() {
  95. if err := c.recvServerKey(); err != nil {
  96. return nil, fmt.Errorf("derp.Client: failed to receive server key: %v", err)
  97. }
  98. } else {
  99. c.serverKey = opt.ServerPub
  100. }
  101. if err := c.sendClientKey(); err != nil {
  102. return nil, fmt.Errorf("derp.Client: failed to send client key: %v", err)
  103. }
  104. return c, nil
  105. }
  106. func (c *Client) PublicKey() key.NodePublic { return c.publicKey }
  107. func (c *Client) recvServerKey() error {
  108. var buf [40]byte
  109. t, flen, err := readFrame(c.br, 1<<10, buf[:])
  110. if err == io.ErrShortBuffer {
  111. // For future-proofing, allow server to send more in its greeting.
  112. err = nil
  113. }
  114. if err != nil {
  115. return err
  116. }
  117. if flen < uint32(len(buf)) || t != FrameServerKey || string(buf[:len(Magic)]) != Magic {
  118. return errors.New("invalid server greeting")
  119. }
  120. c.serverKey = key.NodePublicFromRaw32(mem.B(buf[len(Magic):]))
  121. return nil
  122. }
  123. func (c *Client) parseServerInfo(b []byte) (*ServerInfo, error) {
  124. const maxLength = NonceLen + MaxInfoLen
  125. fl := len(b)
  126. if fl < NonceLen {
  127. return nil, fmt.Errorf("short serverInfo frame")
  128. }
  129. if fl > maxLength {
  130. return nil, fmt.Errorf("long serverInfo frame")
  131. }
  132. msg, ok := c.privateKey.OpenFrom(c.serverKey, b)
  133. if !ok {
  134. return nil, fmt.Errorf("failed to open naclbox from server key %s", c.serverKey)
  135. }
  136. info := new(ServerInfo)
  137. if err := json.Unmarshal(msg, info); err != nil {
  138. return nil, fmt.Errorf("invalid JSON: %v", err)
  139. }
  140. return info, nil
  141. }
  142. // ClientInfo is the information a DERP client sends to the server
  143. // about itself when it connects.
  144. type ClientInfo struct {
  145. // MeshKey optionally specifies a pre-shared key used by
  146. // trusted clients. It's required to subscribe to the
  147. // connection list & forward packets. It's empty for regular
  148. // users.
  149. MeshKey key.DERPMesh `json:"meshKey,omitempty,omitzero"`
  150. // Version is the DERP protocol version that the client was built with.
  151. // See the ProtocolVersion const.
  152. Version int `json:"version,omitempty"`
  153. // CanAckPings is whether the client declares it's able to ack
  154. // pings.
  155. CanAckPings bool
  156. // IsProber is whether this client is a prober.
  157. IsProber bool `json:",omitempty"`
  158. }
  159. // Equal reports if two clientInfo values are equal.
  160. func (c *ClientInfo) Equal(other *ClientInfo) bool {
  161. if c == nil || other == nil {
  162. return c == other
  163. }
  164. if c.Version != other.Version || c.CanAckPings != other.CanAckPings || c.IsProber != other.IsProber {
  165. return false
  166. }
  167. return c.MeshKey.Equal(other.MeshKey)
  168. }
  169. func (c *Client) sendClientKey() error {
  170. msg, err := json.Marshal(ClientInfo{
  171. Version: ProtocolVersion,
  172. MeshKey: c.meshKey,
  173. CanAckPings: c.canAckPings,
  174. IsProber: c.isProber,
  175. })
  176. if err != nil {
  177. return err
  178. }
  179. msgbox := c.privateKey.SealTo(c.serverKey, msg)
  180. buf := make([]byte, 0, KeyLen+len(msgbox))
  181. buf = c.publicKey.AppendTo(buf)
  182. buf = append(buf, msgbox...)
  183. return WriteFrame(c.bw, FrameClientInfo, buf)
  184. }
  185. // ServerPublicKey returns the server's public key.
  186. func (c *Client) ServerPublicKey() key.NodePublic { return c.serverKey }
  187. // Send sends a packet to the Tailscale node identified by dstKey.
  188. //
  189. // It is an error if the packet is larger than 64KB.
  190. func (c *Client) Send(dstKey key.NodePublic, pkt []byte) error { return c.send(dstKey, pkt) }
  191. func (c *Client) send(dstKey key.NodePublic, pkt []byte) (ret error) {
  192. defer func() {
  193. if ret != nil {
  194. ret = fmt.Errorf("derp.Send: %w", ret)
  195. }
  196. }()
  197. if len(pkt) > MaxPacketSize {
  198. return fmt.Errorf("packet too big: %d", len(pkt))
  199. }
  200. c.wmu.Lock()
  201. defer c.wmu.Unlock()
  202. if c.rate != nil {
  203. pktLen := FrameHeaderLen + key.NodePublicRawLen + len(pkt)
  204. if !c.rate.AllowN(c.clock.Now(), pktLen) {
  205. return nil // drop
  206. }
  207. }
  208. if err := WriteFrameHeader(c.bw, FrameSendPacket, uint32(key.NodePublicRawLen+len(pkt))); err != nil {
  209. return err
  210. }
  211. if _, err := c.bw.Write(dstKey.AppendTo(nil)); err != nil {
  212. return err
  213. }
  214. if _, err := c.bw.Write(pkt); err != nil {
  215. return err
  216. }
  217. return c.bw.Flush()
  218. }
  219. func (c *Client) ForwardPacket(srcKey, dstKey key.NodePublic, pkt []byte) (err error) {
  220. defer func() {
  221. if err != nil {
  222. err = fmt.Errorf("derp.ForwardPacket: %w", err)
  223. }
  224. }()
  225. if len(pkt) > MaxPacketSize {
  226. return fmt.Errorf("packet too big: %d", len(pkt))
  227. }
  228. c.wmu.Lock()
  229. defer c.wmu.Unlock()
  230. timer := c.clock.AfterFunc(5*time.Second, c.writeTimeoutFired)
  231. defer timer.Stop()
  232. if err := WriteFrameHeader(c.bw, FrameForwardPacket, uint32(KeyLen*2+len(pkt))); err != nil {
  233. return err
  234. }
  235. if _, err := c.bw.Write(srcKey.AppendTo(nil)); err != nil {
  236. return err
  237. }
  238. if _, err := c.bw.Write(dstKey.AppendTo(nil)); err != nil {
  239. return err
  240. }
  241. if _, err := c.bw.Write(pkt); err != nil {
  242. return err
  243. }
  244. return c.bw.Flush()
  245. }
  246. func (c *Client) writeTimeoutFired() { c.nc.Close() }
  247. func (c *Client) SendPing(data [8]byte) error {
  248. return c.sendPingOrPong(FramePing, data)
  249. }
  250. func (c *Client) SendPong(data [8]byte) error {
  251. return c.sendPingOrPong(FramePong, data)
  252. }
  253. func (c *Client) sendPingOrPong(typ FrameType, data [8]byte) error {
  254. c.wmu.Lock()
  255. defer c.wmu.Unlock()
  256. if err := WriteFrameHeader(c.bw, typ, 8); err != nil {
  257. return err
  258. }
  259. if _, err := c.bw.Write(data[:]); err != nil {
  260. return err
  261. }
  262. return c.bw.Flush()
  263. }
  264. // NotePreferred sends a packet that tells the server whether this
  265. // client is the user's preferred server. This is only used in the
  266. // server for stats.
  267. func (c *Client) NotePreferred(preferred bool) (err error) {
  268. defer func() {
  269. if err != nil {
  270. err = fmt.Errorf("derp.NotePreferred: %v", err)
  271. }
  272. }()
  273. c.wmu.Lock()
  274. defer c.wmu.Unlock()
  275. if err := WriteFrameHeader(c.bw, FrameNotePreferred, 1); err != nil {
  276. return err
  277. }
  278. var b byte = 0x00
  279. if preferred {
  280. b = 0x01
  281. }
  282. if err := c.bw.WriteByte(b); err != nil {
  283. return err
  284. }
  285. return c.bw.Flush()
  286. }
  287. // WatchConnectionChanges sends a request to subscribe to the peer's connection list.
  288. // It's a fatal error if the client wasn't created using MeshKey.
  289. func (c *Client) WatchConnectionChanges() error {
  290. c.wmu.Lock()
  291. defer c.wmu.Unlock()
  292. if err := WriteFrameHeader(c.bw, FrameWatchConns, 0); err != nil {
  293. return err
  294. }
  295. return c.bw.Flush()
  296. }
  297. // ClosePeer asks the server to close target's TCP connection.
  298. // It's a fatal error if the client wasn't created using MeshKey.
  299. func (c *Client) ClosePeer(target key.NodePublic) error {
  300. c.wmu.Lock()
  301. defer c.wmu.Unlock()
  302. return WriteFrame(c.bw, FrameClosePeer, target.AppendTo(nil))
  303. }
  304. // ReceivedMessage represents a type returned by Client.Recv. Unless
  305. // otherwise documented, the returned message aliases the byte slice
  306. // provided to Recv and thus the message is only as good as that
  307. // buffer, which is up to the caller.
  308. type ReceivedMessage interface {
  309. msg()
  310. }
  311. // ReceivedPacket is a ReceivedMessage representing an incoming packet.
  312. type ReceivedPacket struct {
  313. Source key.NodePublic
  314. // Data is the received packet bytes. It aliases the memory
  315. // passed to Client.Recv.
  316. Data []byte
  317. }
  318. func (ReceivedPacket) msg() {}
  319. // PeerGoneMessage is a ReceivedMessage that indicates that the client
  320. // identified by the underlying public key is not connected to this
  321. // server.
  322. //
  323. // It has only historically been sent by the server when the client
  324. // connection count decremented from 1 to 0 and not from e.g. 2 to 1.
  325. // See https://github.com/tailscale/tailscale/issues/13566 for details.
  326. type PeerGoneMessage struct {
  327. Peer key.NodePublic
  328. Reason PeerGoneReasonType
  329. }
  330. func (PeerGoneMessage) msg() {}
  331. // PeerPresentMessage is a ReceivedMessage that indicates that the client is
  332. // connected to the server. (Only used by trusted mesh clients)
  333. //
  334. // It will be sent to client watchers for every new connection from a client,
  335. // even if the client's already connected with that public key.
  336. // See https://github.com/tailscale/tailscale/issues/13566 for PeerPresentMessage
  337. // and PeerGoneMessage not being 1:1.
  338. type PeerPresentMessage struct {
  339. // Key is the public key of the client.
  340. Key key.NodePublic
  341. // IPPort is the remote IP and port of the client.
  342. IPPort netip.AddrPort
  343. // Flags is a bitmask of info about the client.
  344. Flags PeerPresentFlags
  345. }
  346. func (PeerPresentMessage) msg() {}
  347. // ServerInfoMessage is sent by the server upon first connect.
  348. type ServerInfoMessage struct {
  349. // TokenBucketBytesPerSecond is how many bytes per second the
  350. // server says it will accept, including all framing bytes.
  351. //
  352. // Zero means unspecified. There might be a limit, but the
  353. // client need not try to respect it.
  354. TokenBucketBytesPerSecond int
  355. // TokenBucketBytesBurst is how many bytes the server will
  356. // allow to burst, temporarily violating
  357. // TokenBucketBytesPerSecond.
  358. //
  359. // Zero means unspecified. There might be a limit, but the
  360. // client need not try to respect it.
  361. TokenBucketBytesBurst int
  362. }
  363. func (ServerInfoMessage) msg() {}
  364. // PingMessage is a request from a client or server to reply to the
  365. // other side with a PongMessage with the given payload.
  366. type PingMessage [8]byte
  367. func (PingMessage) msg() {}
  368. // PongMessage is a reply to a PingMessage from a client or server
  369. // with the payload sent previously in a PingMessage.
  370. type PongMessage [8]byte
  371. func (PongMessage) msg() {}
  372. // KeepAliveMessage is a one-way empty message from server to client, just to
  373. // keep the connection alive. It's like a PingMessage, but doesn't solicit
  374. // a reply from the client.
  375. type KeepAliveMessage struct{}
  376. func (KeepAliveMessage) msg() {}
  377. // HealthMessage is a one-way message from server to client, declaring the
  378. // connection health state.
  379. type HealthMessage struct {
  380. // Problem, if non-empty, is a description of why the connection
  381. // is unhealthy.
  382. //
  383. // The empty string means the connection is healthy again.
  384. //
  385. // The default condition is healthy, so the server doesn't
  386. // broadcast a HealthMessage until a problem exists.
  387. Problem string
  388. }
  389. func (HealthMessage) msg() {}
  390. // ServerRestartingMessage is a one-way message from server to client,
  391. // advertising that the server is restarting.
  392. type ServerRestartingMessage struct {
  393. // ReconnectIn is an advisory duration that the client should wait
  394. // before attempting to reconnect. It might be zero.
  395. // It exists for the server to smear out the reconnects.
  396. ReconnectIn time.Duration
  397. // TryFor is an advisory duration for how long the client
  398. // should attempt to reconnect before giving up and proceeding
  399. // with its normal connection failure logic. The interval
  400. // between retries is undefined for now.
  401. // A server should not send a TryFor duration more than a few
  402. // seconds.
  403. TryFor time.Duration
  404. }
  405. func (ServerRestartingMessage) msg() {}
  406. // Recv reads a message from the DERP server.
  407. //
  408. // The returned message may alias memory owned by the Client; it
  409. // should only be accessed until the next call to Client.
  410. //
  411. // Once Recv returns an error, the Client is dead forever.
  412. func (c *Client) Recv() (m ReceivedMessage, err error) {
  413. return c.recvTimeout(120 * time.Second)
  414. }
  415. func (c *Client) recvTimeout(timeout time.Duration) (m ReceivedMessage, err error) {
  416. readErr := c.readErr.Load()
  417. if readErr != nil {
  418. return nil, readErr
  419. }
  420. defer func() {
  421. if err != nil {
  422. err = fmt.Errorf("derp.Recv: %w", err)
  423. c.readErr.Store(err)
  424. }
  425. }()
  426. for {
  427. c.nc.SetReadDeadline(time.Now().Add(timeout))
  428. // Discard any peeked bytes from a previous Recv call.
  429. if c.peeked != 0 {
  430. if n, err := c.br.Discard(c.peeked); err != nil || n != c.peeked {
  431. // Documented to never fail, but might as well check.
  432. return nil, fmt.Errorf("bufio.Reader.Discard(%d bytes): got %v, %v", c.peeked, n, err)
  433. }
  434. c.peeked = 0
  435. }
  436. t, n, err := ReadFrameHeader(c.br)
  437. if err != nil {
  438. return nil, err
  439. }
  440. if n > 1<<20 {
  441. return nil, fmt.Errorf("unexpectedly large frame of %d bytes returned", n)
  442. }
  443. var b []byte // frame payload (past the 5 byte header)
  444. // If the frame fits in our bufio.Reader buffer, just use it.
  445. // In practice it's 4KB (from derphttp.Client's bufio.NewReader(httpConn)) and
  446. // in practive, WireGuard packets (and thus DERP frames) are under 1.5KB.
  447. // So this is the common path.
  448. if int(n) <= c.br.Size() {
  449. b, err = c.br.Peek(int(n))
  450. c.peeked = int(n)
  451. } else {
  452. // But if for some reason we read a large DERP message (which isn't necessarily
  453. // a WireGuard packet), then just allocate memory for it.
  454. // TODO(bradfitz): use a pool if large frames ever happen in practice.
  455. b = make([]byte, n)
  456. _, err = io.ReadFull(c.br, b)
  457. }
  458. if err != nil {
  459. return nil, err
  460. }
  461. switch t {
  462. default:
  463. continue
  464. case FrameServerInfo:
  465. // Server sends this at start-up. Currently unused.
  466. // Just has a JSON message saying "version: 2",
  467. // but the protocol seems extensible enough as-is without
  468. // needing to wait an RTT to discover the version at startup.
  469. // We'd prefer to give the connection to the client (magicsock)
  470. // to start writing as soon as possible.
  471. si, err := c.parseServerInfo(b)
  472. if err != nil {
  473. return nil, fmt.Errorf("invalid server info frame: %v", err)
  474. }
  475. sm := ServerInfoMessage{
  476. TokenBucketBytesPerSecond: si.TokenBucketBytesPerSecond,
  477. TokenBucketBytesBurst: si.TokenBucketBytesBurst,
  478. }
  479. c.setSendRateLimiter(sm)
  480. return sm, nil
  481. case FrameKeepAlive:
  482. // A one-way keep-alive message that doesn't require an acknowledgement.
  483. // This predated framePing/framePong.
  484. return KeepAliveMessage{}, nil
  485. case FramePeerGone:
  486. if n < KeyLen {
  487. c.logf("[unexpected] dropping short peerGone frame from DERP server")
  488. continue
  489. }
  490. // Backward compatibility for the older peerGone without reason byte
  491. reason := PeerGoneReasonDisconnected
  492. if n > KeyLen {
  493. reason = PeerGoneReasonType(b[KeyLen])
  494. }
  495. pg := PeerGoneMessage{
  496. Peer: key.NodePublicFromRaw32(mem.B(b[:KeyLen])),
  497. Reason: reason,
  498. }
  499. return pg, nil
  500. case FramePeerPresent:
  501. remain := b
  502. chunk, remain, ok := cutLeadingN(remain, KeyLen)
  503. if !ok {
  504. c.logf("[unexpected] dropping short peerPresent frame from DERP server")
  505. continue
  506. }
  507. var msg PeerPresentMessage
  508. msg.Key = key.NodePublicFromRaw32(mem.B(chunk))
  509. const ipLen = 16
  510. const portLen = 2
  511. chunk, remain, ok = cutLeadingN(remain, ipLen+portLen)
  512. if !ok {
  513. // Older server which didn't send the IP.
  514. return msg, nil
  515. }
  516. msg.IPPort = netip.AddrPortFrom(
  517. netip.AddrFrom16([16]byte(chunk[:ipLen])).Unmap(),
  518. binary.BigEndian.Uint16(chunk[ipLen:]),
  519. )
  520. chunk, _, ok = cutLeadingN(remain, 1)
  521. if !ok {
  522. // Older server which doesn't send PeerPresentFlags.
  523. return msg, nil
  524. }
  525. msg.Flags = PeerPresentFlags(chunk[0])
  526. return msg, nil
  527. case FrameRecvPacket:
  528. var rp ReceivedPacket
  529. if n < KeyLen {
  530. c.logf("[unexpected] dropping short packet from DERP server")
  531. continue
  532. }
  533. rp.Source = key.NodePublicFromRaw32(mem.B(b[:KeyLen]))
  534. rp.Data = b[KeyLen:n]
  535. return rp, nil
  536. case FramePing:
  537. var pm PingMessage
  538. if n < 8 {
  539. c.logf("[unexpected] dropping short ping frame")
  540. continue
  541. }
  542. copy(pm[:], b[:])
  543. return pm, nil
  544. case FramePong:
  545. var pm PongMessage
  546. if n < 8 {
  547. c.logf("[unexpected] dropping short ping frame")
  548. continue
  549. }
  550. copy(pm[:], b[:])
  551. return pm, nil
  552. case FrameHealth:
  553. return HealthMessage{Problem: string(b[:])}, nil
  554. case FrameRestarting:
  555. var m ServerRestartingMessage
  556. if n < 8 {
  557. c.logf("[unexpected] dropping short server restarting frame")
  558. continue
  559. }
  560. m.ReconnectIn = time.Duration(binary.BigEndian.Uint32(b[0:4])) * time.Millisecond
  561. m.TryFor = time.Duration(binary.BigEndian.Uint32(b[4:8])) * time.Millisecond
  562. return m, nil
  563. }
  564. }
  565. }
  566. func (c *Client) setSendRateLimiter(sm ServerInfoMessage) {
  567. c.wmu.Lock()
  568. defer c.wmu.Unlock()
  569. if sm.TokenBucketBytesPerSecond == 0 {
  570. c.rate = nil
  571. } else {
  572. c.rate = rate.NewLimiter(
  573. rate.Limit(sm.TokenBucketBytesPerSecond),
  574. sm.TokenBucketBytesBurst)
  575. }
  576. }
  577. // LocalAddr returns the TCP connection's local address.
  578. //
  579. // If the client is broken in some previously detectable way, it
  580. // returns an error.
  581. func (c *Client) LocalAddr() (netip.AddrPort, error) {
  582. readErr, _ := c.readErr.Load().(error)
  583. if readErr != nil {
  584. return netip.AddrPort{}, readErr
  585. }
  586. if c.nc == nil {
  587. return netip.AddrPort{}, errors.New("nil conn")
  588. }
  589. a := c.nc.LocalAddr()
  590. if a == nil {
  591. return netip.AddrPort{}, errors.New("nil addr")
  592. }
  593. return netip.ParseAddrPort(a.String())
  594. }
  595. func cutLeadingN(b []byte, n int) (chunk, remain []byte, ok bool) {
  596. if len(b) >= n {
  597. return b[:n], b[n:], true
  598. }
  599. return nil, b, false
  600. }