1
0

impl.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. package pipe
  2. import (
  3. "errors"
  4. "io"
  5. "runtime"
  6. "sync"
  7. "time"
  8. "github.com/xtls/xray-core/common"
  9. "github.com/xtls/xray-core/common/buf"
  10. "github.com/xtls/xray-core/common/signal"
  11. "github.com/xtls/xray-core/common/signal/done"
  12. )
  13. type state byte
  14. const (
  15. open state = iota
  16. closed
  17. errord
  18. )
  19. type pipeOption struct {
  20. limit int32 // maximum buffer size in bytes
  21. discardOverflow bool
  22. onTransmission func(buffer buf.MultiBuffer) buf.MultiBuffer
  23. }
  24. func (o *pipeOption) isFull(curSize int32) bool {
  25. return o.limit >= 0 && curSize > o.limit
  26. }
  27. type pipe struct {
  28. sync.Mutex
  29. data buf.MultiBuffer
  30. readSignal *signal.Notifier
  31. writeSignal *signal.Notifier
  32. done *done.Instance
  33. option pipeOption
  34. state state
  35. }
  36. var (
  37. errBufferFull = errors.New("buffer full")
  38. errSlowDown = errors.New("slow down")
  39. )
  40. func (p *pipe) getState(forRead bool) error {
  41. switch p.state {
  42. case open:
  43. if !forRead && p.option.isFull(p.data.Len()) {
  44. return errBufferFull
  45. }
  46. return nil
  47. case closed:
  48. if !forRead {
  49. return io.ErrClosedPipe
  50. }
  51. if !p.data.IsEmpty() {
  52. return nil
  53. }
  54. return io.EOF
  55. case errord:
  56. return io.ErrClosedPipe
  57. default:
  58. panic("impossible case")
  59. }
  60. }
  61. func (p *pipe) readMultiBufferInternal() (buf.MultiBuffer, error) {
  62. p.Lock()
  63. defer p.Unlock()
  64. if err := p.getState(true); err != nil {
  65. return nil, err
  66. }
  67. data := p.data
  68. p.data = nil
  69. return data, nil
  70. }
  71. func (p *pipe) ReadMultiBuffer() (buf.MultiBuffer, error) {
  72. for {
  73. data, err := p.readMultiBufferInternal()
  74. if data != nil || err != nil {
  75. p.writeSignal.Signal()
  76. return data, err
  77. }
  78. select {
  79. case <-p.readSignal.Wait():
  80. case <-p.done.Wait():
  81. }
  82. }
  83. }
  84. func (p *pipe) ReadMultiBufferTimeout(d time.Duration) (buf.MultiBuffer, error) {
  85. timer := time.NewTimer(d)
  86. defer timer.Stop()
  87. for {
  88. data, err := p.readMultiBufferInternal()
  89. if data != nil || err != nil {
  90. p.writeSignal.Signal()
  91. return data, err
  92. }
  93. select {
  94. case <-p.readSignal.Wait():
  95. case <-p.done.Wait():
  96. case <-timer.C:
  97. return nil, buf.ErrReadTimeout
  98. }
  99. }
  100. }
  101. func (p *pipe) writeMultiBufferInternal(mb buf.MultiBuffer) error {
  102. p.Lock()
  103. defer p.Unlock()
  104. if err := p.getState(false); err != nil {
  105. return err
  106. }
  107. if p.data == nil {
  108. p.data = mb
  109. return nil
  110. }
  111. p.data, _ = buf.MergeMulti(p.data, mb)
  112. return errSlowDown
  113. }
  114. func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error {
  115. if mb.IsEmpty() {
  116. return nil
  117. }
  118. if p.option.onTransmission != nil {
  119. mb = p.option.onTransmission(mb)
  120. }
  121. for {
  122. err := p.writeMultiBufferInternal(mb)
  123. if err == nil {
  124. p.readSignal.Signal()
  125. return nil
  126. }
  127. if err == errSlowDown {
  128. p.readSignal.Signal()
  129. // Yield current goroutine. Hopefully the reading counterpart can pick up the payload.
  130. runtime.Gosched()
  131. return nil
  132. }
  133. if err == errBufferFull && p.option.discardOverflow {
  134. buf.ReleaseMulti(mb)
  135. return nil
  136. }
  137. if err != errBufferFull {
  138. buf.ReleaseMulti(mb)
  139. p.readSignal.Signal()
  140. return err
  141. }
  142. select {
  143. case <-p.writeSignal.Wait():
  144. case <-p.done.Wait():
  145. return io.ErrClosedPipe
  146. }
  147. }
  148. }
  149. func (p *pipe) Close() error {
  150. p.Lock()
  151. defer p.Unlock()
  152. if p.state == closed || p.state == errord {
  153. return nil
  154. }
  155. p.state = closed
  156. common.Must(p.done.Close())
  157. return nil
  158. }
  159. // Interrupt implements common.Interruptible.
  160. func (p *pipe) Interrupt() {
  161. p.Lock()
  162. defer p.Unlock()
  163. if p.state == closed || p.state == errord {
  164. return
  165. }
  166. p.state = errord
  167. if !p.data.IsEmpty() {
  168. buf.ReleaseMulti(p.data)
  169. p.data = nil
  170. }
  171. common.Must(p.done.Close())
  172. }