  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. package batching
  4. import (
  5. "encoding/binary"
  6. "errors"
  7. "fmt"
  8. "net"
  9. "net/netip"
  10. "runtime"
  11. "strings"
  12. "sync"
  13. "sync/atomic"
  14. "syscall"
  15. "time"
  16. "unsafe"
  17. "golang.org/x/net/ipv4"
  18. "golang.org/x/net/ipv6"
  19. "golang.org/x/sys/unix"
  20. "tailscale.com/hostinfo"
  21. "tailscale.com/net/neterror"
  22. "tailscale.com/net/packet"
  23. "tailscale.com/types/nettype"
  24. )
// xnetBatchReaderWriter defines the batching i/o methods of
// golang.org/x/net/ipv4.PacketConn (and ipv6.PacketConn).
// TODO(jwhited): This should eventually be replaced with the standard library
// implementation of https://github.com/golang/go/issues/45886
type xnetBatchReaderWriter interface {
	xnetBatchReader
	xnetBatchWriter
}

// xnetBatchReader is the batched-read half of [xnetBatchReaderWriter].
type xnetBatchReader interface {
	// ReadBatch reads up to len(msgs) messages into msgs, returning the
	// number of messages received.
	ReadBatch([]ipv6.Message, int) (int, error)
}

// xnetBatchWriter is the batched-write half of [xnetBatchReaderWriter].
type xnetBatchWriter interface {
	// WriteBatch writes up to len(msgs) messages from msgs, returning the
	// number of messages sent.
	WriteBatch([]ipv6.Message, int) (int, error)
}
var (
	// Compile-time check: [linuxBatchingConn] implements [Conn].
	_ Conn = (*linuxBatchingConn)(nil)
)
// linuxBatchingConn is a UDP socket that provides batched i/o. It implements
// [Conn].
type linuxBatchingConn struct {
	pc        *net.UDPConn          // underlying socket
	xpc       xnetBatchReaderWriter // batching view of pc
	rxOffload bool                  // supports UDP GRO or similar
	txOffload atomic.Bool           // supports UDP GSO or similar; cleared at runtime if the kernel rejects GSO sends

	setGSOSizeInControl   func(control *[]byte, gsoSize uint16) // typically setGSOSizeInControl(); swappable for testing
	getGSOSizeFromControl func(control []byte) (int, error)     // typically getGSOSizeFromControl(); swappable for testing

	sendBatchPool sync.Pool // pool of *sendBatch; see getSendBatch/putSendBatch
}
  54. func (c *linuxBatchingConn) ReadFromUDPAddrPort(p []byte) (n int, addr netip.AddrPort, err error) {
  55. if c.rxOffload {
  56. // UDP_GRO is opt-in on Linux via setsockopt(). Once enabled you may
  57. // receive a "monster datagram" from any read call. The ReadFrom() API
  58. // does not support passing the GSO size and is unsafe to use in such a
  59. // case. Other platforms may vary in behavior, but we go with the most
  60. // conservative approach to prevent this from becoming a footgun in the
  61. // future.
  62. return 0, netip.AddrPort{}, errors.New("rx UDP offload is enabled on this socket, single packet reads are unavailable")
  63. }
  64. return c.pc.ReadFromUDPAddrPort(p)
  65. }
// SetDeadline sets both the read and write deadlines on the underlying
// socket.
func (c *linuxBatchingConn) SetDeadline(t time.Time) error {
	return c.pc.SetDeadline(t)
}

// SetReadDeadline sets the read deadline on the underlying socket.
func (c *linuxBatchingConn) SetReadDeadline(t time.Time) error {
	return c.pc.SetReadDeadline(t)
}

// SetWriteDeadline sets the write deadline on the underlying socket.
func (c *linuxBatchingConn) SetWriteDeadline(t time.Time) error {
	return c.pc.SetWriteDeadline(t)
}
const (
	// udpSegmentMaxDatagrams bounds the number of datagrams coalesced into a
	// single GSO/GRO "super-packet".
	// This was initially established for Linux, but may split out to
	// GOOS-specific values later. It originates as UDP_MAX_SEGMENTS in the
	// kernel's TX path, and UDP_GRO_CNT_MAX for RX.
	udpSegmentMaxDatagrams = 64
)

const (
	// Exceeding these values results in EMSGSIZE. They are derived from the
	// 16-bit IP total-length field, less the IPv4 (20 byte) and/or UDP
	// (8 byte) header lengths.
	maxIPv4PayloadLen = 1<<16 - 1 - 20 - 8
	maxIPv6PayloadLen = 1<<16 - 1 - 8
)
// coalesceMessages iterates 'buffs', setting and coalescing them in 'msgs'
// where possible while maintaining datagram order.
//
// All msgs have their Addr field set to addr.
//
// All msgs[i].Buffers[0] are preceded by a Geneve header (geneve) if geneve.VNI.IsSet().
//
// offset is the index at which each buff's payload begins; when no Geneve
// header is written, the leading offset bytes of each buff are skipped.
// It returns the number of elements of msgs that were populated.
func (c *linuxBatchingConn) coalesceMessages(addr *net.UDPAddr, geneve packet.GeneveHeader, buffs [][]byte, msgs []ipv6.Message, offset int) int {
	var (
		base     = -1  // index of msg we are currently coalescing into
		gsoSize  int   // segmentation size of msgs[base]
		dgramCnt int   // number of dgrams coalesced into msgs[base]
		endBatch bool  // tracking flag to start a new batch on next iteration of buffs
	)
	maxPayloadLen := maxIPv4PayloadLen
	if addr.IP.To4() == nil {
		maxPayloadLen = maxIPv6PayloadLen
	}
	vniIsSet := geneve.VNI.IsSet()
	for i, buff := range buffs {
		if vniIsSet {
			// The Geneve header is encoded into buff in front of the payload,
			// so the whole buff (header included) is sent.
			geneve.Encode(buff)
		} else {
			buff = buff[offset:]
		}
		if i > 0 {
			msgLen := len(buff)
			baseLenBefore := len(msgs[base].Buffers[0])
			freeBaseCap := cap(msgs[base].Buffers[0]) - baseLenBefore
			// Coalesce into msgs[base] only while we stay within the max UDP
			// payload, the kernel's segment-count limit, msgs[base]'s spare
			// capacity, and the current batch hasn't been ended by a short
			// segment.
			if msgLen+baseLenBefore <= maxPayloadLen &&
				msgLen <= gsoSize &&
				msgLen <= freeBaseCap &&
				dgramCnt < udpSegmentMaxDatagrams &&
				!endBatch {
				msgs[base].Buffers[0] = append(msgs[base].Buffers[0], make([]byte, msgLen)...)
				copy(msgs[base].Buffers[0][baseLenBefore:], buff)
				if i == len(buffs)-1 {
					c.setGSOSizeInControl(&msgs[base].OOB, uint16(gsoSize))
				}
				dgramCnt++
				if msgLen < gsoSize {
					// A smaller than gsoSize packet on the tail is legal, but
					// it must end the batch.
					endBatch = true
				}
				continue
			}
		}
		if dgramCnt > 1 {
			// The just-finished batch held multiple datagrams; tell the
			// kernel its segment size via a control message.
			c.setGSOSizeInControl(&msgs[base].OOB, uint16(gsoSize))
		}
		// Reset prior to incrementing base since we are preparing to start a
		// new potential batch.
		endBatch = false
		base++
		gsoSize = len(buff)
		msgs[base].OOB = msgs[base].OOB[:0]
		msgs[base].Buffers[0] = buff
		msgs[base].Addr = addr
		dgramCnt = 1
	}
	return base + 1
}
// sendBatch is reusable scratch state for a batched send, recycled via
// linuxBatchingConn.sendBatchPool.
type sendBatch struct {
	msgs []ipv6.Message
	ua   *net.UDPAddr // scratch destination address shared by msgs
}
  152. func (c *linuxBatchingConn) getSendBatch() *sendBatch {
  153. batch := c.sendBatchPool.Get().(*sendBatch)
  154. return batch
  155. }
  156. func (c *linuxBatchingConn) putSendBatch(batch *sendBatch) {
  157. for i := range batch.msgs {
  158. batch.msgs[i] = ipv6.Message{Buffers: batch.msgs[i].Buffers, OOB: batch.msgs[i].OOB}
  159. }
  160. c.sendBatchPool.Put(batch)
  161. }
// WriteBatchTo sends buffs to addr in as few syscalls as possible. If
// geneve.VNI.IsSet(), a Geneve header is encoded into each buffer ahead of
// its payload; otherwise the leading offset bytes of each buffer are skipped.
//
// While tx offload (UDP GSO) is believed supported, buffs are first coalesced
// into larger segments via coalesceMessages. If the kernel rejects a GSO
// send, tx offload is disabled on this conn and the write is retried once
// without it.
func (c *linuxBatchingConn) WriteBatchTo(buffs [][]byte, addr netip.AddrPort, geneve packet.GeneveHeader, offset int) error {
	batch := c.getSendBatch()
	defer c.putSendBatch(batch)
	// Fill the batch's scratch *net.UDPAddr in place; its IP slice has a
	// 16-byte backing array that serves both address families.
	if addr.Addr().Is6() {
		as16 := addr.Addr().As16()
		copy(batch.ua.IP, as16[:])
		batch.ua.IP = batch.ua.IP[:16]
	} else {
		as4 := addr.Addr().As4()
		copy(batch.ua.IP, as4[:])
		batch.ua.IP = batch.ua.IP[:4]
	}
	batch.ua.Port = int(addr.Port())
	var (
		n       int  // number of msgs to write
		retried bool // true if we already fell back from GSO once
	)
retry:
	if c.txOffload.Load() {
		n = c.coalesceMessages(batch.ua, geneve, buffs, batch.msgs, offset)
	} else {
		vniIsSet := geneve.VNI.IsSet()
		if vniIsSet {
			// The Geneve header sits immediately before the payload, so each
			// message starts that many bytes earlier in its buffer.
			offset -= packet.GeneveFixedHeaderLength
		}
		for i := range buffs {
			if vniIsSet {
				geneve.Encode(buffs[i])
			}
			batch.msgs[i].Buffers[0] = buffs[i][offset:]
			batch.msgs[i].Addr = batch.ua
			batch.msgs[i].OOB = batch.msgs[i].OOB[:0]
		}
		n = len(buffs)
	}
	err := c.writeBatch(batch.msgs[:n])
	if err != nil && c.txOffload.Load() && neterror.ShouldDisableUDPGSO(err) {
		// The kernel rejected the GSO send; disable tx offload for this conn
		// and retry the whole batch without coalescing.
		c.txOffload.Store(false)
		retried = true
		goto retry
	}
	if retried {
		return neterror.ErrUDPGSODisabled{OnLaddr: c.pc.LocalAddr().String(), RetryErr: err}
	}
	return err
}
// SyscallConn returns a raw network connection for the underlying socket.
func (c *linuxBatchingConn) SyscallConn() (syscall.RawConn, error) {
	return c.pc.SyscallConn()
}
  211. func (c *linuxBatchingConn) writeBatch(msgs []ipv6.Message) error {
  212. var head int
  213. for {
  214. n, err := c.xpc.WriteBatch(msgs[head:], 0)
  215. if err != nil || n == len(msgs[head:]) {
  216. // Returning the number of packets written would require
  217. // unraveling individual msg len and gso size during a coalesced
  218. // write. The top of the call stack disregards partial success,
  219. // so keep this simple for now.
  220. return err
  221. }
  222. head += n
  223. }
  224. }
// splitCoalescedMessages splits coalesced messages from the tail of dst
// beginning at index 'firstMsgAt' into the head of the same slice. It reports
// the number of elements to evaluate in msgs for nonzero len (msgs[i].N). An
// error is returned if a socket control message cannot be parsed or a split
// operation would overflow msgs.
func (c *linuxBatchingConn) splitCoalescedMessages(msgs []ipv6.Message, firstMsgAt int) (n int, err error) {
	for i := firstMsgAt; i < len(msgs); i++ {
		msg := &msgs[i]
		if msg.N == 0 {
			// Nothing was read into this (or any later) tail slot; done.
			return n, err
		}
		var (
			gsoSize    int // segment size from the GRO control message, 0 if absent
			start      int // byte offset of the current segment within msg
			end        = msg.N
			numToSplit = 1
		)
		gsoSize, err = c.getGSOSizeFromControl(msg.OOB[:msg.NN])
		if err != nil {
			return n, err
		}
		if gsoSize > 0 {
			// msg is a coalesced datagram: every segment is gsoSize bytes
			// except possibly a shorter final one.
			numToSplit = (msg.N + gsoSize - 1) / gsoSize
			end = gsoSize
		}
		for j := 0; j < numToSplit; j++ {
			if n > i {
				// The write index has overtaken the read index; there is no
				// free element left to split into.
				return n, errors.New("splitting coalesced packet resulted in overflow")
			}
			copied := copy(msgs[n].Buffers[0], msg.Buffers[0][start:end])
			msgs[n].N = copied
			msgs[n].Addr = msg.Addr
			start = end
			end += gsoSize
			if end > msg.N {
				end = msg.N
			}
			n++
		}
		if i != n-1 {
			// It is legal for bytes to move within msg.Buffers[0] as a result
			// of splitting, so we only zero the source msg len when it is not
			// the destination of the last split operation above.
			msg.N = 0
		}
	}
	return n, nil
}
// ReadBatch reads messages from the socket. When rx offload is enabled and
// msgs has at least two elements, possibly-coalesced datagrams are read into
// the tail of msgs and then split back into individual messages at its head;
// n is the number of elements of msgs to evaluate for nonzero msgs[i].N.
func (c *linuxBatchingConn) ReadBatch(msgs []ipv6.Message, flags int) (n int, err error) {
	if !c.rxOffload || len(msgs) < 2 {
		return c.xpc.ReadBatch(msgs, flags)
	}
	// Read into the tail of msgs, split into the head.
	readAt := len(msgs) - 2
	numRead, err := c.xpc.ReadBatch(msgs[readAt:], 0)
	if err != nil || numRead == 0 {
		return 0, err
	}
	return c.splitCoalescedMessages(msgs, readAt)
}
// LocalAddr returns the local address of the underlying socket, always as a
// *net.UDPAddr.
func (c *linuxBatchingConn) LocalAddr() net.Addr {
	return c.pc.LocalAddr().(*net.UDPAddr)
}

// WriteToUDPAddrPort writes a single datagram to addr on the underlying
// socket.
func (c *linuxBatchingConn) WriteToUDPAddrPort(b []byte, addr netip.AddrPort) (int, error) {
	return c.pc.WriteToUDPAddrPort(b, addr)
}

// Close closes the underlying socket.
func (c *linuxBatchingConn) Close() error {
	return c.pc.Close()
}
  294. // tryEnableUDPOffload attempts to enable the UDP_GRO socket option on pconn,
  295. // and returns two booleans indicating TX and RX UDP offload support.
  296. func tryEnableUDPOffload(pconn nettype.PacketConn) (hasTX bool, hasRX bool) {
  297. if c, ok := pconn.(*net.UDPConn); ok {
  298. rc, err := c.SyscallConn()
  299. if err != nil {
  300. return
  301. }
  302. err = rc.Control(func(fd uintptr) {
  303. _, errSyscall := syscall.GetsockoptInt(int(fd), unix.IPPROTO_UDP, unix.UDP_SEGMENT)
  304. hasTX = errSyscall == nil
  305. errSyscall = syscall.SetsockoptInt(int(fd), unix.IPPROTO_UDP, unix.UDP_GRO, 1)
  306. hasRX = errSyscall == nil
  307. })
  308. if err != nil {
  309. return false, false
  310. }
  311. }
  312. return hasTX, hasRX
  313. }
  314. // getGSOSizeFromControl returns the GSO size found in control. If no GSO size
  315. // is found or the len(control) < unix.SizeofCmsghdr, this function returns 0.
  316. // A non-nil error will be returned if len(control) > unix.SizeofCmsghdr but
  317. // its contents cannot be parsed as a socket control message.
  318. func getGSOSizeFromControl(control []byte) (int, error) {
  319. var (
  320. hdr unix.Cmsghdr
  321. data []byte
  322. rem = control
  323. err error
  324. )
  325. for len(rem) > unix.SizeofCmsghdr {
  326. hdr, data, rem, err = unix.ParseOneSocketControlMessage(rem)
  327. if err != nil {
  328. return 0, fmt.Errorf("error parsing socket control message: %w", err)
  329. }
  330. if hdr.Level == unix.SOL_UDP && hdr.Type == unix.UDP_GRO && len(data) >= 2 {
  331. return int(binary.NativeEndian.Uint16(data[:2])), nil
  332. }
  333. }
  334. return 0, nil
  335. }
  336. // setGSOSizeInControl sets a socket control message in control containing
  337. // gsoSize. If len(control) < controlMessageSize control's len will be set to 0.
  338. func setGSOSizeInControl(control *[]byte, gsoSize uint16) {
  339. *control = (*control)[:0]
  340. if cap(*control) < int(unsafe.Sizeof(unix.Cmsghdr{})) {
  341. return
  342. }
  343. if cap(*control) < controlMessageSize {
  344. return
  345. }
  346. *control = (*control)[:cap(*control)]
  347. hdr := (*unix.Cmsghdr)(unsafe.Pointer(&(*control)[0]))
  348. hdr.Level = unix.SOL_UDP
  349. hdr.Type = unix.UDP_SEGMENT
  350. hdr.SetLen(unix.CmsgLen(2))
  351. binary.NativeEndian.PutUint16((*control)[unix.SizeofCmsghdr:], gsoSize)
  352. *control = (*control)[:unix.CmsgSpace(2)]
  353. }
  354. // TryUpgradeToConn probes the capabilities of the OS and pconn, and upgrades
  355. // pconn to a [Conn] if appropriate. A batch size of [IdealBatchSize] is
  356. // suggested for the best performance.
  357. func TryUpgradeToConn(pconn nettype.PacketConn, network string, batchSize int) nettype.PacketConn {
  358. if runtime.GOOS != "linux" {
  359. // Exclude Android.
  360. return pconn
  361. }
  362. if network != "udp4" && network != "udp6" {
  363. return pconn
  364. }
  365. if strings.HasPrefix(hostinfo.GetOSVersion(), "2.") {
  366. // recvmmsg/sendmmsg were added in 2.6.33, but we support down to
  367. // 2.6.32 for old NAS devices. See https://github.com/tailscale/tailscale/issues/6807.
  368. // As a cheap heuristic: if the Linux kernel starts with "2", just
  369. // consider it too old for mmsg. Nobody who cares about performance runs
  370. // such ancient kernels. UDP offload was added much later, so no
  371. // upgrades are available.
  372. return pconn
  373. }
  374. uc, ok := pconn.(*net.UDPConn)
  375. if !ok {
  376. return pconn
  377. }
  378. b := &linuxBatchingConn{
  379. pc: uc,
  380. getGSOSizeFromControl: getGSOSizeFromControl,
  381. setGSOSizeInControl: setGSOSizeInControl,
  382. sendBatchPool: sync.Pool{
  383. New: func() any {
  384. ua := &net.UDPAddr{
  385. IP: make([]byte, 16),
  386. }
  387. msgs := make([]ipv6.Message, batchSize)
  388. for i := range msgs {
  389. msgs[i].Buffers = make([][]byte, 1)
  390. msgs[i].Addr = ua
  391. msgs[i].OOB = make([]byte, controlMessageSize)
  392. }
  393. return &sendBatch{
  394. ua: ua,
  395. msgs: msgs,
  396. }
  397. },
  398. },
  399. }
  400. switch network {
  401. case "udp4":
  402. b.xpc = ipv4.NewPacketConn(uc)
  403. case "udp6":
  404. b.xpc = ipv6.NewPacketConn(uc)
  405. default:
  406. panic("bogus network")
  407. }
  408. var txOffload bool
  409. txOffload, b.rxOffload = tryEnableUDPOffload(uc)
  410. b.txOffload.Store(txOffload)
  411. return b
  412. }
// controlMessageSize is the byte size of the socket control message buffers
// used for GSO/GRO. It is -1 until init runs so that any use for allocation
// before initialization fails loudly.
var controlMessageSize = -1 // bomb if used for allocation before init

func init() {
	// controlMessageSize is set to hold a UDP_GRO or UDP_SEGMENT control
	// message. These contain a single uint16 of data.
	controlMessageSize = unix.CmsgSpace(2)
}
// MinControlMessageSize returns the minimum control message size required to
// support read batching via [Conn.ReadBatch].
func MinControlMessageSize() int {
	return controlMessageSize
}

// IdealBatchSize is the batch size suggested for the best performance with
// [TryUpgradeToConn].
const IdealBatchSize = 128