upload_queue.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. package splithttp
// upload_queue is a specialized priority queue plus a channel, used to
// reorder generic packets by their sequence number.
  4. import (
  5. "container/heap"
  6. "io"
  7. "github.com/xtls/xray-core/common/errors"
  8. )
// Packet is one chunk of an upload stream together with its position in
// that stream.
type Packet struct {
	// Payload holds the bytes carried by this packet.
	Payload []byte
	// Seq is the sequence number used to order packets. UploadQueue.Read
	// delivers payloads in increasing Seq order, starting at 0.
	Seq uint64
}
// UploadQueue reorders packets that may arrive out of sequence and exposes
// their payloads, in Seq order, through Read.
type UploadQueue struct {
	// pushedPackets carries packets from Push to Read. NewUploadQueue
	// creates it with a buffer of maxPackets.
	pushedPackets chan Packet
	// heap holds packets that Read has received but not yet fully
	// delivered, ordered by ascending Seq.
	heap uploadHeap
	// nextSeq is the sequence number of the packet Read delivers next.
	nextSeq uint64
	// closed is set by Close. Once set, Push returns an error and Read
	// returns io.EOF.
	// NOTE(review): this flag is read and written without any lock in the
	// code visible here; confirm how callers serialize Push, Read and Close.
	closed bool
	// maxPackets is the channel capacity and the limit Read applies to the
	// number of out-of-order packets it buffers.
	maxPackets int
}
  20. func NewUploadQueue(maxPackets int) *UploadQueue {
  21. return &UploadQueue{
  22. pushedPackets: make(chan Packet, maxPackets),
  23. heap: uploadHeap{},
  24. nextSeq: 0,
  25. closed: false,
  26. maxPackets: maxPackets,
  27. }
  28. }
  29. func (h *UploadQueue) Push(p Packet) error {
  30. if h.closed {
  31. return errors.New("splithttp packet queue closed")
  32. }
  33. h.pushedPackets <- p
  34. return nil
  35. }
  36. func (h *UploadQueue) Close() error {
  37. h.closed = true
  38. close(h.pushedPackets)
  39. return nil
  40. }
  41. func (h *UploadQueue) Read(b []byte) (int, error) {
  42. if h.closed {
  43. return 0, io.EOF
  44. }
  45. if len(h.heap) == 0 {
  46. packet, more := <-h.pushedPackets
  47. if !more {
  48. return 0, io.EOF
  49. }
  50. heap.Push(&h.heap, packet)
  51. }
  52. for len(h.heap) > 0 {
  53. packet := heap.Pop(&h.heap).(Packet)
  54. n := 0
  55. if packet.Seq == h.nextSeq {
  56. copy(b, packet.Payload)
  57. n = min(len(b), len(packet.Payload))
  58. if n < len(packet.Payload) {
  59. // partial read
  60. packet.Payload = packet.Payload[n:]
  61. heap.Push(&h.heap, packet)
  62. } else {
  63. h.nextSeq = packet.Seq + 1
  64. }
  65. return n, nil
  66. }
  67. // misordered packet
  68. if packet.Seq > h.nextSeq {
  69. if len(h.heap) > h.maxPackets {
  70. // the "reassembly buffer" is too large, and we want to
  71. // constrain memory usage somehow. let's tear down the
  72. // connection, and hope the application retries.
  73. return 0, errors.New("packet queue is too large")
  74. }
  75. heap.Push(&h.heap, packet)
  76. packet2, more := <-h.pushedPackets
  77. if !more {
  78. return 0, io.EOF
  79. }
  80. heap.Push(&h.heap, packet2)
  81. }
  82. }
  83. return 0, nil
  84. }
  85. // heap code directly taken from https://pkg.go.dev/container/heap
  86. type uploadHeap []Packet
  87. func (h uploadHeap) Len() int { return len(h) }
  88. func (h uploadHeap) Less(i, j int) bool { return h[i].Seq < h[j].Seq }
  89. func (h uploadHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
  90. func (h *uploadHeap) Push(x any) {
  91. // Push and Pop use pointer receivers because they modify the slice's length,
  92. // not just its contents.
  93. *h = append(*h, x.(Packet))
  94. }
  95. func (h *uploadHeap) Pop() any {
  96. old := *h
  97. n := len(old)
  98. x := old[n-1]
  99. *h = old[0 : n-1]
  100. return x
  101. }