|
@@ -10,6 +10,7 @@ import (
|
|
|
"context"
|
|
|
"os"
|
|
|
"path/filepath"
|
|
|
+ "runtime"
|
|
|
"strconv"
|
|
|
"testing"
|
|
|
"time"
|
|
@@ -62,7 +63,7 @@ func TestAggregate(t *testing.T) {
|
|
|
folderCfg := defaultFolderCfg.Copy()
|
|
|
folderCfg.ID = "Aggregate"
|
|
|
ctx, _ := context.WithCancel(context.Background())
|
|
|
- a := new(folderCfg, ctx)
|
|
|
+ a := newAggregator(folderCfg, ctx)
|
|
|
|
|
|
// checks whether maxFilesPerDir events in one dir are kept as is
|
|
|
for i := 0; i < maxFilesPerDir; i++ {
|
|
@@ -228,54 +229,56 @@ func testScenario(t *testing.T, name string, testCase func(c chan<- fs.Event), e
|
|
|
|
|
|
folderCfg := defaultFolderCfg.Copy()
|
|
|
folderCfg.ID = name
|
|
|
- a := new(folderCfg, ctx)
|
|
|
+ a := newAggregator(folderCfg, ctx)
|
|
|
a.notifyTimeout = testNotifyTimeout
|
|
|
|
|
|
startTime := time.Now()
|
|
|
go a.mainLoop(eventChan, watchChan, defaultCfg)
|
|
|
|
|
|
- sleepMs(10)
|
|
|
- go testAggregatorOutput(t, watchChan, expectedBatches, startTime, ctx)
|
|
|
+ sleepMs(20)
|
|
|
|
|
|
- testCase(eventChan)
|
|
|
+ go testCase(eventChan)
|
|
|
+
|
|
|
+ testAggregatorOutput(t, watchChan, expectedBatches, startTime)
|
|
|
|
|
|
- timeout := time.NewTimer(time.Duration(expectedBatches[len(expectedBatches)-1].beforeMs+100) * time.Millisecond)
|
|
|
- <-timeout.C
|
|
|
cancel()
|
|
|
}
|
|
|
|
|
|
-func testAggregatorOutput(t *testing.T, fsWatchChan <-chan []string, expectedBatches []expectedBatch, startTime time.Time, ctx context.Context) {
|
|
|
+func testAggregatorOutput(t *testing.T, fsWatchChan <-chan []string, expectedBatches []expectedBatch, startTime time.Time) {
|
|
|
var received []string
|
|
|
var elapsedTime time.Duration
|
|
|
batchIndex := 0
|
|
|
+ timeout := time.NewTimer(10 * time.Second)
|
|
|
for {
|
|
|
select {
|
|
|
- case <-ctx.Done():
|
|
|
- if batchIndex != len(expectedBatches) {
|
|
|
- t.Errorf("Received only %d batches (%d expected)", batchIndex, len(expectedBatches))
|
|
|
- }
|
|
|
+ case <-timeout.C:
|
|
|
+ t.Errorf("Timeout: Received only %d batches (%d expected)", batchIndex, len(expectedBatches))
|
|
|
return
|
|
|
case received = <-fsWatchChan:
|
|
|
}
|
|
|
|
|
|
- if batchIndex >= len(expectedBatches) {
|
|
|
- t.Errorf("Received batch %d (only %d expected)", batchIndex+1, len(expectedBatches))
|
|
|
- continue
|
|
|
- }
|
|
|
-
|
|
|
elapsedTime = time.Since(startTime)
|
|
|
expected := expectedBatches[batchIndex]
|
|
|
- switch {
|
|
|
- case elapsedTime < durationMs(expected.afterMs):
|
|
|
- t.Errorf("Received batch %d after %v (too soon)", batchIndex+1, elapsedTime)
|
|
|
|
|
|
- case elapsedTime > durationMs(expected.beforeMs):
|
|
|
- t.Errorf("Received batch %d after %v (too late)", batchIndex+1, elapsedTime)
|
|
|
+ if runtime.GOOS != "darwin" {
|
|
|
+ switch {
|
|
|
+ case elapsedTime < durationMs(expected.afterMs):
|
|
|
+ t.Errorf("Received batch %d after %v (too soon)", batchIndex+1, elapsedTime)
|
|
|
+
|
|
|
+ case elapsedTime > durationMs(expected.beforeMs):
|
|
|
+ t.Errorf("Received batch %d after %v (too late)", batchIndex+1, elapsedTime)
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- case len(received) != len(expected.paths):
|
|
|
+ if len(received) != len(expected.paths) {
|
|
|
t.Errorf("Received %v events instead of %v for batch %v", len(received), len(expected.paths), batchIndex+1)
|
|
|
}
|
|
|
compareBatchToExpected(t, received, expected.paths)
|
|
|
+
|
|
|
batchIndex++
|
|
|
+ if batchIndex == len(expectedBatches) {
|
|
|
+ // received everything we expected to
|
|
|
+ return
|
|
|
+ }
|
|
|
}
|
|
|
}
|