diff --git a/internal/model/model.go b/internal/model/model.go index 2de748c4..58da1f0f 100644 --- a/internal/model/model.go +++ b/internal/model/model.go @@ -55,6 +55,7 @@ type service interface { BringToFront(string) DelayScan(d time.Duration) IndexUpdated() // Remote index was updated notification + Scan(subs []string) error setState(state folderState) setError(err error) @@ -1226,6 +1227,21 @@ func (m *Model) ScanFolder(folder string) error { } func (m *Model) ScanFolderSubs(folder string, subs []string) error { + m.fmut.Lock() + runner, ok := m.folderRunners[folder] + m.fmut.Unlock() + + // Folders are added to folderRunners only when they are started. We can't + // scan them before they have started, so that's what we need to check for + // here. + if !ok { + return errors.New("no such folder") + } + + return runner.Scan(subs) +} + +func (m *Model) internalScanFolderSubs(folder string, subs []string) error { for i, sub := range subs { sub = osutil.NativeFilename(sub) if p := filepath.Clean(filepath.Join(folder, sub)); !strings.HasPrefix(p, folder) { diff --git a/internal/model/rofolder.go b/internal/model/rofolder.go index 74a9cce1..e737fee5 100644 --- a/internal/model/rofolder.go +++ b/internal/model/rofolder.go @@ -22,9 +22,15 @@ type roFolder struct { timer *time.Timer model *Model stop chan struct{} + scanNow chan rescanRequest delayScan chan time.Duration } +type rescanRequest struct { + subs []string + err chan error +} + func newROFolder(model *Model, folder string, interval time.Duration) *roFolder { return &roFolder{ stateTracker: stateTracker{ @@ -36,6 +42,7 @@ func newROFolder(model *Model, folder string, interval time.Duration) *roFolder timer: time.NewTimer(time.Millisecond), model: model, stop: make(chan struct{}), + scanNow: make(chan rescanRequest), delayScan: make(chan time.Duration), } } @@ -76,7 +83,7 @@ func (s *roFolder) Serve() { l.Debugln(s, "rescan") } - if err := s.model.ScanFolder(s.folder); err != nil { + if err := s.model.internalScanFolderSubs(s.folder, nil); err != nil { // Potentially sets the error twice, once in the scanner just // by doing a check, and once here, if the error returned is // the same one as returned by CheckFolderHealth, though @@ -92,11 +99,34 @@ func (s *roFolder) Serve() { } if s.intv == 0 { - return + continue } reschedule() + case req := <-s.scanNow: + if err := s.model.CheckFolderHealth(s.folder); err != nil { + l.Infoln("Skipping folder", s.folder, "scan due to folder error:", err) + req.err <- err + continue + } + + if debug { + l.Debugln(s, "forced rescan") + } + + if err := s.model.internalScanFolderSubs(s.folder, req.subs); err != nil { + // Potentially sets the error twice, once in the scanner just + // by doing a check, and once here, if the error returned is + // the same one as returned by CheckFolderHealth, though + // duplicate set is handled by setError. + s.setError(err) + req.err <- err + continue + } + + req.err <- nil + case next := <-s.delayScan: s.timer.Reset(next) } @@ -110,6 +140,15 @@ func (s *roFolder) Stop() { func (s *roFolder) IndexUpdated() { } +func (s *roFolder) Scan(subs []string) error { + req := rescanRequest{ + subs: subs, + err: make(chan error), + } + s.scanNow <- req + return <-req.err +} + func (s *roFolder) String() string { return fmt.Sprintf("roFolder/%s@%p", s.folder, s) } diff --git a/internal/model/rwfolder.go b/internal/model/rwfolder.go index 30e6e669..ff0627fa 100644 --- a/internal/model/rwfolder.go +++ b/internal/model/rwfolder.go @@ -32,7 +32,7 @@ import ( const ( pauseIntv = 60 * time.Second nextPullIntv = 10 * time.Second - shortPullIntv = 5 * time.Second + shortPullIntv = time.Second ) // A pullBlockState is passed to the puller routine for each block that needs @@ -90,6 +90,7 @@ type rwFolder struct { scanTimer *time.Timer pullTimer *time.Timer delayScan chan time.Duration + scanNow chan rescanRequest remoteIndex chan struct{} // An index update was received, we should re-evaluate needs } @@ -118,6 +119,7 @@ func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFo pullTimer: time.NewTimer(shortPullIntv), scanTimer: time.NewTimer(time.Millisecond), // The first scan should be done immediately. delayScan: make(chan time.Duration), + scanNow: make(chan rescanRequest), remoteIndex: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a notification if we're busy doing a pull when it comes. } } @@ -278,7 +280,7 @@ func (p *rwFolder) Serve() { l.Debugln(p, "rescan") } - if err := p.model.ScanFolder(p.folder); err != nil { + if err := p.model.internalScanFolderSubs(p.folder, nil); err != nil { // Potentially sets the error twice, once in the scanner just // by doing a check, and once here, if the error returned is // the same one as returned by CheckFolderHealth, though @@ -296,6 +298,29 @@ func (p *rwFolder) Serve() { initialScanCompleted = true } + case req := <-p.scanNow: + if err := p.model.CheckFolderHealth(p.folder); err != nil { + l.Infoln("Skipping folder", p.folder, "scan due to folder error:", err) + req.err <- err + continue + } + + if debug { + l.Debugln(p, "forced rescan") + } + + if err := p.model.internalScanFolderSubs(p.folder, req.subs); err != nil { + // Potentially sets the error twice, once in the scanner just + // by doing a check, and once here, if the error returned is + // the same one as returned by CheckFolderHealth, though + // duplicate set is handled by setError. + p.setError(err) + req.err <- err + continue + } + + req.err <- nil + case next := <-p.delayScan: p.scanTimer.Reset(next) } @@ -317,6 +342,15 @@ func (p *rwFolder) IndexUpdated() { } } +func (p *rwFolder) Scan(subs []string) error { + req := rescanRequest{ + subs: subs, + err: make(chan error), + } + p.scanNow <- req + return <-req.err +} + func (p *rwFolder) String() string { return fmt.Sprintf("rwFolder/%s@%p", p.folder, p) } diff --git a/test/sync_test.go b/test/sync_test.go index cdb4fac9..4bcb9f1e 100644 --- a/test/sync_test.go +++ b/test/sync_test.go @@ -10,6 +10,7 @@ package integration import ( "fmt" + "io/ioutil" "log" "math/rand" "os" @@ -82,6 +83,18 @@ func TestSyncClusterStaggeredVersioning(t *testing.T) { testSyncCluster(t) } +func TestSyncClusterForcedRescan(t *testing.T) { + // Use no versioning + id, _ := protocol.DeviceIDFromString(id2) + cfg, _ := config.Load("h2/config.xml", id) + fld := cfg.Folders()["default"] + fld.Versioning = config.VersioningConfiguration{} + cfg.SetFolder(fld) + cfg.Save() + + testSyncClusterForcedRescan(t) +} + func testSyncCluster(t *testing.T) { // This tests syncing files back and forth between three cluster members. // Their configs are in h1, h2 and h3. The folder "default" is shared @@ -287,6 +300,116 @@ func testSyncCluster(t *testing.T) { } } +func testSyncClusterForcedRescan(t *testing.T) { + // During this test, we create 1K files, remove and then create them + // again. However, during these operations we will perform scan operations + // such that other nodes will retrieve these options while data is + // changing. + + // When -short is passed, keep it more reasonable. + timeLimit := longTimeLimit + if testing.Short() { + timeLimit = shortTimeLimit + } + + log.Println("Cleaning...") + err := removeAll("s1", "s12-1", + "s2", "s12-2", "s23-2", + "s3", "s23-3", + "h1/index*", "h2/index*", "h3/index*") + if err != nil { + t.Fatal(err) + } + + // Create initial folder contents. All three devices have stuff in + // "default", which should be merged. The other two folders are initially + // empty on one side. + + log.Println("Generating files...") + if err := os.MkdirAll("s1/test-stable-files", 0755); err != nil { + t.Fatal(err) + } + for i := 0; i < 1000; i++ { + name := fmt.Sprintf("s1/test-stable-files/%d", i) + if err := ioutil.WriteFile(name, []byte(time.Now().Format(time.RFC3339Nano)), 0644); err != nil { + t.Fatal(err) + } + } + + // Prepare the expected state of folders after the sync + expected, err := directoryContents("s1") + if err != nil { + t.Fatal(err) + } + + // Start the syncers + p0 := startInstance(t, 1) + defer checkedStop(t, p0) + p1 := startInstance(t, 2) + defer checkedStop(t, p1) + p2 := startInstance(t, 3) + defer checkedStop(t, p2) + + p := []*rc.Process{p0, p1, p2} + + start := time.Now() + for time.Since(start) < timeLimit { + rescan := func() { + for i := range p { + if err := p[i].Rescan("default"); err != nil { + t.Fatal(err) + } + } + } + + log.Println("Forcing rescan...") + rescan() + + // Sync stuff and verify it looks right + err = scSyncAndCompare(p, [][]fileInfo{expected}) + if err != nil { + t.Fatal(err) + } + + log.Println("Altering...") + + // Delete and recreate stable files while scanners and pullers are active + for i := 0; i < 1000; i++ { + name := fmt.Sprintf("s1/test-stable-files/%d", i) + if err := os.Remove(name); err != nil { + t.Fatal(err) + } + if rand.Intn(10) == 0 { + rescan() + } + } + + rescan() + + time.Sleep(50 * time.Millisecond) + for i := 0; i < 1000; i++ { + name := fmt.Sprintf("s1/test-stable-files/%d", i) + if err := ioutil.WriteFile(name, []byte(time.Now().Format(time.RFC3339Nano)), 0644); err != nil { + t.Fatal(err) + } + if rand.Intn(10) == 0 { + rescan() + } + } + + rescan() + + // Prepare the expected state of folders after the sync + expected, err = directoryContents("s1") + if err != nil { + t.Fatal(err) + } + if len(expected) != 1001 { + t.Fatal("s1 does not have 1001 files;", len(expected)) + } + } +} + func scSyncAndCompare(p []*rc.Process, expected [][]fileInfo) error { log.Println("Syncing...")