Reschedule the next scan interval (fixes #1591)
This commit is contained in:
BIN
internal/model/.model.go.swp
Normal file
BIN
internal/model/.model.go.swp
Normal file
Binary file not shown.
@@ -49,6 +49,7 @@ type service interface {
|
||||
Stop()
|
||||
Jobs() ([]string, []string) // In progress, Queued
|
||||
BringToFront(string)
|
||||
DelayScan(d time.Duration)
|
||||
|
||||
setState(state folderState)
|
||||
setError(err error)
|
||||
@@ -1322,6 +1323,16 @@ nextSub:
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Model) DelayScan(folder string, next time.Duration) {
|
||||
m.fmut.Lock()
|
||||
runner, ok := m.folderRunners[folder]
|
||||
m.fmut.Unlock()
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
runner.DelayScan(next)
|
||||
}
|
||||
|
||||
// numHashers returns the number of hasher routines to use for a given folder,
|
||||
// taking into account configuration and available CPU cores.
|
||||
func (m *Model) numHashers(folder string) int {
|
||||
|
||||
@@ -19,6 +19,8 @@ type roFolder struct {
|
||||
|
||||
folder string
|
||||
intv time.Duration
|
||||
timer *time.Timer
|
||||
tmut sync.Mutex // protects timer
|
||||
model *Model
|
||||
stop chan struct{}
|
||||
}
|
||||
@@ -31,6 +33,8 @@ func newROFolder(model *Model, folder string, interval time.Duration) *roFolder
|
||||
},
|
||||
folder: folder,
|
||||
intv: interval,
|
||||
timer: time.NewTimer(time.Millisecond),
|
||||
tmut: sync.NewMutex(),
|
||||
model: model,
|
||||
stop: make(chan struct{}),
|
||||
}
|
||||
@@ -42,13 +46,18 @@ func (s *roFolder) Serve() {
|
||||
defer l.Debugln(s, "exiting")
|
||||
}
|
||||
|
||||
timer := time.NewTimer(time.Millisecond)
|
||||
defer timer.Stop()
|
||||
defer func() {
|
||||
s.tmut.Lock()
|
||||
s.timer.Stop()
|
||||
s.tmut.Unlock()
|
||||
}()
|
||||
|
||||
reschedule := func() {
|
||||
// Sleep a random time between 3/4 and 5/4 of the configured interval.
|
||||
sleepNanos := (s.intv.Nanoseconds()*3 + rand.Int63n(2*s.intv.Nanoseconds())) / 4
|
||||
timer.Reset(time.Duration(sleepNanos) * time.Nanosecond)
|
||||
s.tmut.Lock()
|
||||
s.timer.Reset(time.Duration(sleepNanos) * time.Nanosecond)
|
||||
s.tmut.Unlock()
|
||||
}
|
||||
|
||||
initialScanCompleted := false
|
||||
@@ -57,7 +66,7 @@ func (s *roFolder) Serve() {
|
||||
case <-s.stop:
|
||||
return
|
||||
|
||||
case <-timer.C:
|
||||
case <-s.timer.C:
|
||||
if err := s.model.CheckFolderHealth(s.folder); err != nil {
|
||||
l.Infoln("Skipping folder", s.folder, "scan due to folder error:", err)
|
||||
reschedule()
|
||||
@@ -105,3 +114,9 @@ func (s *roFolder) BringToFront(string) {}
|
||||
func (s *roFolder) Jobs() ([]string, []string) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (s *roFolder) DelayScan(next time.Duration) {
|
||||
s.tmut.Lock()
|
||||
s.timer.Reset(next)
|
||||
s.tmut.Unlock()
|
||||
}
|
||||
|
||||
@@ -74,6 +74,9 @@ type rwFolder struct {
|
||||
stop chan struct{}
|
||||
queue *jobQueue
|
||||
dbUpdates chan protocol.FileInfo
|
||||
scanTimer *time.Timer
|
||||
pullTimer *time.Timer
|
||||
tmut sync.Mutex // protects scanTimer and pullTimer
|
||||
}
|
||||
|
||||
func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFolder {
|
||||
@@ -96,8 +99,11 @@ func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFo
|
||||
shortID: shortID,
|
||||
order: cfg.Order,
|
||||
|
||||
stop: make(chan struct{}),
|
||||
queue: newJobQueue(),
|
||||
stop: make(chan struct{}),
|
||||
queue: newJobQueue(),
|
||||
pullTimer: time.NewTimer(checkPullIntv),
|
||||
scanTimer: time.NewTimer(time.Millisecond), // The first scan should be done immediately.
|
||||
tmut: sync.NewMutex(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -109,12 +115,11 @@ func (p *rwFolder) Serve() {
|
||||
defer l.Debugln(p, "exiting")
|
||||
}
|
||||
|
||||
pullTimer := time.NewTimer(checkPullIntv)
|
||||
scanTimer := time.NewTimer(time.Millisecond) // The first scan should be done immediately.
|
||||
|
||||
defer func() {
|
||||
pullTimer.Stop()
|
||||
scanTimer.Stop()
|
||||
p.tmut.Lock()
|
||||
p.pullTimer.Stop()
|
||||
p.scanTimer.Stop()
|
||||
p.tmut.Unlock()
|
||||
// TODO: Should there be an actual FolderStopped state?
|
||||
p.setState(FolderIdle)
|
||||
}()
|
||||
@@ -135,7 +140,9 @@ func (p *rwFolder) Serve() {
|
||||
if debug {
|
||||
l.Debugln(p, "next rescan in", intv)
|
||||
}
|
||||
scanTimer.Reset(intv)
|
||||
p.tmut.Lock()
|
||||
p.scanTimer.Reset(intv)
|
||||
p.tmut.Unlock()
|
||||
}
|
||||
|
||||
// We don't start pulling files until a scan has been completed.
|
||||
@@ -151,12 +158,14 @@ func (p *rwFolder) Serve() {
|
||||
// information is available. Before that though, I'd like to build a
|
||||
// repeatable benchmark of how long it takes to sync a change from
|
||||
// device A to device B, so we have something to work against.
|
||||
case <-pullTimer.C:
|
||||
case <-p.pullTimer.C:
|
||||
if !initialScanCompleted {
|
||||
if debug {
|
||||
l.Debugln(p, "skip (initial)")
|
||||
}
|
||||
pullTimer.Reset(nextPullIntv)
|
||||
p.tmut.Lock()
|
||||
p.pullTimer.Reset(nextPullIntv)
|
||||
p.tmut.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -180,7 +189,9 @@ func (p *rwFolder) Serve() {
|
||||
if debug {
|
||||
l.Debugln(p, "skip (curVer == prevVer)", prevVer)
|
||||
}
|
||||
pullTimer.Reset(checkPullIntv)
|
||||
p.tmut.Lock()
|
||||
p.pullTimer.Reset(checkPullIntv)
|
||||
p.tmut.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -218,7 +229,9 @@ func (p *rwFolder) Serve() {
|
||||
if debug {
|
||||
l.Debugln(p, "next pull in", nextPullIntv)
|
||||
}
|
||||
pullTimer.Reset(nextPullIntv)
|
||||
p.tmut.Lock()
|
||||
p.pullTimer.Reset(nextPullIntv)
|
||||
p.tmut.Unlock()
|
||||
break
|
||||
}
|
||||
|
||||
@@ -231,7 +244,9 @@ func (p *rwFolder) Serve() {
|
||||
if debug {
|
||||
l.Debugln(p, "next pull in", pauseIntv)
|
||||
}
|
||||
pullTimer.Reset(pauseIntv)
|
||||
p.tmut.Lock()
|
||||
p.pullTimer.Reset(pauseIntv)
|
||||
p.tmut.Unlock()
|
||||
break
|
||||
}
|
||||
}
|
||||
@@ -240,7 +255,7 @@ func (p *rwFolder) Serve() {
|
||||
// The reason for running the scanner from within the puller is that
|
||||
// this is the easiest way to make sure we are not doing both at the
|
||||
// same time.
|
||||
case <-scanTimer.C:
|
||||
case <-p.scanTimer.C:
|
||||
if err := p.model.CheckFolderHealth(p.folder); err != nil {
|
||||
l.Infoln("Skipping folder", p.folder, "scan due to folder error:", err)
|
||||
rescheduleScan()
|
||||
@@ -1165,6 +1180,12 @@ func (p *rwFolder) Jobs() ([]string, []string) {
|
||||
return p.queue.Jobs()
|
||||
}
|
||||
|
||||
func (p *rwFolder) DelayScan(next time.Duration) {
|
||||
p.tmut.Lock()
|
||||
p.scanTimer.Reset(next)
|
||||
p.tmut.Unlock()
|
||||
}
|
||||
|
||||
// dbUpdaterRoutine aggregates db updates and commits them in batches no
|
||||
// larger than 1000 items, and no more delayed than 2 seconds.
|
||||
func (p *rwFolder) dbUpdaterRoutine() {
|
||||
|
||||
Reference in New Issue
Block a user