diff --git a/lib/fs/filesystem.go b/lib/fs/filesystem.go index 7e5e30d2..3b8c8fef 100644 --- a/lib/fs/filesystem.go +++ b/lib/fs/filesystem.go @@ -107,6 +107,11 @@ const ( Mixed // Should probably not be necessary to be used in filesystem interface implementation ) +// Merge returns Mixed, except if evType and other are the same and not Mixed. +func (evType EventType) Merge(other EventType) EventType { + return evType | other +} + func (evType EventType) String() string { switch { case evType == NonRemove: diff --git a/lib/watchaggregator/aggregator.go b/lib/watchaggregator/aggregator.go index 4533034f..057cd1ef 100644 --- a/lib/watchaggregator/aggregator.go +++ b/lib/watchaggregator/aggregator.go @@ -26,6 +26,8 @@ var ( // aggregatedEvent represents potentially multiple events at and/or recursively // below one path until it times out and a scan is scheduled. +// If it represents multiple events and there are events of both Remove and +// NonRemove types, the evType attribute is Mixed (as returned by fs.Event.Merge). type aggregatedEvent struct { firstModTime time.Time lastModTime time.Time @@ -46,14 +48,6 @@ func newEventDir() *eventDir { } } -func (dir *eventDir) eventCount() int { - count := len(dir.events) - for _, dir := range dir.dirs { - count += dir.eventCount() - } - return count -} - func (dir *eventDir) childCount() int { return len(dir.events) + len(dir.dirs) } @@ -110,6 +104,8 @@ type aggregator struct { notifyTimer *time.Timer notifyTimerNeedsReset bool notifyTimerResetChan chan time.Duration + counts map[fs.EventType]int + root *eventDir ctx context.Context } @@ -119,6 +115,8 @@ func newAggregator(folderCfg config.FolderConfiguration, ctx context.Context) *a folderCfgUpdate: make(chan config.FolderConfiguration), notifyTimerNeedsReset: false, notifyTimerResetChan: make(chan time.Duration), + counts: make(map[fs.EventType]int), + root: newEventDir(), ctx: ctx, } @@ -143,16 +141,14 @@ func (a *aggregator) mainLoop(in <-chan fs.Event, out chan<- []string, cfg *conf cfg.Subscribe(a) - rootEventDir := newEventDir() - for { select { case event := <-in: - a.newEvent(event, rootEventDir, inProgress) + a.newEvent(event, inProgress) case event := <-inProgressItemSubscription.C(): updateInProgressSet(event, inProgress) case <-a.notifyTimer.C: - a.actOnTimer(rootEventDir, out) + a.actOnTimer(out) case interval := <-a.notifyTimerResetChan: a.resetNotifyTimer(interval) case folderCfg := <-a.folderCfgUpdate: @@ -165,8 +161,8 @@ func (a *aggregator) mainLoop(in <-chan fs.Event, out chan<- []string, cfg *conf } } -func (a *aggregator) newEvent(event fs.Event, rootEventDir *eventDir, inProgress map[string]struct{}) { - if _, ok := rootEventDir.events["."]; ok { +func (a *aggregator) newEvent(event fs.Event, inProgress map[string]struct{}) { + if _, ok := a.root.events["."]; ok { l.Debugln(a, "Will scan entire folder anyway; dropping:", event.Name) return } @@ -174,29 +170,31 @@ func (a *aggregator) newEvent(event fs.Event, rootEventDir *eventDir, inProgress l.Debugln(a, "Skipping path we modified:", event.Name) return } - a.aggregateEvent(event, time.Now(), rootEventDir) + a.aggregateEvent(event, time.Now()) } -func (a *aggregator) aggregateEvent(event fs.Event, evTime time.Time, rootEventDir *eventDir) { - if event.Name == "." || rootEventDir.eventCount() == maxFiles { +func (a *aggregator) aggregateEvent(event fs.Event, evTime time.Time) { + if event.Name == "." || a.eventCount() == maxFiles { l.Debugln(a, "Scan entire folder") firstModTime := evTime - if rootEventDir.childCount() != 0 { - event.Type |= rootEventDir.eventType() - firstModTime = rootEventDir.firstModTime() + if a.root.childCount() != 0 { + event.Type = event.Type.Merge(a.root.eventType()) + firstModTime = a.root.firstModTime() } - rootEventDir.dirs = make(map[string]*eventDir) - rootEventDir.events = make(map[string]*aggregatedEvent) - rootEventDir.events["."] = &aggregatedEvent{ + a.root.dirs = make(map[string]*eventDir) + a.root.events = make(map[string]*aggregatedEvent) + a.root.events["."] = &aggregatedEvent{ firstModTime: firstModTime, lastModTime: evTime, evType: event.Type, } + a.counts = make(map[fs.EventType]int) + a.counts[event.Type]++ a.resetNotifyTimerIfNeeded() return } - parentDir := rootEventDir + parentDir := a.root // Check if any parent directory is already tracked or will exceed // events per directory limit bottom up @@ -211,7 +209,11 @@ func (a *aggregator) aggregateEvent(event fs.Event, evTime time.Time, rootEventD if ev, ok := parentDir.events[name]; ok { ev.lastModTime = evTime - ev.evType |= event.Type + if merged := event.Type.Merge(ev.evType); ev.evType != merged { + a.counts[ev.evType]-- + ev.evType = merged + a.counts[ev.evType]++ + } l.Debugf("%v Parent %s (type %s) already tracked: %s", a, currPath, ev.evType, event.Name) return } @@ -219,7 +221,7 @@ func (a *aggregator) aggregateEvent(event fs.Event, evTime time.Time, rootEventD if parentDir.childCount() == localMaxFilesPerDir { l.Debugf("%v Parent dir %s already has %d children, tracking it instead: %s", a, currPath, localMaxFilesPerDir, event.Name) event.Name = filepath.Dir(currPath) - a.aggregateEvent(event, evTime, rootEventDir) + a.aggregateEvent(event, evTime) return } @@ -244,7 +246,11 @@ func (a *aggregator) aggregateEvent(event fs.Event, evTime time.Time, rootEventD if ev, ok := parentDir.events[name]; ok { ev.lastModTime = evTime - ev.evType |= event.Type + if merged := event.Type.Merge(ev.evType); ev.evType != merged { + a.counts[ev.evType]-- + ev.evType = merged + a.counts[ev.evType]++ + } l.Debugf("%v Already tracked (type %v): %s", a, ev.evType, event.Name) return } @@ -256,14 +262,17 @@ func (a *aggregator) aggregateEvent(event fs.Event, evTime time.Time, rootEventD if !ok && parentDir.childCount() == localMaxFilesPerDir { l.Debugf("%v Parent dir already has %d children, tracking it instead: %s", a, localMaxFilesPerDir, event.Name) event.Name = filepath.Dir(event.Name) - a.aggregateEvent(event, evTime, rootEventDir) + a.aggregateEvent(event, evTime) return } firstModTime := evTime if ok { firstModTime = childDir.firstModTime() - event.Type |= childDir.eventType() + if merged := event.Type.Merge(childDir.eventType()); event.Type != merged { + a.counts[event.Type]-- + event.Type = merged + } delete(parentDir.dirs, name) } l.Debugf("%v Tracking (type %v): %s", a, event.Type, event.Name) @@ -272,6 +281,7 @@ func (a *aggregator) aggregateEvent(event fs.Event, evTime time.Time, rootEventD lastModTime: evTime, evType: event.Type, } + a.counts[event.Type]++ a.resetNotifyTimerIfNeeded() } @@ -289,22 +299,27 @@ func (a *aggregator) resetNotifyTimer(duration time.Duration) { a.notifyTimer.Reset(duration) } -func (a *aggregator) actOnTimer(rootEventDir *eventDir, out chan<- []string) { - eventCount := rootEventDir.eventCount() - if eventCount == 0 { +func (a *aggregator) actOnTimer(out chan<- []string) { + c := a.eventCount() + if c == 0 { l.Debugln(a, "No tracked events, waiting for new event.") a.notifyTimerNeedsReset = true return } - oldevents := a.popOldEvents(rootEventDir, ".", time.Now()) - if len(oldevents) == 0 { + oldEvents := make(map[string]*aggregatedEvent, c) + a.popOldEventsTo(oldEvents, a.root, ".", time.Now(), true) + if a.notifyDelay != a.notifyTimeout && a.counts[fs.NonRemove]+a.counts[fs.Mixed] == 0 && a.counts[fs.Remove] != 0 { + // Only deletion events remaining, no need to delay them additionally + a.popOldEventsTo(oldEvents, a.root, ".", time.Now(), false) + } + if len(oldEvents) == 0 { l.Debugln(a, "No old fs events") a.resetNotifyTimer(a.notifyDelay) return } // Sending to channel might block for a long time, but we need to keep // reading from notify backend channel to avoid overflow - go a.notify(oldevents, out) + go a.notify(oldEvents, out) } // Schedule scan for given events dispatching deletes last and reset notification @@ -348,42 +363,50 @@ func (a *aggregator) notify(oldEvents map[string]*aggregatedEvent, out chan<- [] // popOldEvents finds events that should be scheduled for scanning recursively in dirs, // removes those events and empty eventDirs and returns a map with all the removed // events referenced by their filesystem path -func (a *aggregator) popOldEvents(dir *eventDir, dirPath string, currTime time.Time) map[string]*aggregatedEvent { - oldEvents := make(map[string]*aggregatedEvent) +func (a *aggregator) popOldEventsTo(to map[string]*aggregatedEvent, dir *eventDir, dirPath string, currTime time.Time, delayRem bool) { for childName, childDir := range dir.dirs { - for evPath, event := range a.popOldEvents(childDir, filepath.Join(dirPath, childName), currTime) { - oldEvents[evPath] = event - } + a.popOldEventsTo(to, childDir, filepath.Join(dirPath, childName), currTime, delayRem) if childDir.childCount() == 0 { delete(dir.dirs, childName) } } for name, event := range dir.events { - if a.isOld(event, currTime) { - oldEvents[filepath.Join(dirPath, name)] = event + if a.isOld(event, currTime, delayRem) { + to[filepath.Join(dirPath, name)] = event delete(dir.events, name) + a.counts[event.evType]-- } } - return oldEvents } -func (a *aggregator) isOld(ev *aggregatedEvent, currTime time.Time) bool { - // Deletes should always be scanned last, therefore they are always - // delayed by letting them time out (see below). +func (a *aggregator) isOld(ev *aggregatedEvent, currTime time.Time, delayRem bool) bool { + // Deletes should in general be scanned last, therefore they are delayed by + // letting them time out. This behaviour is overriden by delayRem == false. + // Refer to following comments as to why. // An event that has not registered any new modifications recently is scanned. // a.notifyDelay is the user facing value signifying the normal delay between - // a picking up a modification and scanning it. As scheduling scans happens at + // picking up a modification and scanning it. As scheduling scans happens at // regular intervals of a.notifyDelay the delay of a single event is not exactly - // a.notifyDelay, but lies in in the range of 0.5 to 1.5 times a.notifyDelay. - if ev.evType == fs.NonRemove && 2*currTime.Sub(ev.lastModTime) > a.notifyDelay { + // a.notifyDelay, but lies in the range of 0.5 to 1.5 times a.notifyDelay. + if (!delayRem || ev.evType == fs.NonRemove) && 2*currTime.Sub(ev.lastModTime) > a.notifyDelay { return true } // When an event registers repeat modifications or involves removals it // is delayed to reduce resource usage, but after a certain time (notifyTimeout) // passed it is scanned anyway. + // If only removals are remaining to be scanned, there is no point to delay + // removals further, so this behaviour is overriden by delayRem == false. return currTime.Sub(ev.firstModTime) > a.notifyTimeout } +func (a *aggregator) eventCount() int { + c := 0 + for _, v := range a.counts { + c += v + } + return c +} + func (a *aggregator) String() string { return fmt.Sprintf("aggregator/%s:", a.folderCfg.Description()) } diff --git a/lib/watchaggregator/aggregator_test.go b/lib/watchaggregator/aggregator_test.go index 9daf0d13..c1302021 100644 --- a/lib/watchaggregator/aggregator_test.go +++ b/lib/watchaggregator/aggregator_test.go @@ -23,17 +23,19 @@ import ( func TestMain(m *testing.M) { maxFiles = 32 maxFilesPerDir = 8 - defer func() { - maxFiles = 512 - maxFilesPerDir = 128 - }() - os.Exit(m.Run()) + ret := m.Run() + + maxFiles = 512 + maxFilesPerDir = 128 + + os.Exit(ret) } const ( - testNotifyDelayS = 1 - testNotifyTimeout = 2 * time.Second + testNotifyDelayS = 1 + testNotifyTimeout = 2 * time.Second + timeoutWithinBatch = time.Second ) var ( @@ -48,8 +50,10 @@ var ( }) ) +// Represents possibly multiple (different event types) expected paths from +// aggregation, that should be received back to back. type expectedBatch struct { - paths []string + paths [][]string afterMs int beforeMs int } @@ -57,7 +61,6 @@ type expectedBatch struct { // TestAggregate checks whether maxFilesPerDir+1 events in one dir are // aggregated to parent dir func TestAggregate(t *testing.T) { - evDir := newEventDir() inProgress := make(map[string]struct{}) folderCfg := defaultFolderCfg.Copy() @@ -67,45 +70,44 @@ func TestAggregate(t *testing.T) { // checks whether maxFilesPerDir events in one dir are kept as is for i := 0; i < maxFilesPerDir; i++ { - a.newEvent(fs.Event{filepath.Join("parent", strconv.Itoa(i)), fs.NonRemove}, evDir, inProgress) + a.newEvent(fs.Event{filepath.Join("parent", strconv.Itoa(i)), fs.NonRemove}, inProgress) } - if len(getEventPaths(evDir, ".", a)) != maxFilesPerDir { - t.Errorf("Unexpected number of events stored") + if l := len(getEventPaths(a.root, ".", a)); l != maxFilesPerDir { + t.Errorf("Unexpected number of events stored, got %v, expected %v", l, maxFilesPerDir) } // checks whether maxFilesPerDir+1 events in one dir are aggregated to parent dir - a.newEvent(fs.Event{filepath.Join("parent", "new"), fs.NonRemove}, evDir, inProgress) - compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"parent"}) + a.newEvent(fs.Event{filepath.Join("parent", "new"), fs.NonRemove}, inProgress) + compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"parent"}) // checks that adding an event below "parent" does not change anything - a.newEvent(fs.Event{filepath.Join("parent", "extra"), fs.NonRemove}, evDir, inProgress) - compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"parent"}) + a.newEvent(fs.Event{filepath.Join("parent", "extra"), fs.NonRemove}, inProgress) + compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"parent"}) // again test aggregation in "parent" but with event in subdirs - evDir = newEventDir() + a = newAggregator(folderCfg, ctx) for i := 0; i < maxFilesPerDir; i++ { - a.newEvent(fs.Event{filepath.Join("parent", strconv.Itoa(i)), fs.NonRemove}, evDir, inProgress) + a.newEvent(fs.Event{filepath.Join("parent", strconv.Itoa(i)), fs.NonRemove}, inProgress) } - a.newEvent(fs.Event{filepath.Join("parent", "sub", "new"), fs.NonRemove}, evDir, inProgress) - compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"parent"}) + a.newEvent(fs.Event{filepath.Join("parent", "sub", "new"), fs.NonRemove}, inProgress) + compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"parent"}) // test aggregation in root - evDir = newEventDir() + a = newAggregator(folderCfg, ctx) for i := 0; i < maxFiles; i++ { - a.newEvent(fs.Event{strconv.Itoa(i), fs.NonRemove}, evDir, inProgress) + a.newEvent(fs.Event{strconv.Itoa(i), fs.NonRemove}, inProgress) } - if len(getEventPaths(evDir, ".", a)) != maxFiles { + if len(getEventPaths(a.root, ".", a)) != maxFiles { t.Errorf("Unexpected number of events stored in root") } - a.newEvent(fs.Event{filepath.Join("parent", "sub", "new"), fs.NonRemove}, evDir, inProgress) - compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"."}) + a.newEvent(fs.Event{filepath.Join("parent", "sub", "new"), fs.NonRemove}, inProgress) + compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"."}) // checks that adding an event when "." is already stored is a noop - a.newEvent(fs.Event{"anythingelse", fs.NonRemove}, evDir, inProgress) - compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"."}) + a.newEvent(fs.Event{"anythingelse", fs.NonRemove}, inProgress) + compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"."}) - // TestOverflow checks that the entire folder is scanned when maxFiles is reached - evDir = newEventDir() + a = newAggregator(folderCfg, ctx) filesPerDir := maxFilesPerDir / 2 dirs := make([]string, maxFiles/filesPerDir+1) for i := 0; i < maxFiles/filesPerDir+1; i++ { @@ -113,10 +115,10 @@ func TestAggregate(t *testing.T) { } for _, dir := range dirs { for i := 0; i < filesPerDir; i++ { - a.newEvent(fs.Event{filepath.Join(dir, strconv.Itoa(i)), fs.NonRemove}, evDir, inProgress) + a.newEvent(fs.Event{filepath.Join(dir, strconv.Itoa(i)), fs.NonRemove}, inProgress) } } - compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"."}) + compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"."}) } // TestInProgress checks that ignoring files currently edited by Syncthing works @@ -137,7 +139,7 @@ func TestInProgress(t *testing.T) { } expectedBatches := []expectedBatch{ - {[]string{"notinprogress"}, 2000, 3500}, + {[][]string{{"notinprogress"}}, 2000, 3500}, } testScenario(t, "InProgress", testCase, expectedBatches) @@ -149,6 +151,7 @@ func TestDelay(t *testing.T) { file := filepath.Join("parent", "file") delayed := "delayed" del := "deleted" + delAfter := "deletedAfter" both := filepath.Join("parent", "sub", "both") testCase := func(c chan<- fs.Event) { sleepMs(200) @@ -166,16 +169,15 @@ func TestDelay(t *testing.T) { timer.Reset(delay) c <- fs.Event{Name: delayed, Type: fs.NonRemove} } + c <- fs.Event{Name: delAfter, Type: fs.Remove} <-timer.C } // batches that we expect to receive with time interval in milliseconds expectedBatches := []expectedBatch{ - {[]string{file}, 500, 2500}, - {[]string{delayed}, 2500, 4500}, - {[]string{both}, 2500, 4500}, - {[]string{del}, 2500, 4500}, - {[]string{delayed}, 3600, 7000}, + {[][]string{{file}}, 500, 2500}, + {[][]string{{delayed}, {both}, {del}}, 2500, 4500}, + {[][]string{{delayed}, {delAfter}}, 3600, 7000}, } testScenario(t, "Delay", testCase, expectedBatches) @@ -202,7 +204,7 @@ func durationMs(ms int) time.Duration { return time.Duration(ms) * time.Millisecond } -func compareBatchToExpected(t *testing.T, batch []string, expectedPaths []string) { +func compareBatchToExpected(batch []string, expectedPaths []string) (missing []string, unexpected []string) { for _, expected := range expectedPaths { expected = filepath.Clean(expected) found := false @@ -214,15 +216,28 @@ func compareBatchToExpected(t *testing.T, batch []string, expectedPaths []string } } if !found { - t.Errorf("Did not receive event %s", expected) + missing = append(missing, expected) } } for _, received := range batch { - t.Errorf("Received unexpected event %s", received) + unexpected = append(unexpected, received) + } + return missing, unexpected +} + +func compareBatchToExpectedDirect(t *testing.T, batch []string, expectedPaths []string) { + t.Helper() + missing, unexpected := compareBatchToExpected(batch, expectedPaths) + for _, p := range missing { + t.Errorf("Did not receive event %s", p) + } + for _, p := range unexpected { + t.Errorf("Received unexpected event %s", p) } } func testScenario(t *testing.T, name string, testCase func(c chan<- fs.Event), expectedBatches []expectedBatch) { + t.Helper() ctx, cancel := context.WithCancel(context.Background()) eventChan := make(chan fs.Event) watchChan := make(chan []string) @@ -245,9 +260,10 @@ func testScenario(t *testing.T, name string, testCase func(c chan<- fs.Event), e } func testAggregatorOutput(t *testing.T, fsWatchChan <-chan []string, expectedBatches []expectedBatch, startTime time.Time) { + t.Helper() var received []string var elapsedTime time.Duration - batchIndex := 0 + var batchIndex, innerIndex int timeout := time.NewTimer(10 * time.Second) for { select { @@ -257,28 +273,49 @@ func testAggregatorOutput(t *testing.T, fsWatchChan <-chan []string, expectedBat case received = <-fsWatchChan: } - elapsedTime = time.Since(startTime) - expected := expectedBatches[batchIndex] + if batchIndex >= len(expectedBatches) { + t.Errorf("Received batch %d, expected only %d", batchIndex+1, len(expectedBatches)) + continue + } if runtime.GOOS != "darwin" { - switch { - case elapsedTime < durationMs(expected.afterMs): - t.Errorf("Received batch %d after %v (too soon)", batchIndex+1, elapsedTime) + now := time.Since(startTime) + if innerIndex == 0 { + switch { + case now < durationMs(expectedBatches[batchIndex].afterMs): + t.Errorf("Received batch %d after %v (too soon)", batchIndex+1, elapsedTime) - case elapsedTime > durationMs(expected.beforeMs): - t.Errorf("Received batch %d after %v (too late)", batchIndex+1, elapsedTime) + case now > durationMs(expectedBatches[batchIndex].beforeMs): + t.Errorf("Received batch %d after %v (too late)", batchIndex+1, elapsedTime) + } + } else if innerTime := now - elapsedTime; innerTime > timeoutWithinBatch { + t.Errorf("Receive part %d of batch %d after %v (too late)", innerIndex+1, batchIndex+1, innerTime) } + elapsedTime = now } - if len(received) != len(expected.paths) { - t.Errorf("Received %v events instead of %v for batch %v", len(received), len(expected.paths), batchIndex+1) - } - compareBatchToExpected(t, received, expected.paths) + expected := expectedBatches[batchIndex].paths[innerIndex] - batchIndex++ - if batchIndex == len(expectedBatches) { - // received everything we expected to - return + if len(received) != len(expected) { + t.Errorf("Received %v events instead of %v for batch %v", len(received), len(expected), batchIndex+1) + } + missing, unexpected := compareBatchToExpected(received, expected) + for _, p := range missing { + t.Errorf("Did not receive event %s in batch %d (%d)", p, batchIndex+1, innerIndex+1) + } + for _, p := range unexpected { + t.Errorf("Received unexpected event %s in batch %d (%d)", p, batchIndex+1, innerIndex+1) + } + + if innerIndex == len(expectedBatches[batchIndex].paths)-1 { + if batchIndex == len(expectedBatches)-1 { + // received everything we expected to + return + } + innerIndex = 0 + batchIndex++ + } else { + innerIndex++ } } }