scheduler.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package proxy
  2. import (
  3. "context"
  4. "crypto/rand"
  5. "math/big"
  6. "sync"
  7. "time"
  8. "github.com/xtls/xray-core/common/buf"
  9. "github.com/xtls/xray-core/common/errors"
  10. )
  11. type Scheduler struct {
  12. Buffer chan buf.MultiBuffer
  13. Trigger chan int
  14. Error chan error
  15. closed chan int
  16. bufferReadLock *sync.Mutex
  17. writer buf.Writer
  18. addons *Addons
  19. trafficState *TrafficState
  20. writeOnceUserUUID *[]byte
  21. ctx context.Context
  22. }
  23. func NewScheduler(w buf.Writer, addon *Addons, state *TrafficState, userUUID *[]byte, context context.Context) *Scheduler {
  24. var s = Scheduler{
  25. Buffer: make(chan buf.MultiBuffer, 100),
  26. Trigger: make(chan int),
  27. Error: make(chan error, 100),
  28. closed: make(chan int),
  29. bufferReadLock: new(sync.Mutex),
  30. writer: w,
  31. addons: addon,
  32. trafficState: state,
  33. writeOnceUserUUID: userUUID,
  34. ctx: context,
  35. }
  36. go s.mainLoop()
  37. if s.addons.Scheduler != nil {
  38. go s.exampleIndependentScheduler()
  39. }
  40. return &s
  41. }
  42. func(s *Scheduler) mainLoop() {
  43. for trigger := range s.Trigger {
  44. if len(s.closed) > 0 {
  45. return
  46. }
  47. go func() { // each trigger has independent delay, trigger does not block
  48. var d = 0 * time.Millisecond
  49. if s.addons.Delay != nil {
  50. l, err := rand.Int(rand.Reader, big.NewInt(int64(s.addons.Delay.MaxMillis - s.addons.Delay.MinMillis)))
  51. if err != nil {
  52. errors.LogWarningInner(s.ctx, err, "failed to generate delay", trigger)
  53. }
  54. d = time.Duration(uint32(l.Int64()) + s.addons.Delay.MinMillis) * time.Millisecond
  55. time.Sleep(d)
  56. }
  57. s.bufferReadLock.Lock() // guard against multiple trigger threads
  58. var sending = len(s.Buffer)
  59. if sending > 0 {
  60. errors.LogDebug(s.ctx, "Scheduler Trigger for ", sending, " buffer(s) with ", d, " ", trigger)
  61. for i := 0; i<sending; i++ {
  62. err := s.writer.WriteMultiBuffer(<-s.Buffer)
  63. if err != nil {
  64. s.Error <- err
  65. s.closed <- 1
  66. return
  67. }
  68. }
  69. } else if trigger > 0 && (s.trafficState.Inbound.IsPadding || s.trafficState.Outbound.IsPadding) && ShouldStartSeed(s.addons, s.trafficState) && !ShouldStopSeed(s.addons, s.trafficState) {
  70. errors.LogDebug(s.ctx, "Scheduler Trigger for fake buffer with ", d, " ", trigger)
  71. s.trafficState.NumberOfPacketSent += 1
  72. mb := make(buf.MultiBuffer, 1)
  73. mb[0] = XtlsPadding(nil, CommandPaddingContinue, s.writeOnceUserUUID, true, s.addons, s.ctx)
  74. s.trafficState.ByteSent += int64(mb.Len())
  75. if s.trafficState.StartTime.IsZero() {
  76. s.trafficState.StartTime = time.Now()
  77. }
  78. err := s.writer.WriteMultiBuffer(mb)
  79. if err != nil {
  80. s.Error <- err
  81. s.closed <- 1
  82. return
  83. }
  84. if buffered, ok := s.writer.(*buf.BufferedWriter); ok {
  85. buffered.SetBuffered(false)
  86. }
  87. }
  88. s.bufferReadLock.Unlock()
  89. }()
  90. }
  91. }
  92. func(s *Scheduler) exampleIndependentScheduler() {
  93. for {
  94. if len(s.closed) > 0 {
  95. return
  96. }
  97. s.Trigger <- 1 // send fake buffer if no pending
  98. time.Sleep(500 * time.Millisecond)
  99. }
  100. }