link_endpoint.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. package netstack
  4. import (
  5. "context"
  6. "sync"
  7. "gvisor.dev/gvisor/pkg/tcpip"
  8. "gvisor.dev/gvisor/pkg/tcpip/header"
  9. "gvisor.dev/gvisor/pkg/tcpip/stack"
  10. "tailscale.com/net/packet"
  11. "tailscale.com/types/ipproto"
  12. "tailscale.com/wgengine/netstack/gro"
  13. )
// queue is a close-aware FIFO of outbound packet buffers written by
// gVisor. It supports blocking and non-blocking reads, and writes that
// fail once the queue has been closed.
type queue struct {
	// TODO(jwhited): evaluate performance with a non-channel buffer.
	c chan *stack.PacketBuffer

	// closeOnce ensures closedCh is closed at most once, making Close
	// safe to call repeatedly.
	closeOnce sync.Once
	// closedCh is closed to wake writers parked in Write's select when
	// the queue shuts down.
	closedCh chan struct{}

	mu     sync.RWMutex // mu guards the fields below
	closed bool         // closed records that c has been closed
}
// Close closes the queue, unblocking any writers pending in Write and
// closing the underlying channel so readers drain and terminate. It is
// safe to call multiple times and concurrently with Write.
func (q *queue) Close() {
	// Close closedCh before taking mu: a writer blocked inside Write
	// holds mu.RLock while parked in its select, and closing closedCh
	// is what wakes it so the exclusive Lock below can be acquired.
	q.closeOnce.Do(func() {
		close(q.closedCh)
	})
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return
	}
	close(q.c)
	q.closed = true
}
  34. func (q *queue) Read() *stack.PacketBuffer {
  35. select {
  36. case p := <-q.c:
  37. return p
  38. default:
  39. return nil
  40. }
  41. }
  42. func (q *queue) ReadContext(ctx context.Context) *stack.PacketBuffer {
  43. select {
  44. case pkt := <-q.c:
  45. return pkt
  46. case <-ctx.Done():
  47. return nil
  48. }
  49. }
// Write enqueues pkt, taking a reference on it. It blocks while the
// channel is full and returns *tcpip.ErrClosedForSend (releasing the
// reference it took) if the queue is, or becomes, closed.
func (q *queue) Write(pkt *stack.PacketBuffer) tcpip.Error {
	// Hold the read lock across the send: Close takes the write lock
	// before closing c, so c cannot be closed while we are sending on
	// it. The closedCh case below is what unblocks a parked sender.
	q.mu.RLock()
	defer q.mu.RUnlock()
	if q.closed {
		return &tcpip.ErrClosedForSend{}
	}
	select {
	case q.c <- pkt.IncRef():
		return nil
	case <-q.closedCh:
		// Queue closed while we were blocked; undo the IncRef above.
		pkt.DecRef()
		return &tcpip.ErrClosedForSend{}
	}
}
  64. func (q *queue) Drain() int {
  65. c := 0
  66. for pkt := range q.c {
  67. pkt.DecRef()
  68. c++
  69. }
  70. return c
  71. }
  72. func (q *queue) Num() int {
  73. return len(q.c)
  74. }
// Compile-time assertions that linkEndpoint implements the gVisor
// link-layer and GSO endpoint interfaces.
var _ stack.LinkEndpoint = (*linkEndpoint)(nil)
var _ stack.GSOEndpoint = (*linkEndpoint)(nil)

// supportedGRO enumerates the GRO (generic receive offload) modes a
// linkEndpoint can be constructed with.
type supportedGRO int

const (
	// groNotSupported disables GRO; inbound packets are injected one at
	// a time via injectInbound.
	groNotSupported supportedGRO = iota
	// tcpGROSupported allows inbound TCP packets to be coalesced via
	// the gro helper.
	tcpGROSupported
)
// linkEndpoint implements stack.LinkEndpoint and stack.GSOEndpoint. Outbound
// packets written by gVisor towards Tailscale are stored in a channel.
// Inbound is fed to gVisor via injectInbound or gro. This is loosely
// modeled after gvisor.dev/pkg/tcpip/link/channel.Endpoint.
type linkEndpoint struct {
	// SupportedGSOKind is returned by SupportedGSO. It is read without
	// holding mu; NOTE(review): presumably set once before the endpoint
	// is in use and never mutated concurrently — confirm with callers.
	SupportedGSOKind stack.SupportedGSO
	// supportedGRO selects whether gro may coalesce inbound TCP; fixed
	// at construction in newLinkEndpoint.
	supportedGRO supportedGRO

	mu         sync.RWMutex // mu guards the following fields
	dispatcher stack.NetworkDispatcher
	linkAddr   tcpip.LinkAddress
	mtu        uint32

	q *queue // outbound
}
  95. func newLinkEndpoint(size int, mtu uint32, linkAddr tcpip.LinkAddress, supportedGRO supportedGRO) *linkEndpoint {
  96. le := &linkEndpoint{
  97. supportedGRO: supportedGRO,
  98. q: &queue{
  99. c: make(chan *stack.PacketBuffer, size),
  100. closedCh: make(chan struct{}),
  101. },
  102. mtu: mtu,
  103. linkAddr: linkAddr,
  104. }
  105. return le
  106. }
// gro attempts to enqueue p on g if l supports a GRO kind matching the
// transport protocol carried in p. gro may allocate g if it is nil. gro can
// either return the existing g, a newly allocated one, or nil. Callers are
// responsible for calling Flush() on the returned value if it is non-nil once
// they have finished iterating through all GRO candidates for a given vector.
// If gro allocates a *gro.GRO it will have l's stack.NetworkDispatcher set via
// SetDispatcher().
func (l *linkEndpoint) gro(p *packet.Parsed, g *gro.GRO) *gro.GRO {
	if l.supportedGRO == groNotSupported || p.IPProto != ipproto.TCP {
		// IPv6 may have extension headers preceding a TCP header, but we trade
		// for a fast path and assume p cannot be coalesced in such a case.
		l.injectInbound(p)
		return g
	}
	if g == nil {
		// Lazily allocate the GRO on first TCP candidate, wiring it to
		// the dispatcher current at that moment (read under mu).
		l.mu.RLock()
		d := l.dispatcher
		l.mu.RUnlock()
		g = gro.NewGRO()
		g.SetDispatcher(d)
	}
	g.Enqueue(p)
	return g
}
// Close closes l. Further packet injections will return an error, and all
// pending packets are discarded. Close may be called concurrently with
// WritePackets.
func (l *linkEndpoint) Close() {
	// Detach the dispatcher first so injectInbound (and gro) stop
	// delivering into gVisor.
	l.mu.Lock()
	l.dispatcher = nil
	l.mu.Unlock()
	// Closing the queue fails subsequent writes; Drain then releases
	// any packets still buffered.
	l.q.Close()
	l.Drain()
}
  141. // Read does non-blocking read one packet from the outbound packet queue.
  142. func (l *linkEndpoint) Read() *stack.PacketBuffer {
  143. return l.q.Read()
  144. }
  145. // ReadContext does blocking read for one packet from the outbound packet queue.
  146. // It can be cancelled by ctx, and in this case, it returns nil.
  147. func (l *linkEndpoint) ReadContext(ctx context.Context) *stack.PacketBuffer {
  148. return l.q.ReadContext(ctx)
  149. }
  150. // Drain removes all outbound packets from the channel and counts them.
  151. func (l *linkEndpoint) Drain() int {
  152. return l.q.Drain()
  153. }
  154. // NumQueued returns the number of packets queued for outbound.
  155. func (l *linkEndpoint) NumQueued() int {
  156. return l.q.Num()
  157. }
  158. func (l *linkEndpoint) injectInbound(p *packet.Parsed) {
  159. l.mu.RLock()
  160. d := l.dispatcher
  161. l.mu.RUnlock()
  162. if d == nil {
  163. return
  164. }
  165. pkt := gro.RXChecksumOffload(p)
  166. if pkt == nil {
  167. return
  168. }
  169. d.DeliverNetworkPacket(pkt.NetworkProtocolNumber, pkt)
  170. pkt.DecRef()
  171. }
  172. // Attach saves the stack network-layer dispatcher for use later when packets
  173. // are injected.
  174. func (l *linkEndpoint) Attach(dispatcher stack.NetworkDispatcher) {
  175. l.mu.Lock()
  176. defer l.mu.Unlock()
  177. l.dispatcher = dispatcher
  178. }
  179. // IsAttached implements stack.LinkEndpoint.IsAttached.
  180. func (l *linkEndpoint) IsAttached() bool {
  181. l.mu.RLock()
  182. defer l.mu.RUnlock()
  183. return l.dispatcher != nil
  184. }
  185. // MTU implements stack.LinkEndpoint.MTU.
  186. func (l *linkEndpoint) MTU() uint32 {
  187. l.mu.RLock()
  188. defer l.mu.RUnlock()
  189. return l.mtu
  190. }
  191. // SetMTU implements stack.LinkEndpoint.SetMTU.
  192. func (l *linkEndpoint) SetMTU(mtu uint32) {
  193. l.mu.Lock()
  194. defer l.mu.Unlock()
  195. l.mtu = mtu
  196. }
  197. // Capabilities implements stack.LinkEndpoint.Capabilities.
  198. func (l *linkEndpoint) Capabilities() stack.LinkEndpointCapabilities {
  199. // We are required to offload RX checksum validation for the purposes of
  200. // GRO.
  201. return stack.CapabilityRXChecksumOffload
  202. }
  203. // GSOMaxSize implements stack.GSOEndpoint.
  204. func (*linkEndpoint) GSOMaxSize() uint32 {
  205. // This an increase from 32k returned by channel.Endpoint.GSOMaxSize() to
  206. // 64k, which improves throughput.
  207. return (1 << 16) - 1
  208. }
// SupportedGSO implements stack.GSOEndpoint.
func (l *linkEndpoint) SupportedGSO() stack.SupportedGSO {
	// NOTE(review): SupportedGSOKind is read without mu; assumed to be
	// set once before the endpoint is in use — confirm with callers.
	return l.SupportedGSOKind
}
// MaxHeaderLength returns the maximum size of the link layer header. Given it
// doesn't have a header, it just returns 0.
func (*linkEndpoint) MaxHeaderLength() uint16 {
	return 0
}
  218. // LinkAddress returns the link address of this endpoint.
  219. func (l *linkEndpoint) LinkAddress() tcpip.LinkAddress {
  220. l.mu.RLock()
  221. defer l.mu.RUnlock()
  222. return l.linkAddr
  223. }
  224. // SetLinkAddress implements stack.LinkEndpoint.SetLinkAddress.
  225. func (l *linkEndpoint) SetLinkAddress(addr tcpip.LinkAddress) {
  226. l.mu.Lock()
  227. defer l.mu.Unlock()
  228. l.linkAddr = addr
  229. }
// WritePackets stores outbound packets into the channel.
// Multiple concurrent calls are permitted. It returns the number of
// packets enqueued; an error is only surfaced when nothing at all was
// written (partial writes report the count with a nil error).
func (l *linkEndpoint) WritePackets(pkts stack.PacketBufferList) (int, tcpip.Error) {
	n := 0
	// TODO(jwhited): evaluate writing a stack.PacketBufferList instead of a
	// single packet. We can split 2 x 64K GSO across
	// wireguard-go/conn.IdealBatchSize (128 slots) @ 1280 MTU, and non-GSO we
	// could do more. Read API would need to change to take advantage. Verify
	// gVisor limits around max number of segments packed together. Since we
	// control MTU (and by effect TCP MSS in gVisor) we *shouldn't* expect to
	// ever overflow 128 slots (see wireguard-go/tun.ErrTooManySegments usage).
	for _, pkt := range pkts.AsSlice() {
		if err := l.q.Write(pkt); err != nil {
			// Propagate the error only if it isn't a transient
			// buffer-space condition and we made no progress at all.
			if _, ok := err.(*tcpip.ErrNoBufferSpace); !ok && n == 0 {
				return 0, err
			}
			break
		}
		n++
	}
	return n, nil
}
// Wait implements stack.LinkEndpoint.Wait. It is a no-op: there are no
// worker goroutines owned by the endpoint to wait for.
func (*linkEndpoint) Wait() {}

// ARPHardwareType implements stack.LinkEndpoint.ARPHardwareType. The
// endpoint carries no link-layer framing, so no ARP hardware type
// applies.
func (*linkEndpoint) ARPHardwareType() header.ARPHardwareType {
	return header.ARPHardwareNone
}

// AddHeader implements stack.LinkEndpoint.AddHeader. No link-layer
// header is used, so nothing is added.
func (*linkEndpoint) AddHeader(*stack.PacketBuffer) {}

// ParseHeader implements stack.LinkEndpoint.ParseHeader. There is no
// link-layer header to parse, so it always reports success.
func (*linkEndpoint) ParseHeader(*stack.PacketBuffer) bool { return true }

// SetOnCloseAction implements stack.LinkEndpoint. The provided action
// is ignored.
func (*linkEndpoint) SetOnCloseAction(func()) {}