|
|
@@ -12,25 +12,29 @@ import (
|
|
|
)
|
|
|
|
|
|
type Scheduler struct {
|
|
|
- Buffer chan buf.MultiBuffer
|
|
|
- Trigger chan int
|
|
|
- Error chan error
|
|
|
- bufferReadLock *sync.Mutex
|
|
|
- writer buf.Writer
|
|
|
- addons *Addons
|
|
|
- trafficState *TrafficState
|
|
|
- ctx context.Context
|
|
|
+ Buffer chan buf.MultiBuffer
|
|
|
+ Trigger chan int
|
|
|
+ Error chan error
|
|
|
+ closed chan int
|
|
|
+ bufferReadLock *sync.Mutex
|
|
|
+ writer buf.Writer
|
|
|
+ addons *Addons
|
|
|
+ trafficState *TrafficState
|
|
|
+ writeOnceUserUUID *[]byte
|
|
|
+ ctx context.Context
|
|
|
}
|
|
|
|
|
|
-func NewScheduler(w buf.Writer, addon *Addons, state *TrafficState, context context.Context) *Scheduler {
|
|
|
+func NewScheduler(w buf.Writer, addon *Addons, state *TrafficState, userUUID *[]byte, context context.Context) *Scheduler {
|
|
|
var s = Scheduler{
|
|
|
Buffer: make(chan buf.MultiBuffer, 100),
|
|
|
Trigger: make(chan int),
|
|
|
Error: make(chan error, 100),
|
|
|
+ closed: make(chan int),
|
|
|
bufferReadLock: new(sync.Mutex),
|
|
|
writer: w,
|
|
|
addons: addon,
|
|
|
trafficState: state,
|
|
|
+ writeOnceUserUUID: userUUID,
|
|
|
ctx: context,
|
|
|
}
|
|
|
go s.mainLoop()
|
|
|
@@ -42,6 +46,9 @@ func NewScheduler(w buf.Writer, addon *Addons, state *TrafficState, context cont
|
|
|
|
|
|
func(s *Scheduler) mainLoop() {
|
|
|
for trigger := range s.Trigger {
|
|
|
+ if len(s.closed) > 0 {
|
|
|
+ return
|
|
|
+ }
|
|
|
go func() { // each trigger has independent delay, trigger does not block
|
|
|
var d = 0 * time.Millisecond
|
|
|
if s.addons.Delay != nil {
|
|
|
@@ -58,12 +65,31 @@ func(s *Scheduler) mainLoop() {
|
|
|
if sending > 0 {
|
|
|
errors.LogDebug(s.ctx, "Scheduler Trigger for ", sending, " buffer(s) with ", d, " ", trigger)
|
|
|
for i := 0; i<sending; i++ {
|
|
|
- s.Error <- s.writer.WriteMultiBuffer(<-s.Buffer)
|
|
|
+ err := s.writer.WriteMultiBuffer(<-s.Buffer)
|
|
|
+ if err != nil {
|
|
|
+ s.Error <- err
|
|
|
+ s.closed <- 1
|
|
|
+ return
|
|
|
+ }
|
|
|
}
|
|
|
- } else if trigger > 0 {
|
|
|
+ } else if trigger > 0 && (s.trafficState.Inbound.IsPadding || s.trafficState.Outbound.IsPadding) && ShouldStartSeed(s.addons, s.trafficState) && !ShouldStopSeed(s.addons, s.trafficState) {
|
|
|
errors.LogDebug(s.ctx, "Scheduler Trigger for fake buffer with ", d, " ", trigger)
|
|
|
+ s.trafficState.NumberOfPacketSent += 1
|
|
|
mb := make(buf.MultiBuffer, 1)
|
|
|
- s.Error <- s.writer.WriteMultiBuffer(mb)
|
|
|
+ mb[0] = XtlsPadding(nil, CommandPaddingContinue, s.writeOnceUserUUID, true, s.addons, s.ctx)
|
|
|
+ s.trafficState.ByteSent += int64(mb.Len())
|
|
|
+ if s.trafficState.StartTime.IsZero() {
|
|
|
+ s.trafficState.StartTime = time.Now()
|
|
|
+ }
|
|
|
+ err := s.writer.WriteMultiBuffer(mb)
|
|
|
+ if err != nil {
|
|
|
+ s.Error <- err
|
|
|
+ s.closed <- 1
|
|
|
+ return
|
|
|
+ }
|
|
|
+ if buffered, ok := s.writer.(*buf.BufferedWriter); ok {
|
|
|
+ buffered.SetBuffered(false)
|
|
|
+ }
|
|
|
}
|
|
|
s.bufferReadLock.Unlock()
|
|
|
}()
|
|
|
@@ -72,7 +98,10 @@ func(s *Scheduler) mainLoop() {
|
|
|
|
|
|
func(s *Scheduler) exampleIndependentScheduler() {
|
|
|
for {
|
|
|
- time.Sleep(500 * time.Millisecond)
|
|
|
+ if len(s.closed) > 0 {
|
|
|
+ return
|
|
|
+ }
|
|
|
s.Trigger <- 1 // send fake buffer if no pending
|
|
|
+ time.Sleep(500 * time.Millisecond)
|
|
|
}
|
|
|
}
|