Przeglądaj źródła

lib/model: Don't exit pullerRoutine on cancelled ctx (fixes #6559) (#6562)

* lib/model: Don't exit pullerRoutine on cancelled ctx (fixes #6559)

* actual fix
Simon Frei 5 lat temu
rodzic
commit
d3ed4de4ed

+ 6 - 0
lib/model/bytesemaphore.go

@@ -51,6 +51,12 @@ func (s *byteSemaphore) take(bytes int) {
 }
 
 func (s *byteSemaphore) takeInner(ctx context.Context, bytes int) error {
+	// Checking context for bytes <= s.available is required for testing and doesn't do any harm.
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	default:
+	}
 	s.mut.Lock()
 	defer s.mut.Unlock()
 	if bytes > s.max {

+ 5 - 21
lib/model/folder_sendrecv.go

@@ -1079,30 +1079,12 @@ func (f *sendReceiveFolder) handleFile(file protocol.FileInfo, snap *db.Snapshot
 		"action": "update",
 	})
 
-	s := sharedPullerState{
-		file:             file,
-		fs:               f.fs,
-		folder:           f.folderID,
-		tempName:         tempName,
-		realName:         file.Name,
-		copyTotal:        len(blocks),
-		copyNeeded:       len(blocks),
-		reused:           len(reused),
-		updated:          time.Now(),
-		available:        reused,
-		availableUpdated: time.Now(),
-		ignorePerms:      f.IgnorePerms || file.NoPermissions,
-		hasCurFile:       hasCurFile,
-		curFile:          curFile,
-		mut:              sync.NewRWMutex(),
-		sparse:           !f.DisableSparseFiles,
-		created:          time.Now(),
-	}
+	s := newSharedPullerState(file, f.fs, f.folderID, tempName, blocks, reused, f.IgnorePerms || file.NoPermissions, hasCurFile, curFile, !f.DisableSparseFiles)
 
 	l.Debugf("%v need file %s; copy %d, reused %v", f, file.Name, len(blocks), len(reused))
 
 	cs := copyBlocksState{
-		sharedPullerState: &s,
+		sharedPullerState: s,
 		blocks:            blocks,
 		have:              len(have),
 	}
@@ -1384,7 +1366,9 @@ func (f *sendReceiveFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *
 		bytes := int(state.block.Size)
 
 		if err := requestLimiter.takeWithContext(f.ctx, bytes); err != nil {
-			break
+			state.fail(err)
+			out <- state.sharedPullerState
+			continue
 		}
 
 		wg.Add(1)

+ 43 - 0
lib/model/folder_sendrecv_test.go

@@ -1014,6 +1014,49 @@ func TestDeleteBehindSymlink(t *testing.T) {
 	}
 }
 
+// Reproduces https://github.com/syncthing/syncthing/issues/6559
+func TestPullCtxCancel(t *testing.T) {
+	m, f := setupSendReceiveFolder()
+	defer cleanupSRFolder(f, m)
+
+	pullChan := make(chan pullBlockState)
+	finisherChan := make(chan *sharedPullerState)
+
+	var cancel context.CancelFunc
+	f.ctx, cancel = context.WithCancel(context.Background())
+
+	go f.pullerRoutine(pullChan, finisherChan)
+	defer close(pullChan)
+
+	emptyState := func() pullBlockState {
+		return pullBlockState{
+			sharedPullerState: newSharedPullerState(protocol.FileInfo{}, nil, f.folderID, "", nil, nil, false, false, protocol.FileInfo{}, false),
+			block:             protocol.BlockInfo{},
+		}
+	}
+
+	cancel()
+
+	done := make(chan struct{})
+	defer close(done)
+	for i := 0; i < 2; i++ {
+		go func() {
+			select {
+			case pullChan <- emptyState():
+			case <-done:
+			}
+		}()
+		select {
+		case s := <-finisherChan:
+			if s.failed() == nil {
+				t.Errorf("state %v not failed", i)
+			}
+		case <-time.After(5 * time.Second):
+			t.Fatalf("timed out before receiving state %v on finisherChan", i)
+		}
+	}
+}
+
 func cleanupSharedPullerState(s *sharedPullerState) {
 	s.mut.Lock()
 	defer s.mut.Unlock()

+ 22 - 0
lib/model/sharedpullerstate.go

@@ -49,6 +49,28 @@ type sharedPullerState struct {
 	mut               sync.RWMutex    // Protects the above
 }
 
+func newSharedPullerState(file protocol.FileInfo, fs fs.Filesystem, folderID, tempName string, blocks []protocol.BlockInfo, reused []int32, ignorePerms, hasCurFile bool, curFile protocol.FileInfo, sparse bool) *sharedPullerState {
+	return &sharedPullerState{
+		file:             file,
+		fs:               fs,
+		folder:           folderID,
+		tempName:         tempName,
+		realName:         file.Name,
+		copyTotal:        len(blocks),
+		copyNeeded:       len(blocks),
+		reused:           len(reused),
+		updated:          time.Now(),
+		available:        reused,
+		availableUpdated: time.Now(),
+		ignorePerms:      ignorePerms,
+		hasCurFile:       hasCurFile,
+		curFile:          curFile,
+		mut:              sync.NewRWMutex(),
+		sparse:           sparse,
+		created:          time.Now(),
+	}
+}
+
 // A momentary state representing the progress of the puller
 type pullerProgress struct {
 	Total                   int   `json:"total"`