link_endpoint.go 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. package netstack
  4. import (
  5. "context"
  6. "sync"
  7. "gvisor.dev/gvisor/pkg/tcpip"
  8. "gvisor.dev/gvisor/pkg/tcpip/header"
  9. "gvisor.dev/gvisor/pkg/tcpip/stack"
  10. "tailscale.com/feature/buildfeatures"
  11. "tailscale.com/net/packet"
  12. "tailscale.com/types/ipproto"
  13. "tailscale.com/wgengine/netstack/gro"
  14. )
// queue is a bounded FIFO of outbound *stack.PacketBuffer backed by a
// channel. It supports concurrent writers (Write) alongside a one-shot
// Close.
type queue struct {
	// TODO(jwhited): evaluate performance with a non-channel buffer.
	c chan *stack.PacketBuffer
	// closeOnce guards the close of closedCh.
	closeOnce sync.Once
	// closedCh is closed by Close (before c) to unblock any writers
	// parked sending on c.
	closedCh chan struct{}
	// mu guards closed; writers hold it as RLock, Close as Lock.
	mu     sync.RWMutex
	closed bool
}
// Close closes the queue. It is safe to call multiple times; calls after
// the first are no-ops. After Close, Write fails with ErrClosedForSend
// and Drain will terminate once c is emptied.
func (q *queue) Close() {
	// Close closedCh first, outside mu: writers blocked sending on c in
	// Write (holding RLock) observe this and return, releasing their
	// read locks so the Lock below can be acquired.
	q.closeOnce.Do(func() {
		close(q.closedCh)
	})
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return
	}
	// Safe to close c here: no writer can be sending on c while we hold
	// the write lock.
	close(q.c)
	q.closed = true
}
  35. func (q *queue) Read() *stack.PacketBuffer {
  36. select {
  37. case p := <-q.c:
  38. return p
  39. default:
  40. return nil
  41. }
  42. }
  43. func (q *queue) ReadContext(ctx context.Context) *stack.PacketBuffer {
  44. select {
  45. case pkt := <-q.c:
  46. return pkt
  47. case <-ctx.Done():
  48. return nil
  49. }
  50. }
// Write enqueues pkt, taking a new reference on it. It returns
// ErrClosedForSend if the queue is (or becomes) closed. Write may block
// while the queue is full, until space frees up or the queue is closed.
func (q *queue) Write(pkt *stack.PacketBuffer) tcpip.Error {
	// RLock rather than Lock so multiple writers can send on c
	// concurrently; Close takes the write lock before closing c, so c
	// cannot be closed underneath a writer holding the read lock.
	q.mu.RLock()
	defer q.mu.RUnlock()
	if q.closed {
		return &tcpip.ErrClosedForSend{}
	}
	select {
	case q.c <- pkt.IncRef():
		return nil
	case <-q.closedCh:
		// Close raced with our send; drop the reference we took above.
		pkt.DecRef()
		return &tcpip.ErrClosedForSend{}
	}
}
  65. func (q *queue) Drain() int {
  66. c := 0
  67. for pkt := range q.c {
  68. pkt.DecRef()
  69. c++
  70. }
  71. return c
  72. }
  73. func (q *queue) Num() int {
  74. return len(q.c)
  75. }
// Compile-time assertions that linkEndpoint satisfies the gVisor
// interfaces it is used as.
var _ stack.LinkEndpoint = (*linkEndpoint)(nil)
var _ stack.GSOEndpoint = (*linkEndpoint)(nil)

// supportedGRO enumerates the GRO capabilities of a linkEndpoint.
type supportedGRO int

const (
	// groNotSupported indicates the endpoint performs no GRO.
	groNotSupported supportedGRO = iota
	// tcpGROSupported indicates the endpoint can coalesce TCP segments.
	tcpGROSupported
)
// linkEndpoint implements stack.LinkEndpoint and stack.GSOEndpoint. Outbound
// packets written by gVisor towards Tailscale are stored in a channel.
// Inbound is fed to gVisor via injectInbound or gro. This is loosely
// modeled after gvisor.dev/pkg/tcpip/link/channel.Endpoint.
type linkEndpoint struct {
	// SupportedGSOKind is returned verbatim by SupportedGSO.
	SupportedGSOKind stack.SupportedGSO
	// supportedGRO selects the fast path in gro(); set at construction.
	supportedGRO supportedGRO
	mu           sync.RWMutex // mu guards the following fields
	dispatcher   stack.NetworkDispatcher
	linkAddr     tcpip.LinkAddress
	mtu          uint32
	q            *queue // outbound
}
  96. func newLinkEndpoint(size int, mtu uint32, linkAddr tcpip.LinkAddress, supportedGRO supportedGRO) *linkEndpoint {
  97. le := &linkEndpoint{
  98. supportedGRO: supportedGRO,
  99. q: &queue{
  100. c: make(chan *stack.PacketBuffer, size),
  101. closedCh: make(chan struct{}),
  102. },
  103. mtu: mtu,
  104. linkAddr: linkAddr,
  105. }
  106. return le
  107. }
  108. // gro attempts to enqueue p on g if ep supports a GRO kind matching the
  109. // transport protocol carried in p. gro may allocate g if it is nil. gro can
  110. // either return the existing g, a newly allocated one, or nil. Callers are
  111. // responsible for calling Flush() on the returned value if it is non-nil once
  112. // they have finished iterating through all GRO candidates for a given vector.
  113. // If gro allocates a *gro.GRO it will have ep's stack.NetworkDispatcher set via
  114. // SetDispatcher().
  115. func (ep *linkEndpoint) gro(p *packet.Parsed, g *gro.GRO) *gro.GRO {
  116. if !buildfeatures.HasGRO || ep.supportedGRO == groNotSupported || p.IPProto != ipproto.TCP {
  117. // IPv6 may have extension headers preceding a TCP header, but we trade
  118. // for a fast path and assume p cannot be coalesced in such a case.
  119. ep.injectInbound(p)
  120. return g
  121. }
  122. if g == nil {
  123. ep.mu.RLock()
  124. d := ep.dispatcher
  125. ep.mu.RUnlock()
  126. g = gro.NewGRO()
  127. g.SetDispatcher(d)
  128. }
  129. g.Enqueue(p)
  130. return g
  131. }
// Close closes l. Further packet injections will return an error, and all
// pending packets are discarded. Close may be called concurrently with
// WritePackets.
func (ep *linkEndpoint) Close() {
	// Clear the dispatcher first so injectInbound becomes a no-op.
	ep.mu.Lock()
	ep.dispatcher = nil
	ep.mu.Unlock()
	// Closing the outbound queue unblocks pending writers; Drain then
	// releases any packets still buffered.
	ep.q.Close()
	ep.Drain()
}
  142. // Read does non-blocking read one packet from the outbound packet queue.
  143. func (ep *linkEndpoint) Read() *stack.PacketBuffer {
  144. return ep.q.Read()
  145. }
  146. // ReadContext does blocking read for one packet from the outbound packet queue.
  147. // It can be cancelled by ctx, and in this case, it returns nil.
  148. func (ep *linkEndpoint) ReadContext(ctx context.Context) *stack.PacketBuffer {
  149. return ep.q.ReadContext(ctx)
  150. }
  151. // Drain removes all outbound packets from the channel and counts them.
  152. func (ep *linkEndpoint) Drain() int {
  153. return ep.q.Drain()
  154. }
  155. // NumQueued returns the number of packets queued for outbound.
  156. func (ep *linkEndpoint) NumQueued() int {
  157. return ep.q.Num()
  158. }
  159. func (ep *linkEndpoint) injectInbound(p *packet.Parsed) {
  160. ep.mu.RLock()
  161. d := ep.dispatcher
  162. ep.mu.RUnlock()
  163. if d == nil || !buildfeatures.HasNetstack {
  164. return
  165. }
  166. pkt := gro.RXChecksumOffload(p)
  167. if pkt == nil {
  168. return
  169. }
  170. d.DeliverNetworkPacket(pkt.NetworkProtocolNumber, pkt)
  171. pkt.DecRef()
  172. }
  173. // Attach saves the stack network-layer dispatcher for use later when packets
  174. // are injected.
  175. func (ep *linkEndpoint) Attach(dispatcher stack.NetworkDispatcher) {
  176. ep.mu.Lock()
  177. defer ep.mu.Unlock()
  178. ep.dispatcher = dispatcher
  179. }
  180. // IsAttached implements stack.LinkEndpoint.IsAttached.
  181. func (ep *linkEndpoint) IsAttached() bool {
  182. ep.mu.RLock()
  183. defer ep.mu.RUnlock()
  184. return ep.dispatcher != nil
  185. }
  186. // MTU implements stack.LinkEndpoint.MTU.
  187. func (ep *linkEndpoint) MTU() uint32 {
  188. ep.mu.RLock()
  189. defer ep.mu.RUnlock()
  190. return ep.mtu
  191. }
  192. // SetMTU implements stack.LinkEndpoint.SetMTU.
  193. func (ep *linkEndpoint) SetMTU(mtu uint32) {
  194. ep.mu.Lock()
  195. defer ep.mu.Unlock()
  196. ep.mtu = mtu
  197. }
// Capabilities implements stack.LinkEndpoint.Capabilities.
func (ep *linkEndpoint) Capabilities() stack.LinkEndpointCapabilities {
	// We are required to offload RX checksum validation for the purposes of
	// GRO (see injectInbound, which runs gro.RXChecksumOffload on every
	// inbound packet).
	return stack.CapabilityRXChecksumOffload
}
  204. // GSOMaxSize implements stack.GSOEndpoint.
  205. func (*linkEndpoint) GSOMaxSize() uint32 {
  206. // This an increase from 32k returned by channel.Endpoint.GSOMaxSize() to
  207. // 64k, which improves throughput.
  208. return (1 << 16) - 1
  209. }
  210. // SupportedGSO implements stack.GSOEndpoint.
  211. func (ep *linkEndpoint) SupportedGSO() stack.SupportedGSO {
  212. return ep.SupportedGSOKind
  213. }
  214. // MaxHeaderLength returns the maximum size of the link layer header. Given it
  215. // doesn't have a header, it just returns 0.
  216. func (*linkEndpoint) MaxHeaderLength() uint16 {
  217. return 0
  218. }
  219. // LinkAddress returns the link address of this endpoint.
  220. func (ep *linkEndpoint) LinkAddress() tcpip.LinkAddress {
  221. ep.mu.RLock()
  222. defer ep.mu.RUnlock()
  223. return ep.linkAddr
  224. }
  225. // SetLinkAddress implements stack.LinkEndpoint.SetLinkAddress.
  226. func (ep *linkEndpoint) SetLinkAddress(addr tcpip.LinkAddress) {
  227. ep.mu.Lock()
  228. defer ep.mu.Unlock()
  229. ep.linkAddr = addr
  230. }
// WritePackets stores outbound packets into the channel.
// Multiple concurrent calls are permitted.
// It returns the number of packets successfully queued. An error is
// returned only when no packet at all was queued and the failure was not
// a buffer-space condition; otherwise the partial count is reported with
// a nil error.
func (ep *linkEndpoint) WritePackets(pkts stack.PacketBufferList) (int, tcpip.Error) {
	n := 0
	// TODO(jwhited): evaluate writing a stack.PacketBufferList instead of a
	// single packet. We can split 2 x 64K GSO across
	// wireguard-go/conn.IdealBatchSize (128 slots) @ 1280 MTU, and non-GSO we
	// could do more. Read API would need to change to take advantage. Verify
	// gVisor limits around max number of segments packed together. Since we
	// control MTU (and by effect TCP MSS in gVisor) we *shouldn't* expect to
	// ever overflow 128 slots (see wireguard-go/tun.ErrTooManySegments usage).
	for _, pkt := range pkts.AsSlice() {
		if err := ep.q.Write(pkt); err != nil {
			// Stop on the first failure; surface err only when nothing
			// was queued and err is not *tcpip.ErrNoBufferSpace.
			if _, ok := err.(*tcpip.ErrNoBufferSpace); !ok && n == 0 {
				return 0, err
			}
			break
		}
		n++
	}
	return n, nil
}
// Wait implements stack.LinkEndpoint.Wait. It is a no-op for this
// endpoint.
func (*linkEndpoint) Wait() {}
// ARPHardwareType implements stack.LinkEndpoint.ARPHardwareType.
func (*linkEndpoint) ARPHardwareType() header.ARPHardwareType {
	// No link-layer framing is used, so no ARP hardware type applies.
	return header.ARPHardwareNone
}
// AddHeader implements stack.LinkEndpoint.AddHeader. It is a no-op
// because this endpoint has no link-layer header.
func (*linkEndpoint) AddHeader(*stack.PacketBuffer) {}
// ParseHeader implements stack.LinkEndpoint.ParseHeader. There is no
// link-layer header to parse, so it always reports success.
func (*linkEndpoint) ParseHeader(*stack.PacketBuffer) bool { return true }
// SetOnCloseAction implements stack.LinkEndpoint. The close action is
// ignored by this endpoint.
func (*linkEndpoint) SetOnCloseAction(func()) {}