upload_queue.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. package splithttp
  2. // upload_queue is a specialized priorityqueue + channel to reorder generic
  3. // packets by a sequence number
  4. import (
  5. "container/heap"
  6. "io"
  7. "runtime"
  8. "sync"
  9. "github.com/xtls/xray-core/common/errors"
  10. )
  11. type Packet struct {
  12. Reader io.ReadCloser
  13. Payload []byte
  14. Seq uint64
  15. }
  16. type uploadQueue struct {
  17. reader io.ReadCloser
  18. pushedPackets chan Packet
  19. writeCloseMutex sync.Mutex
  20. heap uploadHeap
  21. nextSeq uint64
  22. closed bool
  23. maxPackets int
  24. }
  25. func NewUploadQueue(maxPackets int) *uploadQueue {
  26. return &uploadQueue{
  27. pushedPackets: make(chan Packet, maxPackets),
  28. heap: uploadHeap{},
  29. nextSeq: 0,
  30. closed: false,
  31. maxPackets: maxPackets,
  32. }
  33. }
  34. func (h *uploadQueue) Push(p Packet) error {
  35. h.writeCloseMutex.Lock()
  36. defer h.writeCloseMutex.Unlock()
  37. runtime.Gosched()
  38. if h.reader != nil && p.Reader != nil {
  39. p.Reader.Close()
  40. return errors.New("h.reader already exists")
  41. }
  42. if h.closed {
  43. if p.Reader != nil {
  44. p.Reader.Close()
  45. }
  46. return errors.New("splithttp packet queue closed")
  47. }
  48. h.pushedPackets <- p
  49. return nil
  50. }
  51. func (h *uploadQueue) Close() error {
  52. h.writeCloseMutex.Lock()
  53. defer h.writeCloseMutex.Unlock()
  54. if !h.closed {
  55. h.closed = true
  56. close(h.pushedPackets)
  57. }
  58. runtime.Gosched()
  59. if h.reader != nil {
  60. return h.reader.Close()
  61. }
  62. return nil
  63. }
  64. func (h *uploadQueue) Read(b []byte) (int, error) {
  65. if h.reader != nil {
  66. return h.reader.Read(b)
  67. }
  68. if h.closed {
  69. return 0, io.EOF
  70. }
  71. if len(h.heap) == 0 {
  72. packet, more := <-h.pushedPackets
  73. if !more {
  74. return 0, io.EOF
  75. }
  76. if packet.Reader != nil {
  77. h.reader = packet.Reader
  78. return h.reader.Read(b)
  79. }
  80. heap.Push(&h.heap, packet)
  81. }
  82. for len(h.heap) > 0 {
  83. packet := heap.Pop(&h.heap).(Packet)
  84. n := 0
  85. if packet.Seq == h.nextSeq {
  86. copy(b, packet.Payload)
  87. n = min(len(b), len(packet.Payload))
  88. if n < len(packet.Payload) {
  89. // partial read
  90. packet.Payload = packet.Payload[n:]
  91. heap.Push(&h.heap, packet)
  92. } else {
  93. h.nextSeq = packet.Seq + 1
  94. }
  95. return n, nil
  96. }
  97. // misordered packet
  98. if packet.Seq > h.nextSeq {
  99. if len(h.heap) > h.maxPackets {
  100. // the "reassembly buffer" is too large, and we want to
  101. // constrain memory usage somehow. let's tear down the
  102. // connection, and hope the application retries.
  103. return 0, errors.New("packet queue is too large")
  104. }
  105. heap.Push(&h.heap, packet)
  106. packet2, more := <-h.pushedPackets
  107. if !more {
  108. return 0, io.EOF
  109. }
  110. heap.Push(&h.heap, packet2)
  111. }
  112. }
  113. return 0, nil
  114. }
  115. // heap code directly taken from https://pkg.go.dev/container/heap
  116. type uploadHeap []Packet
  117. func (h uploadHeap) Len() int { return len(h) }
  118. func (h uploadHeap) Less(i, j int) bool { return h[i].Seq < h[j].Seq }
  119. func (h uploadHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
  120. func (h *uploadHeap) Push(x any) {
  121. // Push and Pop use pointer receivers because they modify the slice's length,
  122. // not just its contents.
  123. *h = append(*h, x.(Packet))
  124. }
  125. func (h *uploadHeap) Pop() any {
  126. old := *h
  127. n := len(old)
  128. x := old[n-1]
  129. *h = old[0 : n-1]
  130. return x
  131. }