Browse Source

watch: don't watch each individual file (#1613)

Dan Bentley 6 years ago
parent
commit
f82e2de57e
4 changed files with 241 additions and 71 deletions
  1. 0 27
      pkg/watch/features.go
  2. 75 4
      pkg/watch/notify_test.go
  3. 56 40
      pkg/watch/watcher_naive.go
  4. 110 0
      pkg/watch/watcher_naive_test.go

+ 0 - 27
pkg/watch/features.go

@@ -1,27 +0,0 @@
-package watch
-
-import (
-	"os"
-	"strconv"
-)
-
-const CheckLimitKey = "WM_CHECK_LIMIT"
-
-var limitChecksEnabled = true
-
-// Allows limit checks to be disabled for testing.
-func SetLimitChecksEnabled(enabled bool) {
-	limitChecksEnabled = enabled
-}
-
-func LimitChecksEnabled() bool {
-	env, ok := os.LookupEnv(CheckLimitKey)
-	if ok {
-		enabled, err := strconv.ParseBool(env)
-		if err == nil {
-			return enabled
-		}
-	}
-
-	return limitChecksEnabled
-}

+ 75 - 4
pkg/watch/notify_test.go

@@ -5,6 +5,7 @@ import (
 	"io/ioutil"
 	"os"
 	"path/filepath"
+	"runtime"
 	"strings"
 	"testing"
 	"time"
@@ -329,7 +330,7 @@ func TestWatchBothDirAndFile(t *testing.T) {
 	f.assertEvents(fileB)
 }
 
-func TestWatchNonexistentDirectory(t *testing.T) {
+func TestWatchNonexistentFileInNonexistentDirectoryCreatedSimultaneously(t *testing.T) {
 	f := newNotifyFixture(t)
 	defer f.tearDown()
 
@@ -347,6 +348,69 @@ func TestWatchNonexistentDirectory(t *testing.T) {
 	f.assertEvents(file)
 }
 
+func TestWatchNonexistentDirectory(t *testing.T) {
+	f := newNotifyFixture(t)
+	defer f.tearDown()
+
+	root := f.JoinPath("root")
+	err := os.Mkdir(root, 0777)
+	if err != nil {
+		t.Fatal(err)
+	}
+	parent := f.JoinPath("parent")
+	file := f.JoinPath("parent", "a")
+
+	f.watch(parent)
+	f.fsync()
+	f.events = nil
+
+	err = os.Mkdir(parent, 0777)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if runtime.GOOS == "darwin" {
+		// On macOS, creation is not reported for a directory that was itself the root of an Add; cf. watcher_darwin.go.
+		f.assertEvents()
+	} else {
+		f.assertEvents(parent)
+	}
+	f.WriteFile(file, "hello")
+
+	if runtime.GOOS == "darwin" {
+		// macOS reports only the new file; it doesn't also report a change to the parent directory.
+		f.assertEvents(file)
+	} else {
+		f.assertEvents(parent, file)
+	}
+}
+
+// TestWatchNonexistentFileInNonexistentDirectory is disabled because it doesn't work on Linux.
+// func TestWatchNonexistentFileInNonexistentDirectory(t *testing.T) {
+// 	f := newNotifyFixture(t)
+// 	defer f.tearDown()
+
+// 	root := f.JoinPath("root")
+// 	err := os.Mkdir(root, 0777)
+// 	if err != nil {
+// 		t.Fatal(err)
+// 	}
+// 	parent := f.JoinPath("parent")
+// 	file := f.JoinPath("parent", "a")
+
+// 	f.watch(file)
+// 	f.assertEvents()
+
+// 	err = os.Mkdir(parent, 0777)
+// 	if err != nil {
+// 		t.Fatal(err)
+// 	}
+
+// 	f.assertEvents()
+// 	f.WriteFile(file, "hello")
+// 	f.assertEvents(file)
+// }
+
 type notifyFixture struct {
 	*tempdir.TempDirFixture
 	notify  Notify
@@ -355,7 +419,6 @@ type notifyFixture struct {
 }
 
 func newNotifyFixture(t *testing.T) *notifyFixture {
-	SetLimitChecksEnabled(false)
 	notify, err := NewWatcher()
 	if err != nil {
 		t.Fatal(err)
@@ -434,12 +497,20 @@ F:
 }
 
 func (f *notifyFixture) tearDown() {
-	SetLimitChecksEnabled(true)
-
 	err := f.notify.Close()
 	if err != nil {
 		f.T().Fatal(err)
 	}
 
+	// drain channels from watcher
+	go func() {
+		for _ = range f.notify.Events() {
+		}
+	}()
+	go func() {
+		for _ = range f.notify.Errors() {
+		}
+	}()
+
 	f.TempDirFixture.TearDown()
 }

+ 56 - 40
pkg/watch/watcher_naive.go

@@ -7,6 +7,7 @@ import (
 	"log"
 	"os"
 	"path/filepath"
+	"sync"
 
 	"github.com/pkg/errors"
 	"github.com/windmilleng/fsnotify"
@@ -24,6 +25,8 @@ type naiveNotify struct {
 	wrappedEvents chan FileEvent
 	errors        chan error
 
+	mu sync.Mutex
+
 	// Paths that we're watching that should be passed up to the caller.
 	// Note that we may have to watch ancestors of these paths
 	// in order to fulfill the API promise.
@@ -48,11 +51,14 @@ func (d *naiveNotify) Add(name string) error {
 			return errors.Wrapf(err, "notify.Add(%q)", name)
 		}
 	} else {
-		err = d.watcher.Add(name)
+		err = d.watcher.Add(filepath.Dir(name))
 		if err != nil {
-			return errors.Wrapf(err, "notify.Add(%q)", name)
+			return errors.Wrapf(err, "notify.Add(%q)", filepath.Dir(name))
 		}
 	}
+
+	d.mu.Lock()
+	defer d.mu.Unlock()
 	d.notifyList[name] = true
 
 	return nil
@@ -64,6 +70,9 @@ func (d *naiveNotify) watchRecursively(dir string) error {
 			return err
 		}
 
+		if !mode.IsDir() {
+			return nil
+		}
 		err = d.watcher.Add(path)
 		if err != nil {
 			if os.IsNotExist(err) {
@@ -106,56 +115,63 @@ func (d *naiveNotify) Errors() chan error {
 }
 
 func (d *naiveNotify) loop() {
+	defer close(d.wrappedEvents)
 	for e := range d.events {
-		isCreateOp := e.Op&fsnotify.Create == fsnotify.Create
-		shouldWalk := false
-		if isCreateOp {
-			isDir, err := isDir(e.Name)
-			if err != nil {
-				log.Printf("Error stat-ing file %s: %s", e.Name, err)
-				continue
+		shouldNotify := d.shouldNotify(e.Name)
+
+		if e.Op&fsnotify.Create != fsnotify.Create {
+			if shouldNotify {
+				d.wrappedEvents <- FileEvent{e.Name}
 			}
-			shouldWalk = isDir
+			continue
 		}
-		if shouldWalk {
-			err := filepath.Walk(e.Name, func(path string, mode os.FileInfo, err error) error {
-				if err != nil {
-					return err
-				}
-				newE := fsnotify.Event{
-					Op:   fsnotify.Create,
-					Name: path,
-				}
 
-				if d.shouldNotify(newE) {
-					d.wrappedEvents <- FileEvent{newE.Name}
+		// TODO(dbentley): if there's a delete should we call d.watcher.Remove to prevent leaking?
+		if err := filepath.Walk(e.Name, func(path string, mode os.FileInfo, err error) error {
+			if err != nil {
+				return err
+			}
 
-					// TODO(dmiller): symlinks 😭
-					err = d.Add(path)
-					if err != nil {
-						log.Printf("Error watching path %s: %s", e.Name, err)
-					}
+			if d.shouldNotify(path) {
+				d.wrappedEvents <- FileEvent{path}
+			}
+
+			// TODO(dmiller): symlinks 😭
+
+			shouldWatch := false
+			if mode.IsDir() {
+				// watch all directories
+				shouldWatch = true
+			} else {
+				// watch files that are explicitly named, but don't watch others
+				_, ok := d.notifyList[path]
+				if ok {
+					shouldWatch = true
+				}
+			}
+			if shouldWatch {
+				err := d.watcher.Add(path)
+				if err != nil {
+					log.Printf("Error watching path %s: %s", e.Name, err)
 				}
-				return nil
-			})
-			if err != nil {
-				log.Printf("Error walking directory %s: %s", e.Name, err)
 			}
-		} else if d.shouldNotify(e) {
-			d.wrappedEvents <- FileEvent{e.Name}
+			return nil
+		}); err != nil {
+			log.Printf("Error walking directory %s: %s", e.Name, err)
 		}
 	}
 }
 
-func (d *naiveNotify) shouldNotify(e fsnotify.Event) bool {
-	if _, ok := d.notifyList[e.Name]; ok {
+func (d *naiveNotify) shouldNotify(path string) bool {
+	d.mu.Lock()
+	defer d.mu.Unlock()
+	if _, ok := d.notifyList[path]; ok {
 		return true
-	} else {
-		// TODO(dmiller): maybe use a prefix tree here?
-		for path := range d.notifyList {
-			if ospath.IsChild(path, e.Name) {
-				return true
-			}
+	}
+	// TODO(dmiller): maybe use a prefix tree here?
+	for root := range d.notifyList {
+		if ospath.IsChild(root, path) {
+			return true
 		}
 	}
 	return false

+ 110 - 0
pkg/watch/watcher_naive_test.go

@@ -0,0 +1,110 @@
+// +build !darwin
+
+package watch
+
+import (
+	"fmt"
+	"os"
+	"os/exec"
+	"strconv"
+	"strings"
+	"testing"
+)
+
+func TestDontWatchEachFile(t *testing.T) {
+	// fsnotify is not recursive, so we need to watch each directory.
+	// fsnotify can also watch individual files, but doing so is more prone to exhausting resources.
+	// This test uses a Linux-specific mechanism (counting the inotify entries listed in
+	// /proc/<pid>/fdinfo) to make sure we're watching per-directory, not per-file.
+	f := newNotifyFixture(t)
+	defer f.tearDown()
+
+	watched := f.TempDir("watched")
+
+	// there are a few different cases we want to test for because the code paths are slightly
+	// different:
+	// 1) initial: data there before we ever call watch
+	// 2) inplace: data we create while the watch is happening
+	// 3) staged: data we create in another directory and then atomically move into place
+
+	// initial
+	f.WriteFile(f.JoinPath(watched, "initial.txt"), "initial data")
+
+	initialDir := f.JoinPath(watched, "initial_dir")
+	if err := os.Mkdir(initialDir, 0777); err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 0; i < 100; i++ {
+		f.WriteFile(f.JoinPath(initialDir, fmt.Sprintf("%d", i)), "initial data")
+	}
+
+	f.watch(watched)
+	f.fsync()
+	if len(f.events) != 0 {
+		t.Fatalf("expected 0 initial events; got %d events: %v", len(f.events), f.events)
+	}
+	f.events = nil
+
+	// inplace
+	inplace := f.JoinPath(watched, "inplace")
+	if err := os.Mkdir(inplace, 0777); err != nil {
+		t.Fatal(err)
+	}
+	f.WriteFile(f.JoinPath(inplace, "inplace.txt"), "inplace data")
+
+	inplaceDir := f.JoinPath(inplace, "inplace_dir")
+	if err := os.Mkdir(inplaceDir, 0777); err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 0; i < 100; i++ {
+		f.WriteFile(f.JoinPath(inplaceDir, fmt.Sprintf("%d", i)), "inplace data")
+	}
+
+	f.fsync()
+	if len(f.events) < 100 {
+		t.Fatalf("expected >100 inplace events; got %d events: %v", len(f.events), f.events)
+	}
+	f.events = nil
+
+	// staged
+	staged := f.TempDir("staged")
+	f.WriteFile(f.JoinPath(staged, "staged.txt"), "staged data")
+
+	stagedDir := f.JoinPath(staged, "staged_dir")
+	if err := os.Mkdir(stagedDir, 0777); err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 0; i < 100; i++ {
+		f.WriteFile(f.JoinPath(stagedDir, fmt.Sprintf("%d", i)), "staged data")
+	}
+
+	if err := os.Rename(staged, f.JoinPath(watched, "staged")); err != nil {
+		t.Fatal(err)
+	}
+
+	f.fsync()
+	if len(f.events) < 100 {
+		t.Fatalf("expected >100 staged events; got %d events: %v", len(f.events), f.events)
+	}
+	f.events = nil
+
+	pid := os.Getpid()
+
+	output, err := exec.Command("bash", "-c", fmt.Sprintf(
+		"find /proc/%d/fd -lname anon_inode:inotify -printf '%%hinfo/%%f\n' | xargs cat | grep -c '^inotify'", pid)).Output()
+	if err != nil {
+		t.Fatalf("error running command to determine number of watched files: %v", err)
+	}
+
+	n, err := strconv.Atoi(strings.TrimSpace(string(output)))
+	if err != nil {
+		t.Fatalf("couldn't parse number of watched files: %v", err)
+	}
+
+	if n > 10 {
+		t.Fatalf("watching more than 10 files: %d", n)
+	}
+}