Kaynağa Gözat

taildrop: implement asynchronous file deletion (#9844)

File resumption requires keeping partial files around for some time,
but we must still eventually delete them if never resumed.
Thus, we implement asynchronous file deletion, which could
spawn a background goroutine to delete the files.

We also use the same mechanism for deleting files on Windows,
where a file can't be deleted if there is still an open file handle.
We can enqueue those with the asynchronous file deleter as well.

Updates tailscale/corp#14772

Signed-off-by: Joe Tsai <[email protected]>
Joe Tsai 2 yıl önce
ebeveyn
işleme
c2a551469c

+ 6 - 3
ipn/ipnlocal/local.go

@@ -646,6 +646,9 @@ func (b *LocalBackend) Shutdown() {
 	if b.sockstatLogger != nil {
 		b.sockstatLogger.Shutdown()
 	}
+	if b.peerAPIServer != nil {
+		b.peerAPIServer.taildrop.Shutdown()
+	}
 
 	b.unregisterNetMon()
 	b.unregisterHealthWatch()
@@ -3614,14 +3617,14 @@ func (b *LocalBackend) initPeerAPIListener() {
 
 	ps := &peerAPIServer{
 		b: b,
-		taildrop: &taildrop.Manager{
+		taildrop: taildrop.ManagerOptions{
 			Logf:             b.logf,
-			Clock:            tstime.DefaultClock{b.clock},
+			Clock:            tstime.DefaultClock{Clock: b.clock},
 			Dir:              fileRoot,
 			DirectFileMode:   b.directFileRoot != "",
 			AvoidFinalRename: !b.directFileDoFinalRename,
 			SendFileNotify:   b.sendFileNotify,
-		},
+		}.New(),
 	}
 	if dm, ok := b.sys.DNSManager.GetOK(); ok {
 		ps.resolver = dm.Resolver()

+ 7 - 7
ipn/ipnlocal/peerapi_test.go

@@ -68,7 +68,7 @@ func bodyNotContains(sub string) check {
 
 func fileHasSize(name string, size int) check {
 	return func(t *testing.T, e *peerAPITestEnv) {
-		root := e.ph.ps.taildrop.Dir
+		root := e.ph.ps.taildrop.Dir()
 		if root == "" {
 			t.Errorf("no rootdir; can't check whether %q has size %v", name, size)
 			return
@@ -84,7 +84,7 @@ func fileHasSize(name string, size int) check {
 
 func fileHasContents(name string, want string) check {
 	return func(t *testing.T, e *peerAPITestEnv) {
-		root := e.ph.ps.taildrop.Dir
+		root := e.ph.ps.taildrop.Dir()
 		if root == "" {
 			t.Errorf("no rootdir; can't check contents of %q", name)
 			return
@@ -540,11 +540,11 @@ func TestHandlePeerAPI(t *testing.T) {
 			if !tt.omitRoot {
 				rootDir = t.TempDir()
 				if e.ph.ps.taildrop == nil {
-					e.ph.ps.taildrop = &taildrop.Manager{
+					e.ph.ps.taildrop = taildrop.ManagerOptions{
 						Logf: e.logBuf.Logf,
-					}
+						Dir:  rootDir,
+					}.New()
 				}
-				e.ph.ps.taildrop.Dir = rootDir
 			}
 			for _, req := range tt.reqs {
 				e.rr = httptest.NewRecorder()
@@ -583,10 +583,10 @@ func TestFileDeleteRace(t *testing.T) {
 			capFileSharing: true,
 			clock:          &tstest.Clock{},
 		},
-		taildrop: &taildrop.Manager{
+		taildrop: taildrop.ManagerOptions{
 			Logf: t.Logf,
 			Dir:  dir,
-		},
+		}.New(),
 	}
 	ph := &peerAPIHandler{
 		isSelf: true,

+ 182 - 0
taildrop/delete.go

@@ -0,0 +1,182 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+package taildrop
+
+import (
+	"container/list"
+	"context"
+	"io/fs"
+	"os"
+	"path/filepath"
+	"strings"
+	"sync"
+	"time"
+
+	"tailscale.com/syncs"
+	"tailscale.com/tstime"
+	"tailscale.com/types/logger"
+)
+
+// deleteDelay is the amount of time to wait before we delete a file.
+// A shorter value ensures timely deletion of deleted and partial files, while
+// a longer value provides more opportunity for partial files to be resumed.
+const deleteDelay = time.Hour
+
+// fileDeleter manages asynchronous deletion of files after deleteDelay.
+type fileDeleter struct {
+	logf  logger.Logf
+	clock tstime.DefaultClock
+	event func(string) // called for certain events; for testing only
+	dir   string
+
+	mu     sync.Mutex
+	queue  list.List
+	byName map[string]*list.Element
+
+	emptySignal chan struct{} // signal that the queue is empty
+	group       syncs.WaitGroup
+	shutdownCtx context.Context
+	shutdown    context.CancelFunc
+}
+
+// deleteFile is a specific file to delete after deleteDelay.
+type deleteFile struct {
+	name     string
+	inserted time.Time
+}
+
+func (d *fileDeleter) Init(logf logger.Logf, clock tstime.DefaultClock, event func(string), dir string) {
+	d.logf = logf
+	d.clock = clock
+	d.dir = dir
+	d.event = event
+
+	// From a cold-start, load the list of partial and deleted files.
+	d.byName = make(map[string]*list.Element)
+	d.emptySignal = make(chan struct{})
+	d.shutdownCtx, d.shutdown = context.WithCancel(context.Background())
+	d.group.Go(func() {
+		d.event("start init")
+		defer d.event("end init")
+		rangeDir(dir, func(de fs.DirEntry) bool {
+			switch {
+			case d.shutdownCtx.Err() != nil:
+				return false // terminate early
+			case !de.Type().IsRegular():
+				return true
+			case strings.Contains(de.Name(), partialSuffix):
+				d.Insert(de.Name())
+			case strings.Contains(de.Name(), deletedSuffix):
+				// Best-effort immediate deletion of deleted files.
+				name := strings.TrimSuffix(de.Name(), deletedSuffix)
+				if os.Remove(filepath.Join(dir, name)) == nil {
+					if os.Remove(filepath.Join(dir, de.Name())) == nil {
+						break
+					}
+				}
+				// Otherwise, enqueue the file for later deletion.
+				d.Insert(de.Name())
+			}
+			return true
+		})
+	})
+}
+
+// Insert enqueues baseName for eventual deletion.
+func (d *fileDeleter) Insert(baseName string) {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	if d.shutdownCtx.Err() != nil {
+		return
+	}
+	if _, ok := d.byName[baseName]; ok {
+		return // already queued for deletion
+	}
+	d.byName[baseName] = d.queue.PushBack(&deleteFile{baseName, d.clock.Now()})
+	if d.queue.Len() == 1 {
+		d.group.Go(func() { d.waitAndDelete(deleteDelay) })
+	}
+}
+
+// waitAndDelete is an asynchronous deletion goroutine.
+// At most one waitAndDelete routine is ever running at a time.
+// It is not started unless there is at least one file in the queue.
+func (d *fileDeleter) waitAndDelete(wait time.Duration) {
+	tc, ch := d.clock.NewTimer(wait)
+	defer tc.Stop() // cleanup the timer resource if we stop early
+	d.event("start waitAndDelete")
+	defer d.event("end waitAndDelete")
+	select {
+	case <-d.shutdownCtx.Done():
+	case <-d.emptySignal:
+	case now := <-ch:
+		d.mu.Lock()
+		defer d.mu.Unlock()
+
+		// Iterate over all files to delete, and delete anything old enough.
+		var next *list.Element
+		var failed []*list.Element
+		for elem := d.queue.Front(); elem != nil; elem = next {
+			next = elem.Next()
+			file := elem.Value.(*deleteFile)
+			if now.Sub(file.inserted) < deleteDelay {
+				break // everything after this is recently inserted
+			}
+
+			// Delete the expired file.
+			if name, ok := strings.CutSuffix(file.name, deletedSuffix); ok {
+				if err := os.Remove(filepath.Join(d.dir, name)); err != nil && !os.IsNotExist(err) {
+					d.logf("could not delete: %v", redactError(err))
+					failed = append(failed, elem)
+					continue
+				}
+			}
+			if err := os.Remove(filepath.Join(d.dir, file.name)); err != nil && !os.IsNotExist(err) {
+				d.logf("could not delete: %v", redactError(err))
+				failed = append(failed, elem)
+				continue
+			}
+			d.queue.Remove(elem)
+			delete(d.byName, file.name)
+			d.event("deleted " + file.name)
+		}
+		for _, elem := range failed {
+			elem.Value.(*deleteFile).inserted = now // retry after deleteDelay
+			d.queue.MoveToBack(elem)
+		}
+
+		// If there are still some files to delete, retry again later.
+		if d.queue.Len() > 0 {
+			file := d.queue.Front().Value.(*deleteFile)
+			retryAfter := deleteDelay - now.Sub(file.inserted)
+			d.group.Go(func() { d.waitAndDelete(retryAfter) })
+		}
+	}
+}
+
+// Remove dequeues baseName from eventual deletion.
+func (d *fileDeleter) Remove(baseName string) {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	if elem := d.byName[baseName]; elem != nil {
+		d.queue.Remove(elem)
+		delete(d.byName, baseName)
+		// Signal to terminate any waitAndDelete goroutines.
+		if d.queue.Len() == 0 {
+			select {
+			case <-d.shutdownCtx.Done():
+			case d.emptySignal <- struct{}{}:
+			}
+		}
+	}
+}
+
+// Shutdown shuts down the deleter.
+// It blocks until all goroutines are stopped.
+func (d *fileDeleter) Shutdown() {
+	d.mu.Lock() // acquire lock to ensure no new goroutines start after shutdown
+	d.shutdown()
+	d.mu.Unlock()
+	d.group.Wait()
+}

+ 132 - 0
taildrop/delete_test.go

@@ -0,0 +1,132 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+package taildrop
+
+import (
+	"os"
+	"path/filepath"
+	"slices"
+	"testing"
+	"time"
+
+	"github.com/google/go-cmp/cmp"
+	"tailscale.com/tstest"
+	"tailscale.com/tstime"
+	"tailscale.com/util/must"
+)
+
+func TestDeleter(t *testing.T) {
+	dir := t.TempDir()
+	must.Do(touchFile(filepath.Join(dir, "foo.partial")))
+	must.Do(touchFile(filepath.Join(dir, "bar.partial")))
+	must.Do(touchFile(filepath.Join(dir, "fizz")))
+	must.Do(touchFile(filepath.Join(dir, "fizz.deleted")))
+	must.Do(touchFile(filepath.Join(dir, "buzz.deleted"))) // lacks a matching "buzz" file
+
+	checkDirectory := func(want ...string) {
+		t.Helper()
+		var got []string
+		for _, de := range must.Get(os.ReadDir(dir)) {
+			got = append(got, de.Name())
+		}
+		slices.Sort(got)
+		slices.Sort(want)
+		if diff := cmp.Diff(got, want); diff != "" {
+			t.Fatalf("directory mismatch (-got +want):\n%s", diff)
+		}
+	}
+
+	clock := tstest.NewClock(tstest.ClockOpts{Start: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC)})
+	advance := func(d time.Duration) {
+		t.Helper()
+		t.Logf("advance: %v", d)
+		clock.Advance(d)
+	}
+
+	eventsChan := make(chan string, 1000)
+	checkEvents := func(want ...string) {
+		t.Helper()
+		tm := time.NewTimer(10 * time.Second)
+		defer tm.Stop()
+		var got []string
+		for range want {
+			select {
+			case event := <-eventsChan:
+				t.Logf("event: %s", event)
+				got = append(got, event)
+			case <-tm.C:
+				t.Fatalf("timed out waiting for event: got %v, want %v", got, want)
+			}
+		}
+		slices.Sort(got)
+		slices.Sort(want)
+		if diff := cmp.Diff(got, want); diff != "" {
+			t.Fatalf("events mismatch (-got +want):\n%s", diff)
+		}
+	}
+	eventHook := func(event string) { eventsChan <- event }
+
+	var fd fileDeleter
+	fd.Init(t.Logf, tstime.DefaultClock{Clock: clock}, eventHook, dir)
+	defer fd.Shutdown()
+	insert := func(name string) {
+		t.Helper()
+		t.Logf("insert: %v", name)
+		fd.Insert(name)
+	}
+	remove := func(name string) {
+		t.Helper()
+		t.Logf("remove: %v", name)
+		fd.Remove(name)
+	}
+
+	checkEvents("start init")
+	checkEvents("end init", "start waitAndDelete")
+	checkDirectory("foo.partial", "bar.partial", "buzz.deleted")
+
+	advance(deleteDelay / 2)
+	checkDirectory("foo.partial", "bar.partial", "buzz.deleted")
+	advance(deleteDelay / 2)
+	checkEvents("deleted foo.partial", "deleted bar.partial", "deleted buzz.deleted")
+	checkEvents("end waitAndDelete")
+	checkDirectory()
+
+	must.Do(touchFile(filepath.Join(dir, "one.partial")))
+	insert("one.partial")
+	checkEvents("start waitAndDelete")
+	advance(deleteDelay / 4)
+	must.Do(touchFile(filepath.Join(dir, "two.partial")))
+	insert("two.partial")
+	advance(deleteDelay / 4)
+	must.Do(touchFile(filepath.Join(dir, "three.partial")))
+	insert("three.partial")
+	advance(deleteDelay / 4)
+	must.Do(touchFile(filepath.Join(dir, "four.partial")))
+	insert("four.partial")
+
+	advance(deleteDelay / 4)
+	checkEvents("deleted one.partial")
+	checkDirectory("two.partial", "three.partial", "four.partial")
+	checkEvents("end waitAndDelete", "start waitAndDelete")
+
+	advance(deleteDelay / 4)
+	checkEvents("deleted two.partial")
+	checkDirectory("three.partial", "four.partial")
+	checkEvents("end waitAndDelete", "start waitAndDelete")
+
+	advance(deleteDelay / 4)
+	checkEvents("deleted three.partial")
+	checkDirectory("four.partial")
+	checkEvents("end waitAndDelete", "start waitAndDelete")
+
+	advance(deleteDelay / 4)
+	checkEvents("deleted four.partial")
+	checkDirectory()
+	checkEvents("end waitAndDelete")
+
+	insert("wuzz.partial")
+	checkEvents("start waitAndDelete")
+	remove("wuzz.partial")
+	checkEvents("end waitAndDelete")
+}

+ 16 - 26
taildrop/resume.go

@@ -9,6 +9,7 @@ import (
 	"encoding/hex"
 	"fmt"
 	"io"
+	"io/fs"
 	"os"
 	"slices"
 	"strings"
@@ -72,34 +73,23 @@ func hexAppendEncode(dst, src []byte) []byte {
 // PartialFiles returns a list of partial files in [Handler.Dir]
 // that were sent (or is actively being sent) by the provided id.
 func (m *Manager) PartialFiles(id ClientID) (ret []string, err error) {
-	if m == nil || m.Dir == "" {
+	if m == nil || m.opts.Dir == "" {
 		return nil, ErrNoTaildrop
 	}
-	if m.DirectFileMode && m.AvoidFinalRename {
+	if m.opts.DirectFileMode && m.opts.AvoidFinalRename {
 		return nil, nil // resuming is not supported for users that peek at our file structure
 	}
 
-	f, err := os.Open(m.Dir)
-	if err != nil {
-		return ret, err
-	}
-	defer f.Close()
-
 	suffix := id.partialSuffix()
-	for {
-		des, err := f.ReadDir(10)
-		if err != nil {
-			return ret, err
-		}
-		for _, de := range des {
-			if name := de.Name(); strings.HasSuffix(name, suffix) {
-				ret = append(ret, name)
-			}
-		}
-		if err == io.EOF {
-			return ret, nil
+	if err := rangeDir(m.opts.Dir, func(de fs.DirEntry) bool {
+		if name := de.Name(); strings.HasSuffix(name, suffix) {
+			ret = append(ret, name)
 		}
+		return true
+	}); err != nil {
+		return ret, redactError(err)
 	}
+	return ret, nil
 }
 
 // HashPartialFile hashes the contents of a partial file sent by id,
@@ -109,14 +99,14 @@ func (m *Manager) PartialFiles(id ClientID) (ret []string, err error) {
 // If [FileHashes.Length] is less than length and no error occurred,
 // then it implies that all remaining content in the file has been hashed.
 func (m *Manager) HashPartialFile(id ClientID, baseName string, offset, length int64) (FileChecksums, error) {
-	if m == nil || m.Dir == "" {
+	if m == nil || m.opts.Dir == "" {
 		return FileChecksums{}, ErrNoTaildrop
 	}
-	if m.DirectFileMode && m.AvoidFinalRename {
+	if m.opts.DirectFileMode && m.opts.AvoidFinalRename {
 		return FileChecksums{}, nil // resuming is not supported for users that peek at our file structure
 	}
 
-	dstFile, err := m.joinDir(baseName)
+	dstFile, err := joinDir(m.opts.Dir, baseName)
 	if err != nil {
 		return FileChecksums{}, err
 	}
@@ -125,12 +115,12 @@ func (m *Manager) HashPartialFile(id ClientID, baseName string, offset, length i
 		if os.IsNotExist(err) {
 			return FileChecksums{}, nil
 		}
-		return FileChecksums{}, err
+		return FileChecksums{}, redactError(err)
 	}
 	defer f.Close()
 
 	if _, err := f.Seek(offset, io.SeekStart); err != nil {
-		return FileChecksums{}, err
+		return FileChecksums{}, redactError(err)
 	}
 	checksums := FileChecksums{
 		Offset:    offset,
@@ -145,7 +135,7 @@ func (m *Manager) HashPartialFile(id ClientID, baseName string, offset, length i
 	for {
 		switch n, err := io.ReadFull(r, b); {
 		case err != nil && err != io.EOF && err != io.ErrUnexpectedEOF:
-			return checksums, err
+			return checksums, redactError(err)
 		case n == 0:
 			return checksums, nil
 		default:

+ 4 - 3
taildrop/resume_test.go

@@ -19,7 +19,8 @@ func TestResume(t *testing.T) {
 	defer func() { blockSize = oldBlockSize }()
 	blockSize = 256
 
-	m := Manager{Logf: t.Logf, Dir: t.TempDir()}
+	m := ManagerOptions{Logf: t.Logf, Dir: t.TempDir()}.New()
+	defer m.Shutdown()
 
 	rn := rand.New(rand.NewSource(0))
 	want := make([]byte, 12345)
@@ -32,7 +33,7 @@ func TestResume(t *testing.T) {
 		})
 		must.Do(err)
 		must.Get(m.PutFile("", "foo", r, offset, -1))
-		got := must.Get(os.ReadFile(must.Get(m.joinDir("foo"))))
+		got := must.Get(os.ReadFile(must.Get(joinDir(m.opts.Dir, "foo"))))
 		if !bytes.Equal(got, want) {
 			t.Errorf("content mismatches")
 		}
@@ -54,7 +55,7 @@ func TestResume(t *testing.T) {
 				break
 			}
 		}
-		got := must.Get(os.ReadFile(must.Get(m.joinDir("foo"))))
+		got := must.Get(os.ReadFile(must.Get(joinDir(m.opts.Dir, "foo"))))
 		if !bytes.Equal(got, want) {
 			t.Errorf("content mismatches")
 		}

+ 63 - 129
taildrop/retrieve.go

@@ -12,7 +12,6 @@ import (
 	"path/filepath"
 	"runtime"
 	"sort"
-	"strings"
 	"time"
 
 	"tailscale.com/client/tailscale/apitype"
@@ -21,163 +20,98 @@ import (
 
 // HasFilesWaiting reports whether any files are buffered in [Handler.Dir].
 // This always returns false when [Handler.DirectFileMode] is false.
-func (m *Manager) HasFilesWaiting() bool {
-	if m == nil || m.Dir == "" || m.DirectFileMode {
+func (m *Manager) HasFilesWaiting() (has bool) {
+	if m == nil || m.opts.Dir == "" || m.opts.DirectFileMode {
 		return false
 	}
-	if m.knownEmpty.Load() {
-		// Optimization: this is usually empty, so avoid opening
-		// the directory and checking. We can't cache the actual
-		// has-files-or-not values as the macOS/iOS client might
-		// in the future use+delete the files directly. So only
-		// keep this negative cache.
-		return false
-	}
-	f, err := os.Open(m.Dir)
-	if err != nil {
+
+	// Optimization: this is usually empty, so avoid opening
+	// the directory and checking. We can't cache the actual
+	// has-files-or-not values as the macOS/iOS client might
+	// in the future use+delete the files directly. So only
+	// keep this negative cache.
+	totalReceived := m.totalReceived.Load()
+	if totalReceived == m.emptySince.Load() {
 		return false
 	}
-	defer f.Close()
-	for {
-		des, err := f.ReadDir(10)
-		for _, de := range des {
-			name := de.Name()
-			if strings.HasSuffix(name, partialSuffix) {
-				continue
-			}
-			if name, ok := strings.CutSuffix(name, deletedSuffix); ok { // for Windows + tests
-				// After we're done looping over files, then try
-				// to delete this file. Don't do it proactively,
-				// as the OS may return "foo.jpg.deleted" before "foo.jpg"
-				// and we don't want to delete the ".deleted" file before
-				// enumerating to the "foo.jpg" file.
-				defer tryDeleteAgain(filepath.Join(m.Dir, name))
-				continue
-			}
-			if de.Type().IsRegular() {
-				_, err := os.Stat(filepath.Join(m.Dir, name+deletedSuffix))
-				if os.IsNotExist(err) {
-					return true
-				}
-				if err == nil {
-					tryDeleteAgain(filepath.Join(m.Dir, name))
-					continue
-				}
-			}
-		}
-		if err == io.EOF {
-			m.knownEmpty.Store(true)
+
+	// Check whether there is at least one one waiting file.
+	err := rangeDir(m.opts.Dir, func(de fs.DirEntry) bool {
+		name := de.Name()
+		if isPartialOrDeleted(name) || !de.Type().IsRegular() {
+			return true
 		}
-		if err != nil {
-			break
+		_, err := os.Stat(filepath.Join(m.opts.Dir, name+deletedSuffix))
+		if os.IsNotExist(err) {
+			has = true
+			return false
 		}
+		return true
+	})
+
+	// If there are no more waiting files, record totalReceived as emptySince
+	// so that we can short-circuit the expensive directory traversal
+	// if no files have been received after the start of this call.
+	if err == nil && !has {
+		m.emptySince.Store(totalReceived)
 	}
-	return false
+	return has
 }
 
 // WaitingFiles returns the list of files that have been sent by a
 // peer that are waiting in [Handler.Dir].
 // This always returns nil when [Handler.DirectFileMode] is false.
 func (m *Manager) WaitingFiles() (ret []apitype.WaitingFile, err error) {
-	if m == nil || m.Dir == "" {
+	if m == nil || m.opts.Dir == "" {
 		return nil, ErrNoTaildrop
 	}
-	if m.DirectFileMode {
+	if m.opts.DirectFileMode {
 		return nil, nil
 	}
-	f, err := os.Open(m.Dir)
-	if err != nil {
-		return nil, err
-	}
-	defer f.Close()
-	var deleted map[string]bool // "foo.jpg" => true (if "foo.jpg.deleted" exists)
-	for {
-		des, err := f.ReadDir(10)
-		for _, de := range des {
-			name := de.Name()
-			if strings.HasSuffix(name, partialSuffix) {
-				continue
-			}
-			if name, ok := strings.CutSuffix(name, deletedSuffix); ok { // for Windows + tests
-				if deleted == nil {
-					deleted = map[string]bool{}
-				}
-				deleted[name] = true
-				continue
-			}
-			if de.Type().IsRegular() {
-				fi, err := de.Info()
-				if err != nil {
-					continue
-				}
-				ret = append(ret, apitype.WaitingFile{
-					Name: filepath.Base(name),
-					Size: fi.Size(),
-				})
-			}
+	if err := rangeDir(m.opts.Dir, func(de fs.DirEntry) bool {
+		name := de.Name()
+		if isPartialOrDeleted(name) || !de.Type().IsRegular() {
+			return true
 		}
-		if err == io.EOF {
-			break
-		}
-		if err != nil {
-			return nil, err
-		}
-	}
-	if len(deleted) > 0 {
-		// Filter out any return values "foo.jpg" where a
-		// "foo.jpg.deleted" marker file exists on disk.
-		all := ret
-		ret = ret[:0]
-		for _, wf := range all {
-			if !deleted[wf.Name] {
-				ret = append(ret, wf)
+		_, err := os.Stat(filepath.Join(m.opts.Dir, name+deletedSuffix))
+		if os.IsNotExist(err) {
+			fi, err := de.Info()
+			if err != nil {
+				return true
 			}
+			ret = append(ret, apitype.WaitingFile{
+				Name: filepath.Base(name),
+				Size: fi.Size(),
+			})
 		}
-		// And do some opportunistic deleting while we're here.
-		// Maybe Windows is done virus scanning the file we tried
-		// to delete a long time ago and will let us delete it now.
-		for name := range deleted {
-			tryDeleteAgain(filepath.Join(m.Dir, name))
-		}
+		return true
+	}); err != nil {
+		return nil, redactError(err)
 	}
 	sort.Slice(ret, func(i, j int) bool { return ret[i].Name < ret[j].Name })
 	return ret, nil
 }
 
-// tryDeleteAgain tries to delete path (and path+deletedSuffix) after
-// it failed earlier.  This happens on Windows when various anti-virus
-// tools hook into filesystem operations and have the file open still
-// while we're trying to delete it. In that case we instead mark it as
-// deleted (writing a "foo.jpg.deleted" marker file), but then we
-// later try to clean them up.
-//
-// fullPath is the full path to the file without the deleted suffix.
-func tryDeleteAgain(fullPath string) {
-	if err := os.Remove(fullPath); err == nil || os.IsNotExist(err) {
-		os.Remove(fullPath + deletedSuffix)
-	}
-}
-
 // DeleteFile deletes a file of the given baseName from [Handler.Dir].
 // This method is only allowed when [Handler.DirectFileMode] is false.
 func (m *Manager) DeleteFile(baseName string) error {
-	if m == nil || m.Dir == "" {
+	if m == nil || m.opts.Dir == "" {
 		return ErrNoTaildrop
 	}
-	if m.DirectFileMode {
+	if m.opts.DirectFileMode {
 		return errors.New("deletes not allowed in direct mode")
 	}
-	path, err := m.joinDir(baseName)
+	path, err := joinDir(m.opts.Dir, baseName)
 	if err != nil {
 		return err
 	}
 	var bo *backoff.Backoff
-	logf := m.Logf
-	t0 := m.Clock.Now()
+	logf := m.opts.Logf
+	t0 := m.opts.Clock.Now()
 	for {
 		err := os.Remove(path)
 		if err != nil && !os.IsNotExist(err) {
-			err = redactErr(err)
+			err = redactError(err)
 			// Put a retry loop around deletes on Windows. Windows
 			// file descriptor closes are effectively asynchronous,
 			// as a bunch of hooks run on/after close, and we can't
@@ -192,13 +126,14 @@ func (m *Manager) DeleteFile(baseName string) error {
 				if bo == nil {
 					bo = backoff.NewBackoff("delete-retry", logf, 1*time.Second)
 				}
-				if m.Clock.Since(t0) < 5*time.Second {
+				if m.opts.Clock.Since(t0) < 5*time.Second {
 					bo.BackOff(context.Background(), err)
 					continue
 				}
 				if err := touchFile(path + deletedSuffix); err != nil {
 					logf("peerapi: failed to leave deleted marker: %v", err)
 				}
+				m.deleter.Insert(baseName + deletedSuffix)
 			}
 			logf("peerapi: failed to DeleteFile: %v", err)
 			return err
@@ -210,7 +145,7 @@ func (m *Manager) DeleteFile(baseName string) error {
 func touchFile(path string) error {
 	f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0666)
 	if err != nil {
-		return redactErr(err)
+		return redactError(err)
 	}
 	return f.Close()
 }
@@ -218,28 +153,27 @@ func touchFile(path string) error {
 // OpenFile opens a file of the given baseName from [Handler.Dir].
 // This method is only allowed when [Handler.DirectFileMode] is false.
 func (m *Manager) OpenFile(baseName string) (rc io.ReadCloser, size int64, err error) {
-	if m == nil || m.Dir == "" {
+	if m == nil || m.opts.Dir == "" {
 		return nil, 0, ErrNoTaildrop
 	}
-	if m.DirectFileMode {
+	if m.opts.DirectFileMode {
 		return nil, 0, errors.New("opens not allowed in direct mode")
 	}
-	path, err := m.joinDir(baseName)
+	path, err := joinDir(m.opts.Dir, baseName)
 	if err != nil {
 		return nil, 0, err
 	}
-	if fi, err := os.Stat(path + deletedSuffix); err == nil && fi.Mode().IsRegular() {
-		tryDeleteAgain(path)
-		return nil, 0, &fs.PathError{Op: "open", Path: redacted, Err: fs.ErrNotExist}
+	if _, err := os.Stat(path + deletedSuffix); err == nil {
+		return nil, 0, redactError(&fs.PathError{Op: "open", Path: path, Err: fs.ErrNotExist})
 	}
 	f, err := os.Open(path)
 	if err != nil {
-		return nil, 0, redactErr(err)
+		return nil, 0, redactError(err)
 	}
 	fi, err := f.Stat()
 	if err != nil {
 		f.Close()
-		return nil, 0, redactErr(err)
+		return nil, 0, redactError(err)
 	}
 	return f, fi.Size(), nil
 }

+ 17 - 21
taildrop/send.go

@@ -8,6 +8,7 @@ import (
 	"errors"
 	"io"
 	"os"
+	"path/filepath"
 	"sync"
 	"time"
 
@@ -72,25 +73,25 @@ func (f *incomingFile) Write(p []byte) (n int, err error) {
 // offset to specify where to resume receiving data at.
 func (m *Manager) PutFile(id ClientID, baseName string, r io.Reader, offset, length int64) (int64, error) {
 	switch {
-	case m == nil || m.Dir == "":
+	case m == nil || m.opts.Dir == "":
 		return 0, ErrNoTaildrop
 	case !envknob.CanTaildrop():
 		return 0, ErrNoTaildrop
-	case distro.Get() == distro.Unraid && !m.DirectFileMode:
+	case distro.Get() == distro.Unraid && !m.opts.DirectFileMode:
 		return 0, ErrNotAccessible
 	}
-	dstPath, err := m.joinDir(baseName)
+	dstPath, err := joinDir(m.opts.Dir, baseName)
 	if err != nil {
 		return 0, err
 	}
 
 	redactAndLogError := func(action string, err error) error {
-		err = redactErr(err)
-		m.Logf("put %v error: %v", action, err)
+		err = redactError(err)
+		m.opts.Logf("put %v error: %v", action, err)
 		return err
 	}
 
-	avoidPartialRename := m.DirectFileMode && m.AvoidFinalRename
+	avoidPartialRename := m.opts.DirectFileMode && m.opts.AvoidFinalRename
 	if avoidPartialRename {
 		// Users using AvoidFinalRename are depending on the exact filename
 		// of the partial files. So avoid injecting the id into it.
@@ -98,20 +99,16 @@ func (m *Manager) PutFile(id ClientID, baseName string, r io.Reader, offset, len
 	}
 
 	// Check whether there is an in-progress transfer for the file.
-	sendFileNotify := m.SendFileNotify
-	if sendFileNotify == nil {
-		sendFileNotify = func() {} // avoid nil panics below
-	}
 	partialPath := dstPath + id.partialSuffix()
 	inFileKey := incomingFileKey{id, baseName}
 	inFile, loaded := m.incomingFiles.LoadOrInit(inFileKey, func() *incomingFile {
 		inFile := &incomingFile{
-			clock:          m.Clock,
-			started:        m.Clock.Now(),
+			clock:          m.opts.Clock,
+			started:        m.opts.Clock.Now(),
 			size:           length,
-			sendFileNotify: sendFileNotify,
+			sendFileNotify: m.opts.SendFileNotify,
 		}
-		if m.DirectFileMode {
+		if m.opts.DirectFileMode {
 			inFile.partialPath = partialPath
 		}
 		return inFile
@@ -120,6 +117,7 @@ func (m *Manager) PutFile(id ClientID, baseName string, r io.Reader, offset, len
 		return 0, ErrFileExists
 	}
 	defer m.incomingFiles.Delete(inFileKey)
+	m.deleter.Remove(filepath.Base(partialPath)) // avoid deleting the partial file while receiving
 
 	// Create (if not already) the partial file with read-write permissions.
 	f, err := os.OpenFile(partialPath, os.O_CREATE|os.O_RDWR, 0666)
@@ -133,9 +131,7 @@ func (m *Manager) PutFile(id ClientID, baseName string, r io.Reader, offset, len
 				os.Remove(partialPath) // best-effort
 				return
 			}
-
-			// TODO: We need to delete partialPath eventually.
-			// However, this must be done after some period of time.
+			m.deleter.Insert(filepath.Base(partialPath)) // mark partial file for eventual deletion
 		}
 	}()
 	inFile.w = f
@@ -177,8 +173,8 @@ func (m *Manager) PutFile(id ClientID, baseName string, r io.Reader, offset, len
 		inFile.mu.Lock()
 		inFile.done = true
 		inFile.mu.Unlock()
-		m.knownEmpty.Store(false)
-		sendFileNotify()
+		m.totalReceived.Add(1)
+		m.opts.SendFileNotify()
 		return fileLength, nil
 	}
 
@@ -236,8 +232,8 @@ func (m *Manager) PutFile(id ClientID, baseName string, r io.Reader, offset, len
 	if maxRetries <= 0 {
 		return 0, errors.New("too many retries trying to rename partial file")
 	}
-	m.knownEmpty.Store(false)
-	sendFileNotify()
+	m.totalReceived.Add(1)
+	m.opts.SendFileNotify()
 	return fileLength, nil
 }
 

+ 106 - 34
taildrop/taildrop.go

@@ -12,6 +12,8 @@ package taildrop
 import (
 	"errors"
 	"hash/adler32"
+	"io"
+	"io/fs"
 	"os"
 	"path"
 	"path/filepath"
@@ -30,6 +32,27 @@ import (
 	"tailscale.com/util/multierr"
 )
 
+var (
+	ErrNoTaildrop      = errors.New("Taildrop disabled; no storage directory")
+	ErrInvalidFileName = errors.New("invalid filename")
+	ErrFileExists      = errors.New("file already exists")
+	ErrNotAccessible   = errors.New("Taildrop folder not configured or accessible")
+)
+
+const (
+	// partialSuffix is the suffix appended to files while they're
+	// still in the process of being transferred.
+	partialSuffix = ".partial"
+
+	// deletedSuffix is the suffix for a deleted marker file
+	// that's placed next to a file (without the suffix) that we
+	// tried to delete, but Windows wouldn't let us. These are
+	// only written on Windows (and in tests), but they're not
+	// permitted to be uploaded directly on any platform, like
+	// partial files.
+	deletedSuffix = ".deleted"
+)
+
 // ClientID is an opaque identifier for file resumption.
 // A client can only list and resume partial files for its own ID.
 // It must contain any filesystem specific characters (e.g., slashes).
@@ -42,8 +65,8 @@ func (id ClientID) partialSuffix() string {
 	return "." + string(id) + partialSuffix // e.g., ".n12345CNTRL.partial"
 }
 
-// Manager manages the state for receiving and managing taildropped files.
-type Manager struct {
+// ManagerOptions are options to configure the [Manager].
+type ManagerOptions struct {
 	Logf  logger.Logf
 	Clock tstime.DefaultClock
 
@@ -80,39 +103,56 @@ type Manager struct {
 	// to the function when reception completes.
 	// It is not called if nil.
 	SendFileNotify func()
+}
 
-	knownEmpty atomic.Bool
+// Manager manages the state for receiving and managing taildropped files.
+type Manager struct {
+	opts ManagerOptions
 
+	// incomingFiles is a map of files actively being received.
 	incomingFiles syncs.Map[incomingFileKey, *incomingFile]
+	// deleter managers asynchronous deletion of files.
+	deleter fileDeleter
 
 	// renameMu is used to protect os.Rename calls so that they are atomic.
 	renameMu sync.Mutex
-}
 
-var (
-	ErrNoTaildrop      = errors.New("Taildrop disabled; no storage directory")
-	ErrInvalidFileName = errors.New("invalid filename")
-	ErrFileExists      = errors.New("file already exists")
-	ErrNotAccessible   = errors.New("Taildrop folder not configured or accessible")
-)
+	// totalReceived counts the cumulative total of received files.
+	totalReceived atomic.Int64
+	// emptySince specifies that there were no waiting files
+	// since this value of totalReceived.
+	emptySince atomic.Int64
+}
 
-const (
-	// partialSuffix is the suffix appended to files while they're
-	// still in the process of being transferred.
-	partialSuffix = ".partial"
+// New initializes a new taildrop manager.
+// It may spawn asynchronous goroutines to delete files,
+// so the Shutdown method must be called for resource cleanup.
+func (opts ManagerOptions) New() *Manager {
+	if opts.Logf == nil {
+		opts.Logf = logger.Discard
+	}
+	if opts.SendFileNotify == nil {
+		opts.SendFileNotify = func() {}
+	}
+	m := &Manager{opts: opts}
+	m.deleter.Init(opts.Logf, opts.Clock, func(string) {}, opts.Dir)
+	m.emptySince.Store(-1) // invalidate this cache
+	return m
+}
 
-	// deletedSuffix is the suffix for a deleted marker file
-	// that's placed next to a file (without the suffix) that we
-	// tried to delete, but Windows wouldn't let us. These are
-	// only written on Windows (and in tests), but they're not
-	// permitted to be uploaded directly on any platform, like
-	// partial files.
-	deletedSuffix = ".deleted"
-)
+// Dir returns the directory.
+func (m *Manager) Dir() string {
+	return m.opts.Dir
+}
 
-// redacted is a fake path name we use in errors, to avoid
-// accidentally logging actual filenames anywhere.
-const redacted = "redacted"
+// Shutdown shuts down the Manager.
+// It blocks until all spawned goroutines have stopped running.
+func (m *Manager) Shutdown() {
+	if m != nil {
+		m.deleter.shutdown()
+		m.deleter.group.Wait()
+	}
+}
 
 func validFilenameRune(r rune) bool {
 	switch r {
@@ -131,7 +171,11 @@ func validFilenameRune(r rune) bool {
 	return unicode.IsPrint(r)
 }
 
-func (m *Manager) joinDir(baseName string) (fullPath string, err error) {
+func isPartialOrDeleted(s string) bool {
+	return strings.HasSuffix(s, deletedSuffix) || strings.HasSuffix(s, partialSuffix)
+}
+
+func joinDir(dir, baseName string) (fullPath string, err error) {
 	if !utf8.ValidString(baseName) {
 		return "", ErrInvalidFileName
 	}
@@ -145,8 +189,7 @@ func (m *Manager) joinDir(baseName string) (fullPath string, err error) {
 	clean := path.Clean(baseName)
 	if clean != baseName ||
 		clean == "." || clean == ".." ||
-		strings.HasSuffix(clean, deletedSuffix) ||
-		strings.HasSuffix(clean, partialSuffix) {
+		isPartialOrDeleted(clean) {
 		return "", ErrInvalidFileName
 	}
 	for _, r := range baseName {
@@ -157,7 +200,32 @@ func (m *Manager) joinDir(baseName string) (fullPath string, err error) {
 	if !filepath.IsLocal(baseName) {
 		return "", ErrInvalidFileName
 	}
-	return filepath.Join(m.Dir, baseName), nil
+	return filepath.Join(dir, baseName), nil
+}
+
+// rangeDir iterates over the contents of a directory, calling fn for each entry.
+// It continues iterating while fn returns true.
+// It reports the number of entries seen.
+func rangeDir(dir string, fn func(fs.DirEntry) bool) error {
+	f, err := os.Open(dir)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+	for {
+		des, err := f.ReadDir(10)
+		for _, de := range des {
+			if !fn(de) {
+				return nil
+			}
+		}
+		if err != nil {
+			if err == io.EOF {
+				return nil
+			}
+			return err
+		}
+	}
 }
 
 // IncomingFiles returns a list of active incoming files.
@@ -182,16 +250,20 @@ func (m *Manager) IncomingFiles() []ipn.PartialFile {
 	return files
 }
 
-type redactedErr struct {
+// redacted is a fake path name we use in errors, to avoid
+// accidentally logging actual filenames anywhere.
+const redacted = "redacted"
+
+type redactedError struct {
 	msg   string
 	inner error
 }
 
-func (re *redactedErr) Error() string {
+func (re *redactedError) Error() string {
 	return re.msg
 }
 
-func (re *redactedErr) Unwrap() error {
+func (re *redactedError) Unwrap() error {
 	return re.inner
 }
 
@@ -205,7 +277,7 @@ func redactString(s string) string {
 	return string(b)
 }
 
-func redactErr(root error) error {
+func redactError(root error) error {
 	// redactStrings is a list of sensitive strings that were redacted.
 	// It is not sufficient to just snub out sensitive fields in Go errors
 	// since some wrapper errors like fmt.Errorf pre-cache the error string,
@@ -243,7 +315,7 @@ func redactErr(root error) error {
 	for _, toRedact := range redactStrings {
 		s = strings.ReplaceAll(s, toRedact, redactString(toRedact))
 	}
-	return &redactedErr{msg: s, inner: root}
+	return &redactedError{msg: s, inner: root}
 }
 
 var (

+ 24 - 140
taildrop/taildrop_test.go

@@ -4,153 +4,37 @@
 package taildrop
 
 import (
-	"errors"
-	"fmt"
-	"io/fs"
-	"os"
 	"path/filepath"
-	"runtime"
+	"strings"
 	"testing"
 )
 
-// Tests "foo.jpg.deleted" marks (for Windows).
-func TestDeletedMarkers(t *testing.T) {
+func TestJoinDir(t *testing.T) {
 	dir := t.TempDir()
-	h := &Manager{Dir: dir}
-
-	nothingWaiting := func() {
-		t.Helper()
-		h.knownEmpty.Store(false)
-		if h.HasFilesWaiting() {
-			t.Fatal("unexpected files waiting")
-		}
-	}
-	touch := func(base string) {
-		t.Helper()
-		if err := touchFile(filepath.Join(dir, base)); err != nil {
-			t.Fatal(err)
-		}
-	}
-	wantEmptyTempDir := func() {
-		t.Helper()
-		if fis, err := os.ReadDir(dir); err != nil {
-			t.Fatal(err)
-		} else if len(fis) > 0 && runtime.GOOS != "windows" {
-			for _, fi := range fis {
-				t.Errorf("unexpected file in tempdir: %q", fi.Name())
-			}
-		}
-	}
-
-	nothingWaiting()
-	wantEmptyTempDir()
-
-	touch("foo.jpg.deleted")
-	nothingWaiting()
-	wantEmptyTempDir()
-
-	touch("foo.jpg.deleted")
-	touch("foo.jpg")
-	nothingWaiting()
-	wantEmptyTempDir()
-
-	touch("foo.jpg.deleted")
-	touch("foo.jpg")
-	wf, err := h.WaitingFiles()
-	if err != nil {
-		t.Fatal(err)
-	}
-	if len(wf) != 0 {
-		t.Fatalf("WaitingFiles = %d; want 0", len(wf))
-	}
-	wantEmptyTempDir()
-
-	touch("foo.jpg.deleted")
-	touch("foo.jpg")
-	if rc, _, err := h.OpenFile("foo.jpg"); err == nil {
-		rc.Close()
-		t.Fatal("unexpected foo.jpg open")
-	}
-	wantEmptyTempDir()
-
-	// And verify basics still work in non-deleted cases.
-	touch("foo.jpg")
-	touch("bar.jpg.deleted")
-	if wf, err := h.WaitingFiles(); err != nil {
-		t.Error(err)
-	} else if len(wf) != 1 {
-		t.Errorf("WaitingFiles = %d; want 1", len(wf))
-	} else if wf[0].Name != "foo.jpg" {
-		t.Errorf("unexpected waiting file %+v", wf[0])
-	}
-	if rc, _, err := h.OpenFile("foo.jpg"); err != nil {
-		t.Fatal(err)
-	} else {
-		rc.Close()
-	}
-}
-
-func TestRedactErr(t *testing.T) {
-	testCases := []struct {
-		name string
-		err  func() error
-		want string
+	tests := []struct {
+		in     string
+		want   string // just relative to m.Dir
+		wantOk bool
 	}{
-		{
-			name: "PathError",
-			err: func() error {
-				return &os.PathError{
-					Op:   "open",
-					Path: "/tmp/sensitive.txt",
-					Err:  fs.ErrNotExist,
-				}
-			},
-			want: `open redacted.41360718: file does not exist`,
-		},
-		{
-			name: "LinkError",
-			err: func() error {
-				return &os.LinkError{
-					Op:  "symlink",
-					Old: "/tmp/sensitive.txt",
-					New: "/tmp/othersensitive.txt",
-					Err: fs.ErrNotExist,
-				}
-			},
-			want: `symlink redacted.41360718 redacted.6bcf093a: file does not exist`,
-		},
-		{
-			name: "something else",
-			err:  func() error { return errors.New("i am another error type") },
-			want: `i am another error type`,
-		},
+		{"", "", false},
+		{"foo", "foo", true},
+		{"./foo", "", false},
+		{"../foo", "", false},
+		{"foo/bar", "", false},
+		{"😋", "😋", true},
+		{"\xde\xad\xbe\xef", "", false},
+		{"foo.partial", "", false},
+		{"foo.deleted", "", false},
+		{strings.Repeat("a", 1024), "", false},
+		{"foo:bar", "", false},
 	}
-
-	for _, tc := range testCases {
-		t.Run(tc.name, func(t *testing.T) {
-			// For debugging
-			var i int
-			for err := tc.err(); err != nil; err = errors.Unwrap(err) {
-				t.Logf("%d: %T @ %p", i, err, err)
-				i++
-			}
-
-			t.Run("Root", func(t *testing.T) {
-				got := redactErr(tc.err()).Error()
-				if got != tc.want {
-					t.Errorf("err = %q; want %q", got, tc.want)
-				}
-			})
-			t.Run("Wrapped", func(t *testing.T) {
-				wrapped := fmt.Errorf("wrapped error: %w", tc.err())
-				want := "wrapped error: " + tc.want
-
-				got := redactErr(wrapped).Error()
-				if got != want {
-					t.Errorf("err = %q; want %q", got, want)
-				}
-			})
-		})
+	for _, tt := range tests {
+		got, gotErr := joinDir(dir, tt.in)
+		got, _ = filepath.Rel(dir, got)
+		gotOk := gotErr == nil
+		if got != tt.want || gotOk != tt.wantOk {
+			t.Errorf("joinDir(%q) = (%v, %v), want (%v, %v)", tt.in, got, gotOk, tt.want, tt.wantOk)
+		}
 	}
 }