Explorar el Código

lib/watchaggregator: Speedup propagation of removals (fixes #4953) (#4955)

Simon Frei hace 7 años
padre
commit
a83176c77a
Se han modificado 3 ficheros con 170 adiciones y 105 borrados
  1. 5 0
      lib/fs/filesystem.go
  2. 72 49
      lib/watchaggregator/aggregator.go
  3. 93 56
      lib/watchaggregator/aggregator_test.go

+ 5 - 0
lib/fs/filesystem.go

@@ -107,6 +107,11 @@ const (
 	Mixed // Should probably not be necessary to be used in filesystem interface implementation
 )
 
+// Merge returns Mixed, except if evType and other are the same and not Mixed.
+func (evType EventType) Merge(other EventType) EventType {
+	return evType | other
+}
+
 func (evType EventType) String() string {
 	switch {
 	case evType == NonRemove:

+ 72 - 49
lib/watchaggregator/aggregator.go

@@ -26,6 +26,8 @@ var (
 
 // aggregatedEvent represents potentially multiple events at and/or recursively
 // below one path until it times out and a scan is scheduled.
+// If it represents multiple events and there are events of both Remove and
+// NonRemove types, the evType attribute is Mixed (as returned by fs.Event.Merge).
 type aggregatedEvent struct {
 	firstModTime time.Time
 	lastModTime  time.Time
@@ -46,14 +48,6 @@ func newEventDir() *eventDir {
 	}
 }
 
-func (dir *eventDir) eventCount() int {
-	count := len(dir.events)
-	for _, dir := range dir.dirs {
-		count += dir.eventCount()
-	}
-	return count
-}
-
 func (dir *eventDir) childCount() int {
 	return len(dir.events) + len(dir.dirs)
 }
@@ -110,6 +104,8 @@ type aggregator struct {
 	notifyTimer           *time.Timer
 	notifyTimerNeedsReset bool
 	notifyTimerResetChan  chan time.Duration
+	counts                map[fs.EventType]int
+	root                  *eventDir
 	ctx                   context.Context
 }
 
@@ -119,6 +115,8 @@ func newAggregator(folderCfg config.FolderConfiguration, ctx context.Context) *a
 		folderCfgUpdate:       make(chan config.FolderConfiguration),
 		notifyTimerNeedsReset: false,
 		notifyTimerResetChan:  make(chan time.Duration),
+		counts:                make(map[fs.EventType]int),
+		root:                  newEventDir(),
 		ctx:                   ctx,
 	}
 
@@ -143,16 +141,14 @@ func (a *aggregator) mainLoop(in <-chan fs.Event, out chan<- []string, cfg *conf
 
 	cfg.Subscribe(a)
 
-	rootEventDir := newEventDir()
-
 	for {
 		select {
 		case event := <-in:
-			a.newEvent(event, rootEventDir, inProgress)
+			a.newEvent(event, inProgress)
 		case event := <-inProgressItemSubscription.C():
 			updateInProgressSet(event, inProgress)
 		case <-a.notifyTimer.C:
-			a.actOnTimer(rootEventDir, out)
+			a.actOnTimer(out)
 		case interval := <-a.notifyTimerResetChan:
 			a.resetNotifyTimer(interval)
 		case folderCfg := <-a.folderCfgUpdate:
@@ -165,8 +161,8 @@ func (a *aggregator) mainLoop(in <-chan fs.Event, out chan<- []string, cfg *conf
 	}
 }
 
-func (a *aggregator) newEvent(event fs.Event, rootEventDir *eventDir, inProgress map[string]struct{}) {
-	if _, ok := rootEventDir.events["."]; ok {
+func (a *aggregator) newEvent(event fs.Event, inProgress map[string]struct{}) {
+	if _, ok := a.root.events["."]; ok {
 		l.Debugln(a, "Will scan entire folder anyway; dropping:", event.Name)
 		return
 	}
@@ -174,29 +170,31 @@ func (a *aggregator) newEvent(event fs.Event, rootEventDir *eventDir, inProgress
 		l.Debugln(a, "Skipping path we modified:", event.Name)
 		return
 	}
-	a.aggregateEvent(event, time.Now(), rootEventDir)
+	a.aggregateEvent(event, time.Now())
 }
 
-func (a *aggregator) aggregateEvent(event fs.Event, evTime time.Time, rootEventDir *eventDir) {
-	if event.Name == "." || rootEventDir.eventCount() == maxFiles {
+func (a *aggregator) aggregateEvent(event fs.Event, evTime time.Time) {
+	if event.Name == "." || a.eventCount() == maxFiles {
 		l.Debugln(a, "Scan entire folder")
 		firstModTime := evTime
-		if rootEventDir.childCount() != 0 {
-			event.Type |= rootEventDir.eventType()
-			firstModTime = rootEventDir.firstModTime()
+		if a.root.childCount() != 0 {
+			event.Type = event.Type.Merge(a.root.eventType())
+			firstModTime = a.root.firstModTime()
 		}
-		rootEventDir.dirs = make(map[string]*eventDir)
-		rootEventDir.events = make(map[string]*aggregatedEvent)
-		rootEventDir.events["."] = &aggregatedEvent{
+		a.root.dirs = make(map[string]*eventDir)
+		a.root.events = make(map[string]*aggregatedEvent)
+		a.root.events["."] = &aggregatedEvent{
 			firstModTime: firstModTime,
 			lastModTime:  evTime,
 			evType:       event.Type,
 		}
+		a.counts = make(map[fs.EventType]int)
+		a.counts[event.Type]++
 		a.resetNotifyTimerIfNeeded()
 		return
 	}
 
-	parentDir := rootEventDir
+	parentDir := a.root
 
 	// Check if any parent directory is already tracked or will exceed
 	// events per directory limit bottom up
@@ -211,7 +209,11 @@ func (a *aggregator) aggregateEvent(event fs.Event, evTime time.Time, rootEventD
 
 		if ev, ok := parentDir.events[name]; ok {
 			ev.lastModTime = evTime
-			ev.evType |= event.Type
+			if merged := event.Type.Merge(ev.evType); ev.evType != merged {
+				a.counts[ev.evType]--
+				ev.evType = merged
+				a.counts[ev.evType]++
+			}
 			l.Debugf("%v Parent %s (type %s) already tracked: %s", a, currPath, ev.evType, event.Name)
 			return
 		}
@@ -219,7 +221,7 @@ func (a *aggregator) aggregateEvent(event fs.Event, evTime time.Time, rootEventD
 		if parentDir.childCount() == localMaxFilesPerDir {
 			l.Debugf("%v Parent dir %s already has %d children, tracking it instead: %s", a, currPath, localMaxFilesPerDir, event.Name)
 			event.Name = filepath.Dir(currPath)
-			a.aggregateEvent(event, evTime, rootEventDir)
+			a.aggregateEvent(event, evTime)
 			return
 		}
 
@@ -244,7 +246,11 @@ func (a *aggregator) aggregateEvent(event fs.Event, evTime time.Time, rootEventD
 
 	if ev, ok := parentDir.events[name]; ok {
 		ev.lastModTime = evTime
-		ev.evType |= event.Type
+		if merged := event.Type.Merge(ev.evType); ev.evType != merged {
+			a.counts[ev.evType]--
+			ev.evType = merged
+			a.counts[ev.evType]++
+		}
 		l.Debugf("%v Already tracked (type %v): %s", a, ev.evType, event.Name)
 		return
 	}
@@ -256,14 +262,17 @@ func (a *aggregator) aggregateEvent(event fs.Event, evTime time.Time, rootEventD
 	if !ok && parentDir.childCount() == localMaxFilesPerDir {
 		l.Debugf("%v Parent dir already has %d children, tracking it instead: %s", a, localMaxFilesPerDir, event.Name)
 		event.Name = filepath.Dir(event.Name)
-		a.aggregateEvent(event, evTime, rootEventDir)
+		a.aggregateEvent(event, evTime)
 		return
 	}
 
 	firstModTime := evTime
 	if ok {
 		firstModTime = childDir.firstModTime()
-		event.Type |= childDir.eventType()
+		if merged := event.Type.Merge(childDir.eventType()); event.Type != merged {
+			a.counts[event.Type]--
+			event.Type = merged
+		}
 		delete(parentDir.dirs, name)
 	}
 	l.Debugf("%v Tracking (type %v): %s", a, event.Type, event.Name)
@@ -272,6 +281,7 @@ func (a *aggregator) aggregateEvent(event fs.Event, evTime time.Time, rootEventD
 		lastModTime:  evTime,
 		evType:       event.Type,
 	}
+	a.counts[event.Type]++
 	a.resetNotifyTimerIfNeeded()
 }
 
@@ -289,22 +299,27 @@ func (a *aggregator) resetNotifyTimer(duration time.Duration) {
 	a.notifyTimer.Reset(duration)
 }
 
-func (a *aggregator) actOnTimer(rootEventDir *eventDir, out chan<- []string) {
-	eventCount := rootEventDir.eventCount()
-	if eventCount == 0 {
+func (a *aggregator) actOnTimer(out chan<- []string) {
+	c := a.eventCount()
+	if c == 0 {
 		l.Debugln(a, "No tracked events, waiting for new event.")
 		a.notifyTimerNeedsReset = true
 		return
 	}
-	oldevents := a.popOldEvents(rootEventDir, ".", time.Now())
-	if len(oldevents) == 0 {
+	oldEvents := make(map[string]*aggregatedEvent, c)
+	a.popOldEventsTo(oldEvents, a.root, ".", time.Now(), true)
+	if a.notifyDelay != a.notifyTimeout && a.counts[fs.NonRemove]+a.counts[fs.Mixed] == 0 && a.counts[fs.Remove] != 0 {
+		// Only deletion events remaining, no need to delay them additionally
+		a.popOldEventsTo(oldEvents, a.root, ".", time.Now(), false)
+	}
+	if len(oldEvents) == 0 {
 		l.Debugln(a, "No old fs events")
 		a.resetNotifyTimer(a.notifyDelay)
 		return
 	}
 	// Sending to channel might block for a long time, but we need to keep
 	// reading from notify backend channel to avoid overflow
-	go a.notify(oldevents, out)
+	go a.notify(oldEvents, out)
 }
 
 // Schedule scan for given events dispatching deletes last and reset notification
@@ -348,42 +363,50 @@ func (a *aggregator) notify(oldEvents map[string]*aggregatedEvent, out chan<- []
 // popOldEvents finds events that should be scheduled for scanning recursively in dirs,
 // removes those events and empty eventDirs and returns a map with all the removed
 // events referenced by their filesystem path
-func (a *aggregator) popOldEvents(dir *eventDir, dirPath string, currTime time.Time) map[string]*aggregatedEvent {
-	oldEvents := make(map[string]*aggregatedEvent)
+func (a *aggregator) popOldEventsTo(to map[string]*aggregatedEvent, dir *eventDir, dirPath string, currTime time.Time, delayRem bool) {
 	for childName, childDir := range dir.dirs {
-		for evPath, event := range a.popOldEvents(childDir, filepath.Join(dirPath, childName), currTime) {
-			oldEvents[evPath] = event
-		}
+		a.popOldEventsTo(to, childDir, filepath.Join(dirPath, childName), currTime, delayRem)
 		if childDir.childCount() == 0 {
 			delete(dir.dirs, childName)
 		}
 	}
 	for name, event := range dir.events {
-		if a.isOld(event, currTime) {
-			oldEvents[filepath.Join(dirPath, name)] = event
+		if a.isOld(event, currTime, delayRem) {
+			to[filepath.Join(dirPath, name)] = event
 			delete(dir.events, name)
+			a.counts[event.evType]--
 		}
 	}
-	return oldEvents
 }
 
-func (a *aggregator) isOld(ev *aggregatedEvent, currTime time.Time) bool {
-	// Deletes should always be scanned last, therefore they are always
-	// delayed by letting them time out (see below).
+func (a *aggregator) isOld(ev *aggregatedEvent, currTime time.Time, delayRem bool) bool {
+	// Deletes should in general be scanned last, therefore they are delayed by
+	// letting them time out. This behaviour is overriden by delayRem == false.
+	// Refer to following comments as to why.
 	// An event that has not registered any new modifications recently is scanned.
 	// a.notifyDelay is the user facing value signifying the normal delay between
-	// a picking up a modification and scanning it. As scheduling scans happens at
+	// picking up a modification and scanning it. As scheduling scans happens at
 	// regular intervals of a.notifyDelay the delay of a single event is not exactly
-	// a.notifyDelay, but lies in in the range of 0.5 to 1.5 times a.notifyDelay.
-	if ev.evType == fs.NonRemove && 2*currTime.Sub(ev.lastModTime) > a.notifyDelay {
+	// a.notifyDelay, but lies in the range of 0.5 to 1.5 times a.notifyDelay.
+	if (!delayRem || ev.evType == fs.NonRemove) && 2*currTime.Sub(ev.lastModTime) > a.notifyDelay {
 		return true
 	}
 	// When an event registers repeat modifications or involves removals it
 	// is delayed to reduce resource usage, but after a certain time (notifyTimeout)
 	// passed it is scanned anyway.
+	// If only removals are remaining to be scanned, there is no point to delay
+	// removals further, so this behaviour is overriden by delayRem == false.
 	return currTime.Sub(ev.firstModTime) > a.notifyTimeout
 }
 
+func (a *aggregator) eventCount() int {
+	c := 0
+	for _, v := range a.counts {
+		c += v
+	}
+	return c
+}
+
 func (a *aggregator) String() string {
 	return fmt.Sprintf("aggregator/%s:", a.folderCfg.Description())
 }

+ 93 - 56
lib/watchaggregator/aggregator_test.go

@@ -23,17 +23,19 @@ import (
 func TestMain(m *testing.M) {
 	maxFiles = 32
 	maxFilesPerDir = 8
-	defer func() {
-		maxFiles = 512
-		maxFilesPerDir = 128
-	}()
 
-	os.Exit(m.Run())
+	ret := m.Run()
+
+	maxFiles = 512
+	maxFilesPerDir = 128
+
+	os.Exit(ret)
 }
 
 const (
-	testNotifyDelayS  = 1
-	testNotifyTimeout = 2 * time.Second
+	testNotifyDelayS   = 1
+	testNotifyTimeout  = 2 * time.Second
+	timeoutWithinBatch = time.Second
 )
 
 var (
@@ -48,8 +50,10 @@ var (
 	})
 )
 
+// Represents possibly multiple (different event types) expected paths from
+// aggregation, that should be received back to back.
 type expectedBatch struct {
-	paths    []string
+	paths    [][]string
 	afterMs  int
 	beforeMs int
 }
@@ -57,7 +61,6 @@ type expectedBatch struct {
 // TestAggregate checks whether maxFilesPerDir+1 events in one dir are
 // aggregated to parent dir
 func TestAggregate(t *testing.T) {
-	evDir := newEventDir()
 	inProgress := make(map[string]struct{})
 
 	folderCfg := defaultFolderCfg.Copy()
@@ -67,45 +70,44 @@ func TestAggregate(t *testing.T) {
 
 	// checks whether maxFilesPerDir events in one dir are kept as is
 	for i := 0; i < maxFilesPerDir; i++ {
-		a.newEvent(fs.Event{filepath.Join("parent", strconv.Itoa(i)), fs.NonRemove}, evDir, inProgress)
+		a.newEvent(fs.Event{filepath.Join("parent", strconv.Itoa(i)), fs.NonRemove}, inProgress)
 	}
-	if len(getEventPaths(evDir, ".", a)) != maxFilesPerDir {
-		t.Errorf("Unexpected number of events stored")
+	if l := len(getEventPaths(a.root, ".", a)); l != maxFilesPerDir {
+		t.Errorf("Unexpected number of events stored, got %v, expected %v", l, maxFilesPerDir)
 	}
 
 	// checks whether maxFilesPerDir+1 events in one dir are aggregated to parent dir
-	a.newEvent(fs.Event{filepath.Join("parent", "new"), fs.NonRemove}, evDir, inProgress)
-	compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"parent"})
+	a.newEvent(fs.Event{filepath.Join("parent", "new"), fs.NonRemove}, inProgress)
+	compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"parent"})
 
 	// checks that adding an event below "parent" does not change anything
-	a.newEvent(fs.Event{filepath.Join("parent", "extra"), fs.NonRemove}, evDir, inProgress)
-	compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"parent"})
+	a.newEvent(fs.Event{filepath.Join("parent", "extra"), fs.NonRemove}, inProgress)
+	compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"parent"})
 
 	// again test aggregation in "parent" but with event in subdirs
-	evDir = newEventDir()
+	a = newAggregator(folderCfg, ctx)
 	for i := 0; i < maxFilesPerDir; i++ {
-		a.newEvent(fs.Event{filepath.Join("parent", strconv.Itoa(i)), fs.NonRemove}, evDir, inProgress)
+		a.newEvent(fs.Event{filepath.Join("parent", strconv.Itoa(i)), fs.NonRemove}, inProgress)
 	}
-	a.newEvent(fs.Event{filepath.Join("parent", "sub", "new"), fs.NonRemove}, evDir, inProgress)
-	compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"parent"})
+	a.newEvent(fs.Event{filepath.Join("parent", "sub", "new"), fs.NonRemove}, inProgress)
+	compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"parent"})
 
 	// test aggregation in root
-	evDir = newEventDir()
+	a = newAggregator(folderCfg, ctx)
 	for i := 0; i < maxFiles; i++ {
-		a.newEvent(fs.Event{strconv.Itoa(i), fs.NonRemove}, evDir, inProgress)
+		a.newEvent(fs.Event{strconv.Itoa(i), fs.NonRemove}, inProgress)
 	}
-	if len(getEventPaths(evDir, ".", a)) != maxFiles {
+	if len(getEventPaths(a.root, ".", a)) != maxFiles {
 		t.Errorf("Unexpected number of events stored in root")
 	}
-	a.newEvent(fs.Event{filepath.Join("parent", "sub", "new"), fs.NonRemove}, evDir, inProgress)
-	compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"."})
+	a.newEvent(fs.Event{filepath.Join("parent", "sub", "new"), fs.NonRemove}, inProgress)
+	compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"."})
 
 	// checks that adding an event when "." is already stored is a noop
-	a.newEvent(fs.Event{"anythingelse", fs.NonRemove}, evDir, inProgress)
-	compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"."})
+	a.newEvent(fs.Event{"anythingelse", fs.NonRemove}, inProgress)
+	compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"."})
 
-	// TestOverflow checks that the entire folder is scanned when maxFiles is reached
-	evDir = newEventDir()
+	a = newAggregator(folderCfg, ctx)
 	filesPerDir := maxFilesPerDir / 2
 	dirs := make([]string, maxFiles/filesPerDir+1)
 	for i := 0; i < maxFiles/filesPerDir+1; i++ {
@@ -113,10 +115,10 @@ func TestAggregate(t *testing.T) {
 	}
 	for _, dir := range dirs {
 		for i := 0; i < filesPerDir; i++ {
-			a.newEvent(fs.Event{filepath.Join(dir, strconv.Itoa(i)), fs.NonRemove}, evDir, inProgress)
+			a.newEvent(fs.Event{filepath.Join(dir, strconv.Itoa(i)), fs.NonRemove}, inProgress)
 		}
 	}
-	compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"."})
+	compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"."})
 }
 
 // TestInProgress checks that ignoring files currently edited by Syncthing works
@@ -137,7 +139,7 @@ func TestInProgress(t *testing.T) {
 	}
 
 	expectedBatches := []expectedBatch{
-		{[]string{"notinprogress"}, 2000, 3500},
+		{[][]string{{"notinprogress"}}, 2000, 3500},
 	}
 
 	testScenario(t, "InProgress", testCase, expectedBatches)
@@ -149,6 +151,7 @@ func TestDelay(t *testing.T) {
 	file := filepath.Join("parent", "file")
 	delayed := "delayed"
 	del := "deleted"
+	delAfter := "deletedAfter"
 	both := filepath.Join("parent", "sub", "both")
 	testCase := func(c chan<- fs.Event) {
 		sleepMs(200)
@@ -166,16 +169,15 @@ func TestDelay(t *testing.T) {
 			timer.Reset(delay)
 			c <- fs.Event{Name: delayed, Type: fs.NonRemove}
 		}
+		c <- fs.Event{Name: delAfter, Type: fs.Remove}
 		<-timer.C
 	}
 
 	// batches that we expect to receive with time interval in milliseconds
 	expectedBatches := []expectedBatch{
-		{[]string{file}, 500, 2500},
-		{[]string{delayed}, 2500, 4500},
-		{[]string{both}, 2500, 4500},
-		{[]string{del}, 2500, 4500},
-		{[]string{delayed}, 3600, 7000},
+		{[][]string{{file}}, 500, 2500},
+		{[][]string{{delayed}, {both}, {del}}, 2500, 4500},
+		{[][]string{{delayed}, {delAfter}}, 3600, 7000},
 	}
 
 	testScenario(t, "Delay", testCase, expectedBatches)
@@ -202,7 +204,7 @@ func durationMs(ms int) time.Duration {
 	return time.Duration(ms) * time.Millisecond
 }
 
-func compareBatchToExpected(t *testing.T, batch []string, expectedPaths []string) {
+func compareBatchToExpected(batch []string, expectedPaths []string) (missing []string, unexpected []string) {
 	for _, expected := range expectedPaths {
 		expected = filepath.Clean(expected)
 		found := false
@@ -214,15 +216,28 @@ func compareBatchToExpected(t *testing.T, batch []string, expectedPaths []string
 			}
 		}
 		if !found {
-			t.Errorf("Did not receive event %s", expected)
+			missing = append(missing, expected)
 		}
 	}
 	for _, received := range batch {
-		t.Errorf("Received unexpected event %s", received)
+		unexpected = append(unexpected, received)
+	}
+	return missing, unexpected
+}
+
+func compareBatchToExpectedDirect(t *testing.T, batch []string, expectedPaths []string) {
+	t.Helper()
+	missing, unexpected := compareBatchToExpected(batch, expectedPaths)
+	for _, p := range missing {
+		t.Errorf("Did not receive event %s", p)
+	}
+	for _, p := range unexpected {
+		t.Errorf("Received unexpected event %s", p)
 	}
 }
 
 func testScenario(t *testing.T, name string, testCase func(c chan<- fs.Event), expectedBatches []expectedBatch) {
+	t.Helper()
 	ctx, cancel := context.WithCancel(context.Background())
 	eventChan := make(chan fs.Event)
 	watchChan := make(chan []string)
@@ -245,9 +260,10 @@ func testScenario(t *testing.T, name string, testCase func(c chan<- fs.Event), e
 }
 
 func testAggregatorOutput(t *testing.T, fsWatchChan <-chan []string, expectedBatches []expectedBatch, startTime time.Time) {
+	t.Helper()
 	var received []string
 	var elapsedTime time.Duration
-	batchIndex := 0
+	var batchIndex, innerIndex int
 	timeout := time.NewTimer(10 * time.Second)
 	for {
 		select {
@@ -257,28 +273,49 @@ func testAggregatorOutput(t *testing.T, fsWatchChan <-chan []string, expectedBat
 		case received = <-fsWatchChan:
 		}
 
-		elapsedTime = time.Since(startTime)
-		expected := expectedBatches[batchIndex]
+		if batchIndex >= len(expectedBatches) {
+			t.Errorf("Received batch %d, expected only %d", batchIndex+1, len(expectedBatches))
+			continue
+		}
 
 		if runtime.GOOS != "darwin" {
-			switch {
-			case elapsedTime < durationMs(expected.afterMs):
-				t.Errorf("Received batch %d after %v (too soon)", batchIndex+1, elapsedTime)
-
-			case elapsedTime > durationMs(expected.beforeMs):
-				t.Errorf("Received batch %d after %v (too late)", batchIndex+1, elapsedTime)
+			now := time.Since(startTime)
+			if innerIndex == 0 {
+				switch {
+				case now < durationMs(expectedBatches[batchIndex].afterMs):
+					t.Errorf("Received batch %d after %v (too soon)", batchIndex+1, elapsedTime)
+
+				case now > durationMs(expectedBatches[batchIndex].beforeMs):
+					t.Errorf("Received batch %d after %v (too late)", batchIndex+1, elapsedTime)
+				}
+			} else if innerTime := now - elapsedTime; innerTime > timeoutWithinBatch {
+				t.Errorf("Receive part %d of batch %d after %v (too late)", innerIndex+1, batchIndex+1, innerTime)
 			}
+			elapsedTime = now
 		}
 
-		if len(received) != len(expected.paths) {
-			t.Errorf("Received %v events instead of %v for batch %v", len(received), len(expected.paths), batchIndex+1)
+		expected := expectedBatches[batchIndex].paths[innerIndex]
+
+		if len(received) != len(expected) {
+			t.Errorf("Received %v events instead of %v for batch %v", len(received), len(expected), batchIndex+1)
+		}
+		missing, unexpected := compareBatchToExpected(received, expected)
+		for _, p := range missing {
+			t.Errorf("Did not receive event %s in batch %d (%d)", p, batchIndex+1, innerIndex+1)
+		}
+		for _, p := range unexpected {
+			t.Errorf("Received unexpected event %s in batch %d (%d)", p, batchIndex+1, innerIndex+1)
 		}
-		compareBatchToExpected(t, received, expected.paths)
 
-		batchIndex++
-		if batchIndex == len(expectedBatches) {
-			// received everything we expected to
-			return
+		if innerIndex == len(expectedBatches[batchIndex].paths)-1 {
+			if batchIndex == len(expectedBatches)-1 {
+				// received everything we expected to
+				return
+			}
+			innerIndex = 0
+			batchIndex++
+		} else {
+			innerIndex++
 		}
 	}
 }