|
@@ -490,17 +490,21 @@ func TestDeregisterOnFailInCopy(t *testing.T) {
|
|
|
t.Fatal("Expected file in progress")
|
|
|
}
|
|
|
|
|
|
- copyChan := make(chan copyBlocksState)
|
|
|
pullChan := make(chan pullBlockState)
|
|
|
finisherBufferChan := make(chan *sharedPullerState)
|
|
|
finisherChan := make(chan *sharedPullerState)
|
|
|
dbUpdateChan := make(chan dbUpdateJob, 1)
|
|
|
|
|
|
- go f.copierRoutine(copyChan, pullChan, finisherBufferChan)
|
|
|
+ copyChan, copyWg := startCopier(f, pullChan, finisherBufferChan)
|
|
|
go f.finisherRoutine(finisherChan, dbUpdateChan, make(chan string))
|
|
|
- defer close(copyChan)
|
|
|
- defer close(pullChan)
|
|
|
- defer close(finisherChan)
|
|
|
+
|
|
|
+ defer func() {
|
|
|
+ close(copyChan)
|
|
|
+ copyWg.Wait()
|
|
|
+ close(pullChan)
|
|
|
+ close(finisherBufferChan)
|
|
|
+ close(finisherChan)
|
|
|
+ }()
|
|
|
|
|
|
f.handleFile(file, copyChan, dbUpdateChan)
|
|
|
|
|
@@ -508,15 +512,15 @@ func TestDeregisterOnFailInCopy(t *testing.T) {
|
|
|
// loop has been performed.
|
|
|
toPull := <-pullChan
|
|
|
|
|
|
- // Close the file, causing errors on further access
|
|
|
- toPull.sharedPullerState.fail(os.ErrNotExist)
|
|
|
-
|
|
|
// Unblock copier
|
|
|
go func() {
|
|
|
for range pullChan {
|
|
|
}
|
|
|
}()
|
|
|
|
|
|
+ // Close the file, causing errors on further access
|
|
|
+ toPull.sharedPullerState.fail(os.ErrNotExist)
|
|
|
+
|
|
|
select {
|
|
|
case state := <-finisherBufferChan:
|
|
|
// At this point the file should still be registered with both the job
|
|
@@ -580,66 +584,90 @@ func TestDeregisterOnFailInPull(t *testing.T) {
|
|
|
t.Fatal("Expected file in progress")
|
|
|
}
|
|
|
|
|
|
- copyChan := make(chan copyBlocksState)
|
|
|
pullChan := make(chan pullBlockState)
|
|
|
finisherBufferChan := make(chan *sharedPullerState)
|
|
|
finisherChan := make(chan *sharedPullerState)
|
|
|
dbUpdateChan := make(chan dbUpdateJob, 1)
|
|
|
|
|
|
- go f.copierRoutine(copyChan, pullChan, finisherBufferChan)
|
|
|
- go f.pullerRoutine(pullChan, finisherBufferChan)
|
|
|
+ copyChan, copyWg := startCopier(f, pullChan, finisherBufferChan)
|
|
|
+ pullWg := sync.NewWaitGroup()
|
|
|
+ pullWg.Add(1)
|
|
|
+ go func() {
|
|
|
+ f.pullerRoutine(pullChan, finisherBufferChan)
|
|
|
+ pullWg.Done()
|
|
|
+ }()
|
|
|
go f.finisherRoutine(finisherChan, dbUpdateChan, make(chan string))
|
|
|
- defer close(copyChan)
|
|
|
- defer close(pullChan)
|
|
|
- defer close(finisherChan)
|
|
|
+ defer func() {
|
|
|
+ // Unblock copier and puller
|
|
|
+ go func() {
|
|
|
+ for range finisherBufferChan {
|
|
|
+ }
|
|
|
+ }()
|
|
|
+ close(copyChan)
|
|
|
+ copyWg.Wait()
|
|
|
+ close(pullChan)
|
|
|
+ pullWg.Wait()
|
|
|
+ close(finisherBufferChan)
|
|
|
+ close(finisherChan)
|
|
|
+ }()
|
|
|
|
|
|
f.handleFile(file, copyChan, dbUpdateChan)
|
|
|
|
|
|
// Receive at finisher, we should error out as puller has nowhere to pull
|
|
|
// from.
|
|
|
timeout = time.Second
|
|
|
- select {
|
|
|
- case state := <-finisherBufferChan:
|
|
|
- // At this point the file should still be registered with both the job
|
|
|
- // queue, and the progress emitter. Verify this.
|
|
|
- if f.model.progressEmitter.lenRegistry() != 1 || f.queue.lenProgress() != 1 || f.queue.lenQueued() != 0 {
|
|
|
- t.Fatal("Could not find file")
|
|
|
+
|
|
|
+ // Both the puller and copier may send to the finisherBufferChan.
|
|
|
+ var state *sharedPullerState
|
|
|
+ after := time.After(5 * time.Second)
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case state = <-finisherBufferChan:
|
|
|
+ case <-after:
|
|
|
+ t.Fatal("Didn't get failed state to the finisher")
|
|
|
}
|
|
|
+ if state.failed() != nil {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- // Pass the file down the real finisher, and give it time to consume
|
|
|
- finisherChan <- state
|
|
|
+ // At this point the file should still be registered with both the job
|
|
|
+ // queue, and the progress emitter. Verify this.
|
|
|
+ if f.model.progressEmitter.lenRegistry() != 1 || f.queue.lenProgress() != 1 || f.queue.lenQueued() != 0 {
|
|
|
+ t.Fatal("Could not find file")
|
|
|
+ }
|
|
|
|
|
|
- t0 := time.Now()
|
|
|
- if ev, err := s.Poll(time.Minute); err != nil {
|
|
|
- t.Fatal("Got error waiting for ItemFinished event:", err)
|
|
|
- } else if n := ev.Data.(map[string]interface{})["item"]; n != state.file.Name {
|
|
|
- t.Fatal("Got ItemFinished event for wrong file:", n)
|
|
|
- }
|
|
|
- t.Log("event took", time.Since(t0))
|
|
|
+ // Pass the file down the real finisher, and give it time to consume
|
|
|
+ finisherChan <- state
|
|
|
|
|
|
- state.mut.Lock()
|
|
|
- stateWriter := state.writer
|
|
|
- state.mut.Unlock()
|
|
|
- if stateWriter != nil {
|
|
|
- t.Fatal("File not closed?")
|
|
|
- }
|
|
|
+ t0 := time.Now()
|
|
|
+ if ev, err := s.Poll(time.Minute); err != nil {
|
|
|
+ t.Fatal("Got error waiting for ItemFinished event:", err)
|
|
|
+ } else if n := ev.Data.(map[string]interface{})["item"]; n != state.file.Name {
|
|
|
+ t.Fatal("Got ItemFinished event for wrong file:", n)
|
|
|
+ }
|
|
|
+ t.Log("event took", time.Since(t0))
|
|
|
|
|
|
- if f.model.progressEmitter.lenRegistry() != 0 || f.queue.lenProgress() != 0 || f.queue.lenQueued() != 0 {
|
|
|
- t.Fatal("Still registered", f.model.progressEmitter.lenRegistry(), f.queue.lenProgress(), f.queue.lenQueued())
|
|
|
- }
|
|
|
+ state.mut.Lock()
|
|
|
+ stateWriter := state.writer
|
|
|
+ state.mut.Unlock()
|
|
|
+ if stateWriter != nil {
|
|
|
+ t.Fatal("File not closed?")
|
|
|
+ }
|
|
|
|
|
|
- // Doing it again should have no effect
|
|
|
- finisherChan <- state
|
|
|
+ if f.model.progressEmitter.lenRegistry() != 0 || f.queue.lenProgress() != 0 || f.queue.lenQueued() != 0 {
|
|
|
+ t.Fatal("Still registered", f.model.progressEmitter.lenRegistry(), f.queue.lenProgress(), f.queue.lenQueued())
|
|
|
+ }
|
|
|
|
|
|
- if _, err := s.Poll(time.Second); err != events.ErrTimeout {
|
|
|
- t.Fatal("Expected timeout, not another event", err)
|
|
|
- }
|
|
|
+ // Doing it again should have no effect
|
|
|
+ finisherChan <- state
|
|
|
|
|
|
- if f.model.progressEmitter.lenRegistry() != 0 || f.queue.lenProgress() != 0 || f.queue.lenQueued() != 0 {
|
|
|
- t.Fatal("Still registered", f.model.progressEmitter.lenRegistry(), f.queue.lenProgress(), f.queue.lenQueued())
|
|
|
- }
|
|
|
- case <-time.After(5 * time.Second):
|
|
|
- t.Fatal("Didn't get anything to the finisher")
|
|
|
+ if _, err := s.Poll(time.Second); err != events.ErrTimeout {
|
|
|
+ t.Fatal("Expected timeout, not another event", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if f.model.progressEmitter.lenRegistry() != 0 || f.queue.lenProgress() != 0 || f.queue.lenQueued() != 0 {
|
|
|
+ t.Fatal("Still registered", f.model.progressEmitter.lenRegistry(), f.queue.lenProgress(), f.queue.lenQueued())
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -830,11 +858,14 @@ func TestCopyOwner(t *testing.T) {
|
|
|
// comes the finisher is done.
|
|
|
|
|
|
finisherChan := make(chan *sharedPullerState)
|
|
|
- defer close(finisherChan)
|
|
|
- copierChan := make(chan copyBlocksState)
|
|
|
- defer close(copierChan)
|
|
|
- go f.copierRoutine(copierChan, nil, finisherChan)
|
|
|
+ copierChan, copyWg := startCopier(f, nil, finisherChan)
|
|
|
go f.finisherRoutine(finisherChan, dbUpdateChan, nil)
|
|
|
+ defer func() {
|
|
|
+ close(copierChan)
|
|
|
+ copyWg.Wait()
|
|
|
+ close(finisherChan)
|
|
|
+ }()
|
|
|
+
|
|
|
f.handleFile(file, copierChan, nil)
|
|
|
<-dbUpdateChan
|
|
|
|
|
@@ -993,3 +1024,14 @@ func cleanupSharedPullerState(s *sharedPullerState) {
|
|
|
s.writer.fd.Close()
|
|
|
s.writer.mut.Unlock()
|
|
|
}
|
|
|
+
|
|
|
+func startCopier(f *sendReceiveFolder, pullChan chan<- pullBlockState, finisherChan chan<- *sharedPullerState) (chan copyBlocksState, sync.WaitGroup) {
|
|
|
+ copyChan := make(chan copyBlocksState)
|
|
|
+ wg := sync.NewWaitGroup()
|
|
|
+ wg.Add(1)
|
|
|
+ go func() {
|
|
|
+ f.copierRoutine(copyChan, pullChan, finisherChan)
|
|
|
+ wg.Done()
|
|
|
+ }()
|
|
|
+ return copyChan, wg
|
|
|
+}
|