all: Add filesystem notification support

GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/3986
This commit is contained in:
Michael Ploujnikov
2017-10-20 14:52:55 +00:00
committed by Audrius Butkevicius
parent c704ba9ef9
commit f98c21b68e
62 changed files with 6079 additions and 18 deletions

View File

@@ -32,7 +32,7 @@ import (
const (
OldestHandledVersion = 10
CurrentVersion = 24
CurrentVersion = 25
MaxRescanIntervalS = 365 * 24 * 60 * 60
)
@@ -326,6 +326,9 @@ func (cfg *Configuration) clean() error {
if cfg.Version == 23 {
convertV23V24(cfg)
}
if cfg.Version == 24 {
convertV24V25(cfg)
}
// Build a list of available devices
existingDevices := make(map[protocol.DeviceID]bool)
@@ -375,6 +378,14 @@ func (cfg *Configuration) clean() error {
return nil
}
func convertV24V25(cfg *Configuration) {
for i := range cfg.Folders {
cfg.Folders[i].FSWatcherDelayS = 10
}
cfg.Version = 25
}
func convertV23V24(cfg *Configuration) {
cfg.Options.URSeen = 2

View File

@@ -102,18 +102,20 @@ func TestDeviceConfig(t *testing.T) {
expectedFolders := []FolderConfiguration{
{
ID: "test",
FilesystemType: fs.FilesystemTypeBasic,
Path: "testdata",
Devices: []FolderDeviceConfiguration{{DeviceID: device1}, {DeviceID: device4}},
Type: FolderTypeSendOnly,
RescanIntervalS: 600,
Copiers: 0,
Pullers: 0,
Hashers: 0,
AutoNormalize: true,
MinDiskFree: Size{1, "%"},
MaxConflicts: -1,
ID: "test",
FilesystemType: fs.FilesystemTypeBasic,
Path: "testdata",
Devices: []FolderDeviceConfiguration{{DeviceID: device1}, {DeviceID: device4}},
Type: FolderTypeSendOnly,
RescanIntervalS: 600,
FSWatcherEnabled: false,
FSWatcherDelayS: 10,
Copiers: 0,
Pullers: 0,
Hashers: 0,
AutoNormalize: true,
MinDiskFree: Size{1, "%"},
MaxConflicts: -1,
Versioning: VersioningConfiguration{
Params: map[string]string{},
},

View File

@@ -22,6 +22,8 @@ type FolderConfiguration struct {
Type FolderType `xml:"type,attr" json:"type"`
Devices []FolderDeviceConfiguration `xml:"device" json:"devices"`
RescanIntervalS int `xml:"rescanIntervalS,attr" json:"rescanIntervalS"`
FSWatcherEnabled bool `xml:"fsWatcherEnabled,attr" json:"fsWatcherEnabled"`
FSWatcherDelayS int `xml:"fsWatcherDelayS,attr" json:"fsWatcherDelayS"`
IgnorePerms bool `xml:"ignorePerms,attr" json:"ignorePerms"`
AutoNormalize bool `xml:"autoNormalize,attr" json:"autoNormalize"`
MinDiskFree Size `xml:"minDiskFree" json:"minDiskFree"`
@@ -157,6 +159,11 @@ func (f *FolderConfiguration) prepare() {
f.RescanIntervalS = 0
}
if f.FSWatcherDelayS <= 0 {
f.FSWatcherEnabled = false
f.FSWatcherDelayS = 10
}
if f.Versioning.Params == nil {
f.Versioning.Params = make(map[string]string)
}

16
lib/config/testdata/v25.xml vendored Normal file
View File

@@ -0,0 +1,16 @@
<configuration version="25">
<folder id="test" path="testdata" type="readonly" ignorePerms="false" rescanIntervalS="600" fsNotifications="false" notifyDelayS="10" autoNormalize="true">
<filesystemType>basic</filesystemType>
<device id="AIR6LPZ-7K4PTTV-UXQSMUU-CPQ5YWH-OEDFIIQ-JUG777G-2YQXXR5-YD6AWQR"></device>
<device id="P56IOI7-MZJNU2Y-IQGDREY-DM2MGTI-MGL3BXN-PQ6W5BM-TBBZ4TJ-XZWICQ2"></device>
<minDiskFree unit="%">1</minDiskFree>
<maxConflicts>-1</maxConflicts>
<fsync>true</fsync>
</folder>
<device id="AIR6LPZ-7K4PTTV-UXQSMUU-CPQ5YWH-OEDFIIQ-JUG777G-2YQXXR5-YD6AWQR" name="node one" compression="metadata">
<address>tcp://a</address>
</device>
<device id="P56IOI7-MZJNU2Y-IQGDREY-DM2MGTI-MGL3BXN-PQ6W5BM-TBBZ4TJ-XZWICQ2" name="node two" compression="metadata">
<address>tcp://b</address>
</device>
</configuration>

View File

@@ -7,12 +7,14 @@
package fs
import (
"errors"
"io/ioutil"
"os"
"path/filepath"
"runtime"
"sort"
"strings"
"syscall"
"testing"
"time"
)
@@ -484,3 +486,23 @@ func TestRooted(t *testing.T) {
}
}
}
func TestWatchErrorLinuxInterpretation(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skip("testing of linux specific error codes")
}
var errTooManyFiles syscall.Errno = 24
var errNoSpace syscall.Errno = 28
if !reachedMaxUserWatches(errTooManyFiles) {
t.Errorf("Errno %v should be recognised to be about inotify limits.", errTooManyFiles)
}
if !reachedMaxUserWatches(errNoSpace) {
t.Errorf("Errno %v should be recognised to be about inotify limits.", errNoSpace)
}
err := errors.New("Another error")
if reachedMaxUserWatches(err) {
t.Errorf("This error does not concern inotify limits: %#v", err)
}
}

116
lib/fs/basicfs_watch.go Normal file
View File

@@ -0,0 +1,116 @@
// Copyright (C) 2016 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
// +build !solaris,!darwin solaris,cgo darwin,cgo
package fs
import (
"context"
"errors"
"path/filepath"
"github.com/zillode/notify"
)
// Notify does not block on sending to channel, so the channel must be buffered.
// The actual number is magic.
// Not meant to be changed, but must be changeable for tests
var backendBuffer = 500
func (f *BasicFilesystem) Watch(name string, ignore Matcher, ctx context.Context, ignorePerms bool) (<-chan Event, error) {
absName, err := f.rooted(name)
if err != nil {
return nil, err
}
absShouldIgnore := func(absPath string) bool {
return ignore.ShouldIgnore(f.unrootedChecked(absPath))
}
outChan := make(chan Event)
backendChan := make(chan notify.EventInfo, backendBuffer)
eventMask := subEventMask
if !ignorePerms {
eventMask |= permEventMask
}
if err := notify.WatchWithFilter(filepath.Join(absName, "..."), backendChan, absShouldIgnore, eventMask); err != nil {
notify.Stop(backendChan)
if reachedMaxUserWatches(err) {
err = errors.New("failed to install inotify handler. Please increase inotify limits, see https://github.com/syncthing/syncthing-inotify#troubleshooting-for-folders-with-many-files-on-linux for more information")
}
return nil, err
}
go f.watchLoop(name, absName, backendChan, outChan, ignore, ctx)
return outChan, nil
}
func (f *BasicFilesystem) watchLoop(name string, absName string, backendChan chan notify.EventInfo, outChan chan<- Event, ignore Matcher, ctx context.Context) {
for {
// Detect channel overflow
if len(backendChan) == backendBuffer {
outer:
for {
select {
case <-backendChan:
default:
break outer
}
}
// When next scheduling a scan, do it on the entire folder as events have been lost.
outChan <- Event{Name: name, Type: NonRemove}
l.Debugln(f.Type(), f.URI(), "Watch: Event overflow, send \".\"")
}
select {
case ev := <-backendChan:
relPath := f.unrootedChecked(ev.Path())
if ignore.ShouldIgnore(relPath) {
l.Debugln(f.Type(), f.URI(), "Watch: Ignoring", relPath)
continue
}
evType := f.eventType(ev.Event())
select {
case outChan <- Event{Name: relPath, Type: evType}:
l.Debugln(f.Type(), f.URI(), "Watch: Sending", relPath, evType)
case <-ctx.Done():
notify.Stop(backendChan)
l.Debugln(f.Type(), f.URI(), "Watch: Stopped")
return
}
case <-ctx.Done():
notify.Stop(backendChan)
l.Debugln(f.Type(), f.URI(), "Watch: Stopped")
return
}
}
}
func (f *BasicFilesystem) eventType(notifyType notify.Event) EventType {
if notifyType&rmEventMask != 0 {
return Remove
}
return NonRemove
}
// unrootedChecked returns the path relative to the folder root (same as
// unrooted). It panics if the given path is not a subpath and handles the
// special case when the given path is the folder root without a trailing
// pathseparator.
func (f *BasicFilesystem) unrootedChecked(absPath string) string {
if absPath+string(PathSeparator) == f.root {
return "."
}
relPath := f.unrooted(absPath)
if relPath == absPath {
panic("bug: Notify backend is processing a change outside of the watched path: " + absPath)
}
return relPath
}

View File

@@ -0,0 +1,18 @@
// Copyright (C) 2016 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
// +build linux
package fs
import "syscall"
func reachedMaxUserWatches(err error) bool {
if errno, ok := err.(syscall.Errno); ok {
return errno == 24 || errno == 28
}
return false
}

View File

@@ -0,0 +1,13 @@
// Copyright (C) 2016 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
// +build !linux
package fs
func reachedMaxUserWatches(err error) bool {
return false
}

View File

@@ -0,0 +1,17 @@
// Copyright (C) 2017 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
// +build solaris,cgo
package fs
import "github.com/zillode/notify"
const (
subEventMask = notify.Create | notify.FileModified | notify.FileRenameFrom | notify.FileDelete | notify.FileRenameTo
permEventMask = notify.FileAttrib
rmEventMask = notify.FileDelete | notify.FileRenameFrom
)

View File

@@ -0,0 +1,17 @@
// Copyright (C) 2017 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
// +build linux
package fs
import "github.com/zillode/notify"
const (
subEventMask = notify.InCreate | notify.InMovedTo | notify.InDelete | notify.InDeleteSelf | notify.InModify | notify.InMovedFrom | notify.InMoveSelf
permEventMask = notify.InAttrib
rmEventMask = notify.InDelete | notify.InDeleteSelf | notify.InMovedFrom | notify.InMoveSelf
)

View File

@@ -0,0 +1,17 @@
// Copyright (C) 2017 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
// +build dragonfly freebsd netbsd openbsd
package fs
import "github.com/zillode/notify"
const (
subEventMask = notify.NoteDelete | notify.NoteWrite | notify.NoteRename
permEventMask = notify.NoteAttrib
rmEventMask = notify.NoteDelete | notify.NoteRename
)

View File

@@ -0,0 +1,21 @@
// Copyright (C) 2017 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
// +build !linux,!windows,!dragonfly,!freebsd,!netbsd,!openbsd,!solaris
// +build !darwin darwin,cgo
// Catch all platforms that are not specifically handled to use the generic
// event types.
package fs
import "github.com/zillode/notify"
const (
subEventMask = notify.All
permEventMask = 0
rmEventMask = notify.Remove | notify.Rename
)

View File

@@ -0,0 +1,17 @@
// Copyright (C) 2017 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
// +build windows
package fs
import "github.com/zillode/notify"
const (
subEventMask = notify.FileNotifyChangeFileName | notify.FileNotifyChangeDirName | notify.FileNotifyChangeSize | notify.FileNotifyChangeCreation
permEventMask = notify.FileNotifyChangeAttributes
rmEventMask = notify.FileActionRemoved | notify.FileActionRenamedOldName
)

View File

@@ -0,0 +1,295 @@
// Copyright (C) 2016 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
// +build !solaris,!darwin solaris,cgo darwin,cgo
package fs
import (
"context"
"fmt"
"os"
"path/filepath"
"runtime"
"strconv"
"testing"
"time"
"github.com/zillode/notify"
)
func TestMain(m *testing.M) {
if err := os.RemoveAll(testDir); err != nil {
panic(err)
}
dir, err := filepath.Abs(".")
if err != nil {
panic("Cannot get absolute path to working dir")
}
dir, err = filepath.EvalSymlinks(dir)
if err != nil {
panic("Cannot get real path to working dir")
}
testDirAbs = filepath.Join(dir, testDir)
testFs = newBasicFilesystem(testDirAbs)
if l.ShouldDebug("filesystem") {
testFs = &logFilesystem{testFs}
}
backendBuffer = 10
defer func() {
backendBuffer = 500
}()
os.Exit(m.Run())
}
const (
testDir = "temporary_test_root"
)
var (
testDirAbs string
testFs Filesystem
)
func TestWatchIgnore(t *testing.T) {
name := "ignore"
file := "file"
ignored := "ignored"
testCase := func() {
createTestFile(name, file)
createTestFile(name, ignored)
}
expectedEvents := []Event{
{file, NonRemove},
}
testScenario(t, name, testCase, expectedEvents, false, ignored)
}
func TestWatchRename(t *testing.T) {
name := "rename"
old := createTestFile(name, "oldfile")
new := "newfile"
testCase := func() {
renameTestFile(name, old, new)
}
destEvent := Event{new, Remove}
// Only on these platforms the removed file can be differentiated from
// the created file during renaming
if runtime.GOOS == "windows" || runtime.GOOS == "linux" || runtime.GOOS == "solaris" {
destEvent = Event{new, NonRemove}
}
expectedEvents := []Event{
{old, Remove},
destEvent,
}
testScenario(t, name, testCase, expectedEvents, false, "")
}
// TestWatchOutside checks that no changes from outside the folder make it in
func TestWatchOutside(t *testing.T) {
outChan := make(chan Event)
backendChan := make(chan notify.EventInfo, backendBuffer)
ctx, cancel := context.WithCancel(context.Background())
// testFs is Filesystem, but we need BasicFilesystem here
fs := newBasicFilesystem(testDirAbs)
go func() {
defer func() {
if recover() == nil {
t.Fatalf("Watch did not panic on receiving event outside of folder")
}
cancel()
}()
fs.watchLoop(".", testDirAbs, backendChan, outChan, fakeMatcher{}, ctx)
}()
backendChan <- fakeEventInfo(filepath.Join(filepath.Dir(testDirAbs), "outside"))
}
func TestWatchSubpath(t *testing.T) {
outChan := make(chan Event)
backendChan := make(chan notify.EventInfo, backendBuffer)
ctx, cancel := context.WithCancel(context.Background())
// testFs is Filesystem, but we need BasicFilesystem here
fs := newBasicFilesystem(testDirAbs)
abs, _ := fs.rooted("sub")
go fs.watchLoop("sub", abs, backendChan, outChan, fakeMatcher{}, ctx)
backendChan <- fakeEventInfo(filepath.Join(abs, "file"))
timeout := time.NewTimer(2 * time.Second)
select {
case <-timeout.C:
t.Errorf("Timed out before receiving an event")
cancel()
case ev := <-outChan:
if ev.Name != filepath.Join("sub", "file") {
t.Errorf("While watching a subfolder, received an event with unexpected path %v", ev.Name)
}
}
cancel()
}
// TestWatchOverflow checks that an event at the root is sent when maxFiles is reached
func TestWatchOverflow(t *testing.T) {
name := "overflow"
testCase := func() {
for i := 0; i < 5*backendBuffer; i++ {
createTestFile(name, "file"+strconv.Itoa(i))
}
}
expectedEvents := []Event{
{".", NonRemove},
}
testScenario(t, name, testCase, expectedEvents, true, "")
}
// path relative to folder root, also creates parent dirs if necessary
func createTestFile(name string, file string) string {
joined := filepath.Join(name, file)
if err := testFs.MkdirAll(filepath.Dir(joined), 0755); err != nil {
panic(fmt.Sprintf("Failed to create parent directory for %s: %s", joined, err))
}
handle, err := testFs.Create(joined)
if err != nil {
panic(fmt.Sprintf("Failed to create test file %s: %s", joined, err))
}
handle.Close()
return file
}
func renameTestFile(name string, old string, new string) {
old = filepath.Join(name, old)
new = filepath.Join(name, new)
if err := testFs.Rename(old, new); err != nil {
panic(fmt.Sprintf("Failed to rename %s to %s: %s", old, new, err))
}
}
func sleepMs(ms int) {
time.Sleep(time.Duration(ms) * time.Millisecond)
}
func testScenario(t *testing.T, name string, testCase func(), expectedEvents []Event, allowOthers bool, ignored string) {
if err := testFs.MkdirAll(name, 0755); err != nil {
panic(fmt.Sprintf("Failed to create directory %s: %s", name, err))
}
// Tests pick up the previously created files/dirs, probably because
// they get flushed to disk with a delay.
initDelayMs := 500
if runtime.GOOS == "darwin" {
initDelayMs = 900
}
sleepMs(initDelayMs)
ctx, cancel := context.WithCancel(context.Background())
if ignored != "" {
ignored = filepath.Join(name, ignored)
}
eventChan, err := testFs.Watch(name, fakeMatcher{ignored}, ctx, false)
if err != nil {
panic(err)
}
go testWatchOutput(t, name, eventChan, expectedEvents, allowOthers, ctx, cancel)
timeout := time.NewTimer(2 * time.Second)
testCase()
select {
case <-timeout.C:
t.Errorf("Timed out before receiving all expected events")
cancel()
case <-ctx.Done():
}
if err := testFs.RemoveAll(name); err != nil {
panic(fmt.Sprintf("Failed to remove directory %s: %s", name, err))
}
}
func testWatchOutput(t *testing.T, name string, in <-chan Event, expectedEvents []Event, allowOthers bool, ctx context.Context, cancel context.CancelFunc) {
var expected = make(map[Event]struct{})
for _, ev := range expectedEvents {
ev.Name = filepath.Join(name, ev.Name)
expected[ev] = struct{}{}
}
var received Event
var last Event
for {
if len(expected) == 0 {
cancel()
return
}
select {
case <-ctx.Done():
return
case received = <-in:
}
// apparently the backend sometimes sends repeat events
if last == received {
continue
}
if _, ok := expected[received]; !ok {
if allowOthers {
sleepMs(100) // To facilitate overflow
continue
}
t.Errorf("Received unexpected event %v expected one of %v", received, expected)
cancel()
return
}
delete(expected, received)
last = received
}
}
type fakeMatcher struct{ match string }
func (fm fakeMatcher) ShouldIgnore(name string) bool {
return name == fm.match
}
type fakeEventInfo string
func (e fakeEventInfo) Path() string {
return string(e)
}
func (e fakeEventInfo) Event() notify.Event {
return notify.Write
}
func (e fakeEventInfo) Sys() interface{} {
return nil
}

View File

@@ -0,0 +1,15 @@
// Copyright (C) 2016 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
// +build solaris,!cgo darwin,!cgo
package fs
import "context"
func (f *BasicFilesystem) Watch(path string, ignore Matcher, ctx context.Context, ignorePerms bool) (<-chan Event, error) {
return nil, ErrWatchNotSupported
}

View File

@@ -6,7 +6,10 @@
package fs
import "time"
import (
"context"
"time"
)
type errorFilesystem struct {
err error
@@ -39,3 +42,6 @@ func (fs *errorFilesystem) Roots() ([]string, error)
func (fs *errorFilesystem) Usage(name string) (Usage, error) { return Usage{}, fs.err }
func (fs *errorFilesystem) Type() FilesystemType { return fs.fsType }
func (fs *errorFilesystem) URI() string { return fs.uri }
func (fs *errorFilesystem) Watch(path string, ignore Matcher, ctx context.Context, ignorePerms bool) (<-chan Event, error) {
return nil, fs.err
}

View File

@@ -7,6 +7,7 @@
package fs
import (
"context"
"errors"
"io"
"os"
@@ -33,7 +34,8 @@ type Filesystem interface {
Rename(oldname, newname string) error
Stat(name string) (FileInfo, error)
SymlinksSupported() bool
Walk(root string, walkFn WalkFunc) error
Walk(name string, walkFn WalkFunc) error
Watch(path string, ignore Matcher, ctx context.Context, ignorePerms bool) (<-chan Event, error)
Hide(name string) error
Unhide(name string) error
Glob(pattern string) ([]string, error)
@@ -82,6 +84,42 @@ type Usage struct {
Total int64
}
type Matcher interface {
ShouldIgnore(name string) bool
}
type MatchResult interface {
IsIgnored() bool
}
type Event struct {
Name string
Type EventType
}
type EventType int
const (
NonRemove EventType = 1 + iota
Remove
Mixed // Should probably not be necessary to be used in filesystem interface implementation
)
func (evType EventType) String() string {
switch {
case evType == NonRemove:
return "non-remove"
case evType == Remove:
return "remove"
case evType == Mixed:
return "mixed"
default:
panic("bug: Unknown event type")
}
}
var ErrWatchNotSupported = errors.New("watching is not supported")
// Equivalents from os package.
const ModePerm = FileMode(os.ModePerm)

View File

@@ -7,6 +7,7 @@
package fs
import (
"context"
"fmt"
"path/filepath"
"runtime"
@@ -127,6 +128,12 @@ func (fs *logFilesystem) Walk(root string, walkFn WalkFunc) error {
return err
}
func (fs *logFilesystem) Watch(path string, ignore Matcher, ctx context.Context, ignorePerms bool) (<-chan Event, error) {
evChan, err := fs.Filesystem.Watch(path, ignore, ctx, ignorePerms)
l.Debugln(getCaller(), fs.Type(), fs.URI(), "Watch", path, ignore, ignorePerms, err)
return evChan, err
}
func (fs *logFilesystem) Unhide(name string) error {
err := fs.Filesystem.Unhide(name)
l.Debugln(getCaller(), fs.Type(), fs.URI(), "Unhide", name, err)

View File

@@ -11,6 +11,7 @@ import (
"time"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/watchaggregator"
)
type folder struct {
@@ -22,6 +23,9 @@ type folder struct {
ctx context.Context
cancel context.CancelFunc
initialScanFinished chan struct{}
watchCancel context.CancelFunc
watchChan chan []string
ignoresUpdated chan struct{} // The ignores changed, we need to restart watcher
}
func newFolder(model *Model, cfg config.FolderConfiguration) folder {
@@ -92,3 +96,35 @@ func (f *folder) scanTimerFired() {
f.scan.Reschedule()
}
func (f *folder) startWatcher() {
ctx, cancel := context.WithCancel(f.ctx)
f.model.fmut.RLock()
ignores := f.model.folderIgnores[f.folderID]
f.model.fmut.RUnlock()
eventChan, err := f.Filesystem().Watch(".", ignores, ctx, f.IgnorePerms)
if err != nil {
l.Warnf("Failed to start filesystem watcher for folder %s: %v", f.Description(), err)
} else {
f.watchChan = make(chan []string)
f.watchCancel = cancel
watchaggregator.Aggregate(eventChan, f.watchChan, f.FolderConfiguration, f.model.cfg, ctx)
l.Infoln("Started filesystem watcher for folder", f.Description())
}
}
func (f *folder) restartWatcher() {
f.watchCancel()
f.startWatcher()
f.Scan(nil)
}
func (f *folder) IgnoresUpdated() {
select {
case f.ignoresUpdated <- struct{}{}:
default:
// We might be busy doing a pull and thus not reading from this
// channel. The channel is 1-buffered, so one notification will be
// queued to ensure we recheck after the pull.
}
}

View File

@@ -48,6 +48,7 @@ type service interface {
BringToFront(string)
DelayScan(d time.Duration)
IndexUpdated() // Remote index was updated notification
IgnoresUpdated() // ignore matcher was updated notification
Jobs() ([]string, []string) // In progress, Queued
Scan(subs []string) error
Serve()
@@ -260,6 +261,7 @@ func (m *Model) startFolderLocked(folder string) config.FolderType {
ffs.Hide(".stignore")
p := folderFactory(m, cfg, ver, ffs)
m.folderRunners[folder] = p
m.warnAboutOverwritingProtectedFiles(folder)
@@ -1858,7 +1860,7 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su
defer func() {
if ignores.Hash() != oldHash {
l.Debugln("Folder", folder, "ignore patterns changed; triggering puller")
runner.IndexUpdated()
runner.IgnoresUpdated()
}
}()

View File

@@ -34,11 +34,20 @@ func (f *sendOnlyFolder) Serve() {
f.scan.timer.Stop()
}()
if f.FSWatcherEnabled {
f.startWatcher()
}
for {
select {
case <-f.ctx.Done():
return
case <-f.ignoresUpdated:
if f.FSWatcherEnabled {
f.restartWatcher()
}
case <-f.scan.timer.C:
l.Debugln(f, "Scanning subdirectories")
f.scanTimerFired()
@@ -48,6 +57,10 @@ func (f *sendOnlyFolder) Serve() {
case next := <-f.scan.delay:
f.scan.timer.Reset(next)
case fsEvents := <-f.watchChan:
l.Debugln(f, "filesystem notification rescan")
f.scanSubdirs(fsEvents)
}
}
}

View File

@@ -164,6 +164,10 @@ func (f *sendReceiveFolder) Serve() {
var prevSec int64
var prevIgnoreHash string
if f.FSWatcherEnabled {
f.startWatcher()
}
for {
select {
case <-f.ctx.Done():
@@ -174,6 +178,12 @@ func (f *sendReceiveFolder) Serve() {
f.pullTimer.Reset(0)
l.Debugln(f, "remote index updated, rescheduling pull")
case <-f.ignoresUpdated:
if f.FSWatcherEnabled {
f.restartWatcher()
}
f.IndexUpdated()
case <-f.pullTimer.C:
select {
case <-f.initialScanFinished:
@@ -278,6 +288,10 @@ func (f *sendReceiveFolder) Serve() {
case next := <-f.scan.delay:
f.scan.timer.Reset(next)
case fsEvents := <-f.watchChan:
l.Debugln(f, "filesystem notification rescan")
f.scanSubdirs(fsEvents)
}
}
}

View File

@@ -0,0 +1,438 @@
// Copyright (C) 2016 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package watchaggregator
import (
"context"
"fmt"
"path/filepath"
"strings"
"time"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/fs"
)
// Not meant to be changed, but must be changeable for tests
var (
maxFiles = 512
maxFilesPerDir = 128
)
// aggregatedEvent represents potentially multiple events at and/or recursively
// below one path until it times out and a scan is scheduled.
type aggregatedEvent struct {
firstModTime time.Time
lastModTime time.Time
evType fs.EventType
}
// Stores pointers to both aggregated events directly within this directory and
// child directories recursively containing aggregated events themselves.
type eventDir struct {
events map[string]*aggregatedEvent
dirs map[string]*eventDir
}
func newEventDir() *eventDir {
return &eventDir{
events: make(map[string]*aggregatedEvent),
dirs: make(map[string]*eventDir),
}
}
func (dir *eventDir) eventCount() int {
count := len(dir.events)
for _, dir := range dir.dirs {
count += dir.eventCount()
}
return count
}
func (dir *eventDir) childCount() int {
return len(dir.events) + len(dir.dirs)
}
func (dir *eventDir) firstModTime() time.Time {
if dir.childCount() == 0 {
panic("bug: firstModTime must not be used on empty eventDir")
}
firstModTime := time.Now()
for _, childDir := range dir.dirs {
dirTime := childDir.firstModTime()
if dirTime.Before(firstModTime) {
firstModTime = dirTime
}
}
for _, event := range dir.events {
if event.firstModTime.Before(firstModTime) {
firstModTime = event.firstModTime
}
}
return firstModTime
}
func (dir *eventDir) eventType() fs.EventType {
if dir.childCount() == 0 {
panic("bug: eventType must not be used on empty eventDir")
}
var evType fs.EventType
for _, childDir := range dir.dirs {
evType |= childDir.eventType()
if evType == fs.Mixed {
return fs.Mixed
}
}
for _, event := range dir.events {
evType |= event.evType
if evType == fs.Mixed {
return fs.Mixed
}
}
return evType
}
type aggregator struct {
folderCfg config.FolderConfiguration
folderCfgUpdate chan config.FolderConfiguration
// Time after which an event is scheduled for scanning when no modifications occur.
notifyDelay time.Duration
// Time after which an event is scheduled for scanning even though modifications occur.
notifyTimeout time.Duration
notifyTimer *time.Timer
notifyTimerNeedsReset bool
notifyTimerResetChan chan time.Duration
ctx context.Context
}
func new(folderCfg config.FolderConfiguration, ctx context.Context) *aggregator {
a := &aggregator{
folderCfgUpdate: make(chan config.FolderConfiguration),
notifyTimerNeedsReset: false,
notifyTimerResetChan: make(chan time.Duration),
ctx: ctx,
}
a.updateConfig(folderCfg)
return a
}
func Aggregate(in <-chan fs.Event, out chan<- []string, folderCfg config.FolderConfiguration, cfg *config.Wrapper, ctx context.Context) {
a := new(folderCfg, ctx)
// Necessary for unit tests where the backend is mocked
go a.mainLoop(in, out, cfg)
}
func (a *aggregator) mainLoop(in <-chan fs.Event, out chan<- []string, cfg *config.Wrapper) {
a.notifyTimer = time.NewTimer(a.notifyDelay)
defer a.notifyTimer.Stop()
inProgress := make(map[string]struct{})
inProgressItemSubscription := events.Default.Subscribe(events.ItemStarted | events.ItemFinished)
cfg.Subscribe(a)
rootEventDir := newEventDir()
for {
select {
case event := <-in:
a.newEvent(event, rootEventDir, inProgress)
case event := <-inProgressItemSubscription.C():
updateInProgressSet(event, inProgress)
case <-a.notifyTimer.C:
a.actOnTimer(rootEventDir, out)
case interval := <-a.notifyTimerResetChan:
a.resetNotifyTimer(interval)
case folderCfg := <-a.folderCfgUpdate:
a.updateConfig(folderCfg)
case <-a.ctx.Done():
cfg.Unsubscribe(a)
l.Debugln(a, "Stopped")
return
}
}
}
func (a *aggregator) newEvent(event fs.Event, rootEventDir *eventDir, inProgress map[string]struct{}) {
if _, ok := rootEventDir.events["."]; ok {
l.Debugln(a, "Will scan entire folder anyway; dropping:", event.Name)
return
}
if _, ok := inProgress[event.Name]; ok {
l.Debugln(a, "Skipping path we modified:", event.Name)
return
}
a.aggregateEvent(event, time.Now(), rootEventDir)
}
func (a *aggregator) aggregateEvent(event fs.Event, evTime time.Time, rootEventDir *eventDir) {
if event.Name == "." || rootEventDir.eventCount() == maxFiles {
l.Debugln(a, "Scan entire folder")
firstModTime := evTime
if rootEventDir.childCount() != 0 {
event.Type |= rootEventDir.eventType()
firstModTime = rootEventDir.firstModTime()
}
rootEventDir.dirs = make(map[string]*eventDir)
rootEventDir.events = make(map[string]*aggregatedEvent)
rootEventDir.events["."] = &aggregatedEvent{
firstModTime: firstModTime,
lastModTime: evTime,
evType: event.Type,
}
a.resetNotifyTimerIfNeeded()
return
}
parentDir := rootEventDir
// Check if any parent directory is already tracked or will exceed
// events per directory limit bottom up
pathSegments := strings.Split(filepath.ToSlash(event.Name), "/")
// As root dir cannot be further aggregated, allow up to maxFiles
// children.
localMaxFilesPerDir := maxFiles
var currPath string
for i, name := range pathSegments[:len(pathSegments)-1] {
currPath = filepath.Join(currPath, name)
if ev, ok := parentDir.events[name]; ok {
ev.lastModTime = evTime
ev.evType |= event.Type
l.Debugf("%v Parent %s (type %s) already tracked: %s", a, currPath, ev.evType, event.Name)
return
}
if parentDir.childCount() == localMaxFilesPerDir {
l.Debugf("%v Parent dir %s already has %d children, tracking it instead: %s", a, currPath, localMaxFilesPerDir, event.Name)
event.Name = filepath.Dir(currPath)
a.aggregateEvent(event, evTime, rootEventDir)
return
}
// If there are no events below path, but we need to recurse
// into that path, create eventDir at path.
if newParent, ok := parentDir.dirs[name]; ok {
parentDir = newParent
} else {
l.Debugln(a, "Creating eventDir at:", currPath)
newParent = newEventDir()
parentDir.dirs[name] = newParent
parentDir = newParent
}
// Reset allowed children count to maxFilesPerDir for non-root
if i == 0 {
localMaxFilesPerDir = maxFilesPerDir
}
}
name := pathSegments[len(pathSegments)-1]
if ev, ok := parentDir.events[name]; ok {
ev.lastModTime = evTime
ev.evType |= event.Type
l.Debugf("%v Already tracked (type %v): %s", a, ev.evType, event.Name)
return
}
childDir, ok := parentDir.dirs[name]
// If a dir existed at path, it would be removed from dirs, thus
// childCount would not increase.
if !ok && parentDir.childCount() == localMaxFilesPerDir {
l.Debugf("%v Parent dir already has %d children, tracking it instead: %s", a, localMaxFilesPerDir, event.Name)
event.Name = filepath.Dir(event.Name)
a.aggregateEvent(event, evTime, rootEventDir)
return
}
firstModTime := evTime
if ok {
firstModTime = childDir.firstModTime()
event.Type |= childDir.eventType()
delete(parentDir.dirs, name)
}
l.Debugf("%v Tracking (type %v): %s", a, event.Type, event.Name)
parentDir.events[name] = &aggregatedEvent{
firstModTime: firstModTime,
lastModTime: evTime,
evType: event.Type,
}
a.resetNotifyTimerIfNeeded()
}
func (a *aggregator) resetNotifyTimerIfNeeded() {
if a.notifyTimerNeedsReset {
a.resetNotifyTimer(a.notifyDelay)
}
}
// resetNotifyTimer should only ever be called when notifyTimer has stopped
// and notifyTimer.C been read from. Otherwise, call resetNotifyTimerIfNeeded.
func (a *aggregator) resetNotifyTimer(duration time.Duration) {
l.Debugln(a, "Resetting notifyTimer to", duration.String())
a.notifyTimerNeedsReset = false
a.notifyTimer.Reset(duration)
}
func (a *aggregator) actOnTimer(rootEventDir *eventDir, out chan<- []string) {
eventCount := rootEventDir.eventCount()
if eventCount == 0 {
l.Debugln(a, "No tracked events, waiting for new event.")
a.notifyTimerNeedsReset = true
return
}
oldevents := a.popOldEvents(rootEventDir, ".", time.Now())
if len(oldevents) == 0 {
l.Debugln(a, "No old fs events")
a.resetNotifyTimer(a.notifyDelay)
return
}
// Sending to channel might block for a long time, but we need to keep
// reading from notify backend channel to avoid overflow
go a.notify(oldevents, out)
}
// Schedule scan for given events dispatching deletes last and reset notification
// afterwards to set up for the next scan scheduling.
func (a *aggregator) notify(oldEvents map[string]*aggregatedEvent, out chan<- []string) {
timeBeforeSending := time.Now()
l.Debugf("%v Notifying about %d fs events", a, len(oldEvents))
separatedBatches := make(map[fs.EventType][]string)
for path, event := range oldEvents {
separatedBatches[event.evType] = append(separatedBatches[event.evType], path)
}
for _, evType := range [3]fs.EventType{fs.NonRemove, fs.Mixed, fs.Remove} {
currBatch := separatedBatches[evType]
if len(currBatch) != 0 {
select {
case out <- currBatch:
case <-a.ctx.Done():
return
}
}
}
// If sending to channel blocked for a long time,
// shorten next notifyDelay accordingly.
duration := time.Since(timeBeforeSending)
buffer := time.Millisecond
var nextDelay time.Duration
switch {
case duration < a.notifyDelay/10:
nextDelay = a.notifyDelay
case duration+buffer > a.notifyDelay:
nextDelay = buffer
default:
nextDelay = a.notifyDelay - duration
}
select {
case a.notifyTimerResetChan <- nextDelay:
case <-a.ctx.Done():
}
}
// popOldEvents finds events that should be scheduled for scanning recursively in dirs,
// removes those events and empty eventDirs and returns a map with all the removed
// events referenced by their filesystem path
func (a *aggregator) popOldEvents(dir *eventDir, dirPath string, currTime time.Time) map[string]*aggregatedEvent {
oldEvents := make(map[string]*aggregatedEvent)
for childName, childDir := range dir.dirs {
for evPath, event := range a.popOldEvents(childDir, filepath.Join(dirPath, childName), currTime) {
oldEvents[evPath] = event
}
if childDir.childCount() == 0 {
delete(dir.dirs, childName)
}
}
for name, event := range dir.events {
if a.isOld(event, currTime) {
oldEvents[filepath.Join(dirPath, name)] = event
delete(dir.events, name)
}
}
return oldEvents
}
func (a *aggregator) isOld(ev *aggregatedEvent, currTime time.Time) bool {
// Deletes should always be scanned last, therefore they are always
// delayed by letting them time out (see below).
// An event that has not registered any new modifications recently is scanned.
// a.notifyDelay is the user facing value signifying the normal delay between
// a picking up a modification and scanning it. As scheduling scans happens at
// regular intervals of a.notifyDelay the delay of a single event is not exactly
// a.notifyDelay, but lies in in the range of 0.5 to 1.5 times a.notifyDelay.
if ev.evType == fs.NonRemove && 2*currTime.Sub(ev.lastModTime) > a.notifyDelay {
return true
}
// When an event registers repeat modifications or involves removals it
// is delayed to reduce resource usage, but after a certain time (notifyTimeout)
// passed it is scanned anyway.
return currTime.Sub(ev.firstModTime) > a.notifyTimeout
}
func (a *aggregator) String() string {
return fmt.Sprintf("aggregator/%s:", a.folderCfg.Description())
}
func (a *aggregator) VerifyConfiguration(from, to config.Configuration) error {
return nil
}
func (a *aggregator) CommitConfiguration(from, to config.Configuration) bool {
for _, folderCfg := range to.Folders {
if folderCfg.ID == a.folderCfg.ID {
select {
case a.folderCfgUpdate <- folderCfg:
case <-a.ctx.Done():
}
return true
}
}
// Nothing to do, model will soon stop this
return true
}
func (a *aggregator) updateConfig(folderCfg config.FolderConfiguration) {
a.notifyDelay = time.Duration(folderCfg.FSWatcherDelayS) * time.Second
a.notifyTimeout = notifyTimeout(folderCfg.FSWatcherDelayS)
a.folderCfg = folderCfg
}
func updateInProgressSet(event events.Event, inProgress map[string]struct{}) {
if event.Type == events.ItemStarted {
path := event.Data.(map[string]string)["item"]
inProgress[path] = struct{}{}
} else if event.Type == events.ItemFinished {
path := event.Data.(map[string]interface{})["item"].(string)
delete(inProgress, path)
}
}
// Events that involve removals or continuously receive new modifications are
// delayed but must time out at some point. The following numbers come out of thin
// air, they were just considered as a sensible compromise between fast updates and
// saving resources. For short delays the timeout is 6 times the delay, capped at 1
// minute. For delays longer than 1 minute, the delay and timeout are equal.
func notifyTimeout(eventDelayS int) time.Duration {
shortDelayS := 10
shortDelayMultiplicator := 6
longDelayS := 60
longDelayTimeout := time.Duration(1) * time.Minute
if eventDelayS < shortDelayS {
return time.Duration(eventDelayS*shortDelayMultiplicator) * time.Second
}
if eventDelayS < longDelayS {
return longDelayTimeout
}
return time.Duration(eventDelayS) * time.Second
}

View File

@@ -0,0 +1,281 @@
// Copyright (C) 2016 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package watchaggregator
import (
"context"
"os"
"path/filepath"
"strconv"
"testing"
"time"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/fs"
)
func TestMain(m *testing.M) {
maxFiles = 32
maxFilesPerDir = 8
defer func() {
maxFiles = 512
maxFilesPerDir = 128
}()
os.Exit(m.Run())
}
const (
testNotifyDelayS = 1
testNotifyTimeout = 2 * time.Second
)
var (
folderRoot = filepath.Clean("/home/someuser/syncthing")
defaultFolderCfg = config.FolderConfiguration{
FilesystemType: fs.FilesystemTypeBasic,
Path: folderRoot,
FSWatcherDelayS: testNotifyDelayS,
}
defaultCfg = config.Wrap("", config.Configuration{
Folders: []config.FolderConfiguration{defaultFolderCfg},
})
)
type expectedBatch struct {
paths []string
afterMs int
beforeMs int
}
// TestAggregate checks whether maxFilesPerDir+1 events in one dir are
// aggregated to parent dir
func TestAggregate(t *testing.T) {
evDir := newEventDir()
inProgress := make(map[string]struct{})
folderCfg := defaultFolderCfg.Copy()
folderCfg.ID = "Aggregate"
ctx, _ := context.WithCancel(context.Background())
a := new(folderCfg, ctx)
// checks whether maxFilesPerDir events in one dir are kept as is
for i := 0; i < maxFilesPerDir; i++ {
a.newEvent(fs.Event{filepath.Join("parent", strconv.Itoa(i)), fs.NonRemove}, evDir, inProgress)
}
if len(getEventPaths(evDir, ".", a)) != maxFilesPerDir {
t.Errorf("Unexpected number of events stored")
}
// checks whether maxFilesPerDir+1 events in one dir are aggregated to parent dir
a.newEvent(fs.Event{filepath.Join("parent", "new"), fs.NonRemove}, evDir, inProgress)
compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"parent"})
// checks that adding an event below "parent" does not change anything
a.newEvent(fs.Event{filepath.Join("parent", "extra"), fs.NonRemove}, evDir, inProgress)
compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"parent"})
// again test aggregation in "parent" but with event in subdirs
evDir = newEventDir()
for i := 0; i < maxFilesPerDir; i++ {
a.newEvent(fs.Event{filepath.Join("parent", strconv.Itoa(i)), fs.NonRemove}, evDir, inProgress)
}
a.newEvent(fs.Event{filepath.Join("parent", "sub", "new"), fs.NonRemove}, evDir, inProgress)
compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"parent"})
// test aggregation in root
evDir = newEventDir()
for i := 0; i < maxFiles; i++ {
a.newEvent(fs.Event{strconv.Itoa(i), fs.NonRemove}, evDir, inProgress)
}
if len(getEventPaths(evDir, ".", a)) != maxFiles {
t.Errorf("Unexpected number of events stored in root")
}
a.newEvent(fs.Event{filepath.Join("parent", "sub", "new"), fs.NonRemove}, evDir, inProgress)
compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"."})
// checks that adding an event when "." is already stored is a noop
a.newEvent(fs.Event{"anythingelse", fs.NonRemove}, evDir, inProgress)
compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"."})
// TestOverflow checks that the entire folder is scanned when maxFiles is reached
evDir = newEventDir()
filesPerDir := maxFilesPerDir / 2
dirs := make([]string, maxFiles/filesPerDir+1)
for i := 0; i < maxFiles/filesPerDir+1; i++ {
dirs[i] = "dir" + strconv.Itoa(i)
}
for _, dir := range dirs {
for i := 0; i < filesPerDir; i++ {
a.newEvent(fs.Event{filepath.Join(dir, strconv.Itoa(i)), fs.NonRemove}, evDir, inProgress)
}
}
compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"."})
}
// TestInProgress checks that ignoring files currently edited by Syncthing works
func TestInProgress(t *testing.T) {
testCase := func(c chan<- fs.Event) {
events.Default.Log(events.ItemStarted, map[string]string{
"item": "inprogress",
})
sleepMs(100)
c <- fs.Event{Name: "inprogress", Type: fs.NonRemove}
sleepMs(1000)
events.Default.Log(events.ItemFinished, map[string]interface{}{
"item": "inprogress",
})
sleepMs(100)
c <- fs.Event{Name: "notinprogress", Type: fs.NonRemove}
sleepMs(800)
}
expectedBatches := []expectedBatch{
{[]string{"notinprogress"}, 2000, 3500},
}
testScenario(t, "InProgress", testCase, expectedBatches)
}
// TestDelay checks that recurring changes to the same path are delayed
// and different types separated and ordered correctly
func TestDelay(t *testing.T) {
file := filepath.Join("parent", "file")
delayed := "delayed"
del := "deleted"
both := filepath.Join("parent", "sub", "both")
testCase := func(c chan<- fs.Event) {
sleepMs(200)
c <- fs.Event{Name: file, Type: fs.NonRemove}
delay := time.Duration(300) * time.Millisecond
timer := time.NewTimer(delay)
<-timer.C
timer.Reset(delay)
c <- fs.Event{Name: delayed, Type: fs.NonRemove}
c <- fs.Event{Name: both, Type: fs.NonRemove}
c <- fs.Event{Name: both, Type: fs.Remove}
c <- fs.Event{Name: del, Type: fs.Remove}
for i := 0; i < 9; i++ {
<-timer.C
timer.Reset(delay)
c <- fs.Event{Name: delayed, Type: fs.NonRemove}
}
<-timer.C
}
// batches that we expect to receive with time interval in milliseconds
expectedBatches := []expectedBatch{
{[]string{file}, 500, 2500},
{[]string{delayed}, 2500, 4500},
{[]string{both}, 2500, 4500},
{[]string{del}, 2500, 4500},
{[]string{delayed}, 3600, 6500},
}
testScenario(t, "Delay", testCase, expectedBatches)
}
func getEventPaths(dir *eventDir, dirPath string, a *aggregator) []string {
var paths []string
for childName, childDir := range dir.dirs {
for _, path := range getEventPaths(childDir, filepath.Join(dirPath, childName), a) {
paths = append(paths, path)
}
}
for name := range dir.events {
paths = append(paths, filepath.Join(dirPath, name))
}
return paths
}
func sleepMs(ms int) {
time.Sleep(time.Duration(ms) * time.Millisecond)
}
func durationMs(ms int) time.Duration {
return time.Duration(ms) * time.Millisecond
}
func compareBatchToExpected(t *testing.T, batch []string, expectedPaths []string) {
for _, expected := range expectedPaths {
expected = filepath.Clean(expected)
found := false
for i, received := range batch {
if expected == received {
found = true
batch = append(batch[:i], batch[i+1:]...)
break
}
}
if !found {
t.Errorf("Did not receive event %s", expected)
}
}
for _, received := range batch {
t.Errorf("Received unexpected event %s", received)
}
}
func testScenario(t *testing.T, name string, testCase func(c chan<- fs.Event), expectedBatches []expectedBatch) {
ctx, cancel := context.WithCancel(context.Background())
eventChan := make(chan fs.Event)
watchChan := make(chan []string)
folderCfg := defaultFolderCfg.Copy()
folderCfg.ID = name
a := new(folderCfg, ctx)
a.notifyTimeout = testNotifyTimeout
startTime := time.Now()
go a.mainLoop(eventChan, watchChan, defaultCfg)
sleepMs(10)
go testAggregatorOutput(t, watchChan, expectedBatches, startTime, ctx)
testCase(eventChan)
timeout := time.NewTimer(time.Duration(expectedBatches[len(expectedBatches)-1].beforeMs+100) * time.Millisecond)
<-timeout.C
cancel()
}
func testAggregatorOutput(t *testing.T, fsWatchChan <-chan []string, expectedBatches []expectedBatch, startTime time.Time, ctx context.Context) {
var received []string
var elapsedTime time.Duration
batchIndex := 0
for {
select {
case <-ctx.Done():
if batchIndex != len(expectedBatches) {
t.Errorf("Received only %d batches (%d expected)", batchIndex, len(expectedBatches))
}
return
case received = <-fsWatchChan:
}
if batchIndex >= len(expectedBatches) {
t.Errorf("Received batch %d (only %d expected)", batchIndex+1, len(expectedBatches))
continue
}
elapsedTime = time.Since(startTime)
expected := expectedBatches[batchIndex]
switch {
case elapsedTime < durationMs(expected.afterMs):
t.Errorf("Received batch %d after %v (too soon)", batchIndex+1, elapsedTime)
case elapsedTime > durationMs(expected.beforeMs):
t.Errorf("Received batch %d after %v (too late)", batchIndex+1, elapsedTime)
case len(received) != len(expected.paths):
t.Errorf("Received %v events instead of %v for batch %v", len(received), len(expected.paths), batchIndex+1)
}
compareBatchToExpected(t, received, expected.paths)
batchIndex++
}
}

View File

@@ -0,0 +1,24 @@
// Copyright (C) 2016 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package watchaggregator
import (
"os"
"strings"
"github.com/syncthing/syncthing/lib/logger"
)
var facilityName = "watchaggregator"
var (
l = logger.DefaultLogger.NewFacility(facilityName, "Filesystem event watcher")
)
func init() {
l.SetDebug(facilityName, strings.Contains(os.Getenv("STTRACE"), facilityName) || os.Getenv("STTRACE") == "all")
}