diff --git a/lib/model/folder.go b/lib/model/folder.go index 48dcf8ff..df35aeab 100644 --- a/lib/model/folder.go +++ b/lib/model/folder.go @@ -39,6 +39,12 @@ type folder struct { restartWatchChan chan struct{} watchErr error watchErrMut sync.Mutex + + puller puller +} + +type puller interface { + pull() bool // true when successfull and should not be retried } func newFolder(model *Model, cfg config.FolderConfiguration) folder { @@ -64,6 +70,89 @@ func newFolder(model *Model, cfg config.FolderConfiguration) folder { } } +func (f *folder) Serve() { + l.Debugln(f, "starting") + defer l.Debugln(f, "exiting") + + defer func() { + f.scan.timer.Stop() + f.setState(FolderIdle) + }() + + pause := f.basePause() + pullFailTimer := time.NewTimer(0) + <-pullFailTimer.C + + if f.FSWatcherEnabled && f.CheckHealth() == nil { + f.startWatch() + } + + initialCompleted := f.initialScanFinished + + for { + select { + case <-f.ctx.Done(): + return + + case <-f.pullScheduled: + pullFailTimer.Stop() + select { + case <-pullFailTimer.C: + default: + } + + if !f.puller.pull() { + // Pulling failed, try again later. + pullFailTimer.Reset(pause) + } + + case <-pullFailTimer.C: + if f.puller.pull() { + // We're good. Don't schedule another fail pull and reset + // the pause interval. + pause = f.basePause() + continue + } + + // Pulling failed, try again later. + l.Infof("Folder %v isn't making sync progress - retrying in %v.", f.Description(), pause) + pullFailTimer.Reset(pause) + // Back off from retrying to pull with an upper limit. + if pause < 60*f.basePause() { + pause *= 2 + } + + case <-initialCompleted: + // Initial scan has completed, we should do a pull + initialCompleted = nil // never hit this case again + if !f.puller.pull() { + // Pulling failed, try again later. + pullFailTimer.Reset(pause) + } + + // The reason for running the scanner from within the puller is that + // this is the easiest way to make sure we are not doing both at the + // same time. + case <-f.scan.timer.C: + l.Debugln(f, "Scanning subdirectories") + f.scanTimerFired() + + case req := <-f.scan.now: + req.err <- f.scanSubdirs(req.subdirs) + + case next := <-f.scan.delay: + f.scan.timer.Reset(next) + + case fsEvents := <-f.watchChan: + l.Debugln(f, "filesystem notification rescan") + f.scanSubdirs(fsEvents) + + case <-f.restartWatchChan: + f.restartWatch() + } + } +} + func (f *folder) BringToFront(string) {} func (f *folder) DelayScan(next time.Duration) { @@ -258,3 +347,10 @@ func (f *folder) setError(err error) { f.stateTracker.setError(err) } + +func (f *folder) basePause() time.Duration { + if f.PullerPauseS == 0 { + return defaultPullerPause + } + return time.Duration(f.PullerPauseS) * time.Second +} diff --git a/lib/model/rofolder.go b/lib/model/rofolder.go index f2589642..4a5f9111 100644 --- a/lib/model/rofolder.go +++ b/lib/model/rofolder.go @@ -25,47 +25,11 @@ type sendOnlyFolder struct { } func newSendOnlyFolder(model *Model, cfg config.FolderConfiguration, _ versioner.Versioner, _ fs.Filesystem) service { - return &sendOnlyFolder{folder: newFolder(model, cfg)} -} - -func (f *sendOnlyFolder) Serve() { - l.Debugln(f, "starting") - defer l.Debugln(f, "exiting") - - defer func() { - f.scan.timer.Stop() - }() - - if f.FSWatcherEnabled && f.CheckHealth() == nil { - f.startWatch() - } - - for { - select { - case <-f.ctx.Done(): - return - - case <-f.pullScheduled: - f.pull() - - case <-f.restartWatchChan: - f.restartWatch() - - case <-f.scan.timer.C: - l.Debugln(f, "Scanning subdirectories") - f.scanTimerFired() - - case req := <-f.scan.now: - req.err <- f.scanSubdirs(req.subdirs) - - case next := <-f.scan.delay: - f.scan.timer.Reset(next) - - case fsEvents := <-f.watchChan: - l.Debugln(f, "filesystem notification rescan") - f.scanSubdirs(fsEvents) - } + f := &sendOnlyFolder{ + folder: newFolder(model, cfg), } + f.folder.puller = f + return f } func (f *sendOnlyFolder) String() string { @@ -77,12 +41,12 @@ func (f *sendOnlyFolder) PullErrors() []FileError { } // pull checks need for files that only differ by metadata (no changes on disk) -func (f *sendOnlyFolder) pull() { +func (f *sendOnlyFolder) pull() bool { select { case <-f.initialScanFinished: default: // Once the initial scan finished, a pull will be scheduled - return + return false } f.model.fmut.RLock() @@ -133,4 +97,6 @@ func (f *sendOnlyFolder) pull() { if len(batch) > 0 { f.model.updateLocalsFromPulling(f.folderID, batch) } + + return true } diff --git a/lib/model/rwfolder.go b/lib/model/rwfolder.go index 8007222c..ac42cf37 100644 --- a/lib/model/rwfolder.go +++ b/lib/model/rwfolder.go @@ -94,9 +94,9 @@ type dbUpdateJob struct { type sendReceiveFolder struct { folder - fs fs.Filesystem - versioner versioner.Versioner - pause time.Duration + prevIgnoreHash string + fs fs.Filesystem + versioner versioner.Versioner queue *jobQueue @@ -106,15 +106,13 @@ type sendReceiveFolder struct { func newSendReceiveFolder(model *Model, cfg config.FolderConfiguration, ver versioner.Versioner, fs fs.Filesystem) service { f := &sendReceiveFolder{ - folder: newFolder(model, cfg), - + folder: newFolder(model, cfg), fs: fs, versioner: ver, - - queue: newJobQueue(), - + queue: newJobQueue(), errorsMut: sync.NewMutex(), } + f.folder.puller = f if f.Copiers == 0 { f.Copiers = defaultCopiers @@ -130,126 +128,32 @@ func newSendReceiveFolder(model *Model, cfg config.FolderConfiguration, ver vers f.PullerMaxPendingKiB = blockSizeKiB } - f.pause = f.basePause() - return f } -// Serve will run scans and pulls. It will return when Stop()ed or on a -// critical error. -func (f *sendReceiveFolder) Serve() { - l.Debugln(f, "starting") - defer l.Debugln(f, "exiting") - - defer func() { - f.scan.timer.Stop() - // TODO: Should there be an actual FolderStopped state? - f.setState(FolderIdle) - }() - - var prevIgnoreHash string - var success bool - pullFailTimer := time.NewTimer(time.Duration(0)) - <-pullFailTimer.C - - if f.FSWatcherEnabled && f.CheckHealth() == nil { - f.startWatch() - } - - initialCompleted := f.initialScanFinished - - for { - select { - case <-f.ctx.Done(): - return - - case <-f.pullScheduled: - pullFailTimer.Stop() - select { - case <-pullFailTimer.C: - default: - } - - if prevIgnoreHash, success = f.pull(prevIgnoreHash); !success { - // Pulling failed, try again later. - pullFailTimer.Reset(f.pause) - } - - case <-pullFailTimer.C: - if prevIgnoreHash, success = f.pull(prevIgnoreHash); !success { - // Pulling failed, try again later. - pullFailTimer.Reset(f.pause) - // Back off from retrying to pull with an upper limit. - if f.pause < 60*f.basePause() { - f.pause *= 2 - } - } - - case <-initialCompleted: - // Initial scan has completed, we should do a pull - initialCompleted = nil // never hit this case again - if prevIgnoreHash, success = f.pull(prevIgnoreHash); !success { - // Pulling failed, try again later. - pullFailTimer.Reset(f.pause) - } - - // The reason for running the scanner from within the puller is that - // this is the easiest way to make sure we are not doing both at the - // same time. - case <-f.scan.timer.C: - l.Debugln(f, "Scanning subdirectories") - f.scanTimerFired() - - case req := <-f.scan.now: - req.err <- f.scanSubdirs(req.subdirs) - - case next := <-f.scan.delay: - f.scan.timer.Reset(next) - - case fsEvents := <-f.watchChan: - l.Debugln(f, "filesystem notification rescan") - f.scanSubdirs(fsEvents) - - case <-f.restartWatchChan: - f.restartWatch() - } - } -} - -func (f *sendReceiveFolder) SchedulePull() { - select { - case f.pullScheduled <- struct{}{}: - default: - // We might be busy doing a pull and thus not reading from this - // channel. The channel is 1-buffered, so one notification will be - // queued to ensure we recheck after the pull, but beyond that we must - // make sure to not block index receiving. - } -} - func (f *sendReceiveFolder) String() string { return fmt.Sprintf("sendReceiveFolder/%s@%p", f.folderID, f) } -func (f *sendReceiveFolder) pull(prevIgnoreHash string) (curIgnoreHash string, success bool) { +func (f *sendReceiveFolder) pull() bool { select { case <-f.initialScanFinished: default: // Once the initial scan finished, a pull will be scheduled - return prevIgnoreHash, true + return true } if err := f.CheckHealth(); err != nil { l.Debugln("Skipping pull of", f.Description(), "due to folder error:", err) - return prevIgnoreHash, true + return true } f.model.fmut.RLock() curIgnores := f.model.folderIgnores[f.folderID] f.model.fmut.RUnlock() - curIgnoreHash = curIgnores.Hash() - ignoresChanged := curIgnoreHash != prevIgnoreHash + curIgnoreHash := curIgnores.Hash() + ignoresChanged := curIgnoreHash != f.prevIgnoreHash l.Debugf("%v pulling (ignoresChanged=%v)", f, ignoresChanged) @@ -270,10 +174,7 @@ func (f *sendReceiveFolder) pull(prevIgnoreHash string) (curIgnoreHash string, s if changed == 0 { // No files were changed by the puller, so we are in - // sync. Update the local version number. - - f.pause = f.basePause() - + // sync. break } @@ -288,10 +189,6 @@ func (f *sendReceiveFolder) pull(prevIgnoreHash string) (curIgnoreHash string, s "errors": folderErrors, }) } - - l.Infof("Folder %v isn't making progress. Pausing puller for %v.", f.Description(), f.pause) - l.Debugln(f, "next pull in", f.pause) - break } } @@ -301,10 +198,11 @@ func (f *sendReceiveFolder) pull(prevIgnoreHash string) (curIgnoreHash string, s close(scanChan) if changed == 0 { - return curIgnoreHash, true + f.prevIgnoreHash = curIgnoreHash + return true } - return prevIgnoreHash, false + return false } // pullerIteration runs a single puller iteration for the given folder and @@ -1800,13 +1698,6 @@ func (f *sendReceiveFolder) PullErrors() []FileError { return errors } -func (f *sendReceiveFolder) basePause() time.Duration { - if f.PullerPauseS == 0 { - return defaultPullerPause - } - return time.Duration(f.PullerPauseS) * time.Second -} - // deleteDir attempts to delete a directory. It checks for files/dirs inside // the directory and removes them if possible or returns an error if it fails func (f *sendReceiveFolder) deleteDir(dir string, ignores *ignore.Matcher, scanChan chan<- string) error {