upload_queue.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. package splithttp
// upload_queue is a specialized priority queue combined with a channel, used
// to reorder generic packets by their sequence number.
  4. import (
  5. "container/heap"
  6. "io"
  7. )
  8. type Packet struct {
  9. Payload []byte
  10. Seq uint64
  11. }
  12. type UploadQueue struct {
  13. pushedPackets chan Packet
  14. heap uploadHeap
  15. nextSeq uint64
  16. closed bool
  17. maxPackets int
  18. }
  19. func NewUploadQueue(maxPackets int) *UploadQueue {
  20. return &UploadQueue{
  21. pushedPackets: make(chan Packet, maxPackets),
  22. heap: uploadHeap{},
  23. nextSeq: 0,
  24. closed: false,
  25. maxPackets: maxPackets,
  26. }
  27. }
  28. func (h *UploadQueue) Push(p Packet) error {
  29. if h.closed {
  30. return newError("splithttp packet queue closed")
  31. }
  32. h.pushedPackets <- p
  33. return nil
  34. }
  35. func (h *UploadQueue) Close() error {
  36. h.closed = true
  37. close(h.pushedPackets)
  38. return nil
  39. }
  40. func (h *UploadQueue) Read(b []byte) (int, error) {
  41. if h.closed {
  42. return 0, io.EOF
  43. }
  44. if len(h.heap) == 0 {
  45. packet, more := <-h.pushedPackets
  46. if !more {
  47. return 0, io.EOF
  48. }
  49. heap.Push(&h.heap, packet)
  50. }
  51. for len(h.heap) > 0 {
  52. packet := heap.Pop(&h.heap).(Packet)
  53. n := 0
  54. if packet.Seq == h.nextSeq {
  55. copy(b, packet.Payload)
  56. n = min(len(b), len(packet.Payload))
  57. if n < len(packet.Payload) {
  58. // partial read
  59. packet.Payload = packet.Payload[n:]
  60. heap.Push(&h.heap, packet)
  61. } else {
  62. h.nextSeq = packet.Seq + 1
  63. }
  64. return n, nil
  65. }
  66. // misordered packet
  67. if packet.Seq > h.nextSeq {
  68. if len(h.heap) > h.maxPackets {
  69. // the "reassembly buffer" is too large, and we want to
  70. // constrain memory usage somehow. let's tear down the
  71. // connection, and hope the application retries.
  72. return 0, newError("packet queue is too large")
  73. }
  74. heap.Push(&h.heap, packet)
  75. packet2, more := <-h.pushedPackets
  76. if !more {
  77. return 0, io.EOF
  78. }
  79. heap.Push(&h.heap, packet2)
  80. }
  81. }
  82. return 0, nil
  83. }
  84. // heap code directly taken from https://pkg.go.dev/container/heap
  85. type uploadHeap []Packet
  86. func (h uploadHeap) Len() int { return len(h) }
  87. func (h uploadHeap) Less(i, j int) bool { return h[i].Seq < h[j].Seq }
  88. func (h uploadHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
  89. func (h *uploadHeap) Push(x any) {
  90. // Push and Pop use pointer receivers because they modify the slice's length,
  91. // not just its contents.
  92. *h = append(*h, x.(Packet))
  93. }
  94. func (h *uploadHeap) Pop() any {
  95. old := *h
  96. n := len(old)
  97. x := old[n-1]
  98. *h = old[0 : n-1]
  99. return x
  100. }