link_endpoint.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. package netstack
  4. import (
  5. "context"
  6. "sync"
  7. "gvisor.dev/gvisor/pkg/tcpip"
  8. "gvisor.dev/gvisor/pkg/tcpip/header"
  9. "gvisor.dev/gvisor/pkg/tcpip/stack"
  10. "tailscale.com/net/packet"
  11. "tailscale.com/types/ipproto"
  12. "tailscale.com/wgengine/netstack/gro"
  13. )
// queue is a bounded, closeable FIFO of *stack.PacketBuffer backed by a
// buffered channel. It holds outbound packets written by gVisor until they
// are read towards Tailscale.
type queue struct {
	// TODO(jwhited): evaluate performance with mu as Mutex and/or alternative
	// non-channel buffer.
	c  chan *stack.PacketBuffer
	mu sync.RWMutex // mu guards closed
	// closed reports whether c has been closed; guarded by mu.
	closed bool
}
  21. func (q *queue) Close() {
  22. q.mu.Lock()
  23. defer q.mu.Unlock()
  24. if !q.closed {
  25. close(q.c)
  26. }
  27. q.closed = true
  28. }
  29. func (q *queue) Read() *stack.PacketBuffer {
  30. select {
  31. case p := <-q.c:
  32. return p
  33. default:
  34. return nil
  35. }
  36. }
  37. func (q *queue) ReadContext(ctx context.Context) *stack.PacketBuffer {
  38. select {
  39. case pkt := <-q.c:
  40. return pkt
  41. case <-ctx.Done():
  42. return nil
  43. }
  44. }
  45. func (q *queue) Write(pkt *stack.PacketBuffer) tcpip.Error {
  46. // q holds the PacketBuffer.
  47. q.mu.RLock()
  48. defer q.mu.RUnlock()
  49. if q.closed {
  50. return &tcpip.ErrClosedForSend{}
  51. }
  52. wrote := false
  53. select {
  54. case q.c <- pkt.IncRef():
  55. wrote = true
  56. default:
  57. // TODO(jwhited): reconsider/count
  58. pkt.DecRef()
  59. }
  60. if wrote {
  61. return nil
  62. }
  63. return &tcpip.ErrNoBufferSpace{}
  64. }
// Num returns the number of packets currently buffered in the queue.
func (q *queue) Num() int {
	return len(q.c)
}
// Compile-time assertions that linkEndpoint implements the gVisor link and
// GSO endpoint interfaces.
var _ stack.LinkEndpoint = (*linkEndpoint)(nil)
var _ stack.GSOEndpoint = (*linkEndpoint)(nil)

// supportedGRO enumerates the GRO (generic receive offload) modes a
// linkEndpoint can support.
type supportedGRO int

const (
	// groNotSupported disables GRO; inbound packets are injected directly.
	groNotSupported supportedGRO = iota
	// tcpGROSupported enables GRO for TCP packets.
	tcpGROSupported
)
// linkEndpoint implements stack.LinkEndpoint and stack.GSOEndpoint. Outbound
// packets written by gVisor towards Tailscale are stored in a channel.
// Inbound is fed to gVisor via injectInbound or gro. This is loosely
// modeled after gvisor.dev/pkg/tcpip/link/channel.Endpoint.
type linkEndpoint struct {
	// SupportedGSOKind is returned verbatim by SupportedGSO().
	SupportedGSOKind stack.SupportedGSO
	// supportedGRO selects whether gro() coalesces TCP segments or falls
	// back to direct injection; set at construction and read-only after.
	supportedGRO supportedGRO

	mu         sync.RWMutex // mu guards the following fields
	dispatcher stack.NetworkDispatcher
	linkAddr   tcpip.LinkAddress
	mtu        uint32
	q          *queue // outbound
}
  88. func newLinkEndpoint(size int, mtu uint32, linkAddr tcpip.LinkAddress, supportedGRO supportedGRO) *linkEndpoint {
  89. le := &linkEndpoint{
  90. supportedGRO: supportedGRO,
  91. q: &queue{
  92. c: make(chan *stack.PacketBuffer, size),
  93. },
  94. mtu: mtu,
  95. linkAddr: linkAddr,
  96. }
  97. return le
  98. }
  99. // gro attempts to enqueue p on g if l supports a GRO kind matching the
  100. // transport protocol carried in p. gro may allocate g if it is nil. gro can
  101. // either return the existing g, a newly allocated one, or nil. Callers are
  102. // responsible for calling Flush() on the returned value if it is non-nil once
  103. // they have finished iterating through all GRO candidates for a given vector.
  104. // If gro allocates a *gro.GRO it will have l's stack.NetworkDispatcher set via
  105. // SetDispatcher().
  106. func (l *linkEndpoint) gro(p *packet.Parsed, g *gro.GRO) *gro.GRO {
  107. if l.supportedGRO == groNotSupported || p.IPProto != ipproto.TCP {
  108. // IPv6 may have extension headers preceding a TCP header, but we trade
  109. // for a fast path and assume p cannot be coalesced in such a case.
  110. l.injectInbound(p)
  111. return g
  112. }
  113. if g == nil {
  114. l.mu.RLock()
  115. d := l.dispatcher
  116. l.mu.RUnlock()
  117. g = gro.NewGRO()
  118. g.SetDispatcher(d)
  119. }
  120. g.Enqueue(p)
  121. return g
  122. }
// Close closes l. Further packet injections will return an error, and all
// pending packets are discarded. Close may be called concurrently with
// WritePackets.
func (l *linkEndpoint) Close() {
	// Clear the dispatcher first so injectInbound/gro become no-ops before
	// the outbound queue is torn down.
	l.mu.Lock()
	l.dispatcher = nil
	l.mu.Unlock()
	l.q.Close()
	// Release references held by any packets still queued.
	l.Drain()
}
// Read does a non-blocking read of one packet from the outbound packet
// queue, returning nil if none is pending.
func (l *linkEndpoint) Read() *stack.PacketBuffer {
	return l.q.Read()
}

// ReadContext does a blocking read for one packet from the outbound packet
// queue. It can be cancelled by ctx, and in this case, it returns nil.
func (l *linkEndpoint) ReadContext(ctx context.Context) *stack.PacketBuffer {
	return l.q.ReadContext(ctx)
}
  142. // Drain removes all outbound packets from the channel and counts them.
  143. func (l *linkEndpoint) Drain() int {
  144. c := 0
  145. for pkt := l.Read(); pkt != nil; pkt = l.Read() {
  146. pkt.DecRef()
  147. c++
  148. }
  149. return c
  150. }
// NumQueued returns the number of packets queued for outbound.
func (l *linkEndpoint) NumQueued() int {
	return l.q.Num()
}
  155. func (l *linkEndpoint) injectInbound(p *packet.Parsed) {
  156. l.mu.RLock()
  157. d := l.dispatcher
  158. l.mu.RUnlock()
  159. if d == nil {
  160. return
  161. }
  162. pkt := gro.RXChecksumOffload(p)
  163. if pkt == nil {
  164. return
  165. }
  166. d.DeliverNetworkPacket(pkt.NetworkProtocolNumber, pkt)
  167. pkt.DecRef()
  168. }
// Attach saves the stack network-layer dispatcher for use later when packets
// are injected.
func (l *linkEndpoint) Attach(dispatcher stack.NetworkDispatcher) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.dispatcher = dispatcher
}
// IsAttached implements stack.LinkEndpoint.IsAttached. It reports whether a
// dispatcher has been attached and not cleared by Close.
func (l *linkEndpoint) IsAttached() bool {
	l.mu.RLock()
	defer l.mu.RUnlock()
	return l.dispatcher != nil
}

// MTU implements stack.LinkEndpoint.MTU.
func (l *linkEndpoint) MTU() uint32 {
	l.mu.RLock()
	defer l.mu.RUnlock()
	return l.mtu
}

// SetMTU implements stack.LinkEndpoint.SetMTU.
func (l *linkEndpoint) SetMTU(mtu uint32) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.mtu = mtu
}
// Capabilities implements stack.LinkEndpoint.Capabilities.
func (l *linkEndpoint) Capabilities() stack.LinkEndpointCapabilities {
	// We are required to offload RX checksum validation for the purposes of
	// GRO.
	return stack.CapabilityRXChecksumOffload
}

// GSOMaxSize implements stack.GSOEndpoint.
func (*linkEndpoint) GSOMaxSize() uint32 {
	// This an increase from 32k returned by channel.Endpoint.GSOMaxSize() to
	// 64k, which improves throughput.
	return (1 << 16) - 1
}

// SupportedGSO implements stack.GSOEndpoint.
func (l *linkEndpoint) SupportedGSO() stack.SupportedGSO {
	return l.SupportedGSOKind
}

// MaxHeaderLength returns the maximum size of the link layer header. Given it
// doesn't have a header, it just returns 0.
func (*linkEndpoint) MaxHeaderLength() uint16 {
	return 0
}
// LinkAddress returns the link address of this endpoint.
func (l *linkEndpoint) LinkAddress() tcpip.LinkAddress {
	l.mu.RLock()
	defer l.mu.RUnlock()
	return l.linkAddr
}

// SetLinkAddress implements stack.LinkEndpoint.SetLinkAddress.
func (l *linkEndpoint) SetLinkAddress(addr tcpip.LinkAddress) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.linkAddr = addr
}
  227. // WritePackets stores outbound packets into the channel.
  228. // Multiple concurrent calls are permitted.
  229. func (l *linkEndpoint) WritePackets(pkts stack.PacketBufferList) (int, tcpip.Error) {
  230. n := 0
  231. // TODO(jwhited): evaluate writing a stack.PacketBufferList instead of a
  232. // single packet. We can split 2 x 64K GSO across
  233. // wireguard-go/conn.IdealBatchSize (128 slots) @ 1280 MTU, and non-GSO we
  234. // could do more. Read API would need to change to take advantage. Verify
  235. // gVisor limits around max number of segments packed together. Since we
  236. // control MTU (and by effect TCP MSS in gVisor) we *shouldn't* expect to
  237. // ever overflow 128 slots (see wireguard-go/tun.ErrTooManySegments usage).
  238. for _, pkt := range pkts.AsSlice() {
  239. if err := l.q.Write(pkt); err != nil {
  240. if _, ok := err.(*tcpip.ErrNoBufferSpace); !ok && n == 0 {
  241. return 0, err
  242. }
  243. break
  244. }
  245. n++
  246. }
  247. return n, nil
  248. }
// Wait implements stack.LinkEndpoint.Wait.
func (*linkEndpoint) Wait() {}

// ARPHardwareType implements stack.LinkEndpoint.ARPHardwareType.
func (*linkEndpoint) ARPHardwareType() header.ARPHardwareType {
	return header.ARPHardwareNone
}

// AddHeader implements stack.LinkEndpoint.AddHeader. It is a no-op: this
// endpoint has no link-layer header.
func (*linkEndpoint) AddHeader(*stack.PacketBuffer) {}

// ParseHeader implements stack.LinkEndpoint.ParseHeader. There is no
// link-layer header to parse, so it always succeeds.
func (*linkEndpoint) ParseHeader(*stack.PacketBuffer) bool { return true }

// SetOnCloseAction implements stack.LinkEndpoint.
func (*linkEndpoint) SetOnCloseAction(func()) {}