Make puller pause configurable

Audrius Butkevicius 2015-10-08 00:25:32 +01:00
parent 49f29a0453
commit d985ed553a
2 changed files with 36 additions and 24 deletions

View File

@ -108,7 +108,9 @@ type FolderConfiguration struct {
	Hashers int `xml:"hashers" json:"hashers"` // Less than one sets the value to the number of cores. These are CPU bound due to hashing.
	Order PullOrder `xml:"order" json:"order"`
	IgnoreDelete bool `xml:"ignoreDelete" json:"ignoreDelete"`
-	ScanProgressIntervalS int `xml:"scanProgressInterval" json:"scanProgressInterval"` // Set to a negative value to disable. Value of 0 will get replaced with value of 2 (default value)
+	ScanProgressIntervalS int `xml:"scanProgressIntervalS" json:"scanProgressIntervalS"` // Set to a negative value to disable. Value of 0 will get replaced with value of 2 (default value)
+	PullerSleepS int `xml:"pullerSleepS" json:"pullerSleepS"`
+	PullerPauseS int `xml:"pullerPauseS" json:"pullerPauseS"`
	Invalid string `xml:"-" json:"invalid"` // Set at runtime when there is an error, not saved
}
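Both new settings are plain second counts with matching xml and json tags, so they appear as child elements of a folder entry in config.xml (and, via the json tags, as fields of the same names in the JSON form of the config). A rough sketch only, with placeholder folder attributes and arbitrary values:

	<folder id="default" path="/home/user/Sync">
		<pullerSleepS>30</pullerSleepS>
		<pullerPauseS>120</pullerPauseS>
	</folder>

Leaving the elements out, or setting them to 0, keeps the built-in defaults introduced below.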

View File

@ -30,12 +30,6 @@ import (
// TODO: Stop on errors
-const (
-	pauseIntv     = 60 * time.Second
-	nextPullIntv  = 10 * time.Second
-	shortPullIntv = time.Second
-)
// A pullBlockState is passed to the puller routine for each block that needs
// to be fetched.
type pullBlockState struct {
@ -67,8 +61,10 @@ const (
)

const (
	defaultCopiers = 1
	defaultPullers = 16
+	defaultPullerSleep = 10 * time.Second
+	defaultPullerPause = 60 * time.Second
)

type dbUpdateJob struct {
@ -92,6 +88,8 @@ type rwFolder struct {
	pullers int
	shortID uint64
	order config.PullOrder
+	sleep time.Duration
+	pause time.Duration
	stop chan struct{}
	queue *jobQueue
@ -128,7 +126,7 @@ func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFo
		stop: make(chan struct{}),
		queue: newJobQueue(),
-		pullTimer: time.NewTimer(shortPullIntv),
+		pullTimer: time.NewTimer(time.Second),
		scanTimer: time.NewTimer(time.Millisecond), // The first scan should be done immediately.
		delayScan: make(chan time.Duration),
		scanNow: make(chan rescanRequest),
@ -144,6 +142,18 @@ func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFo
		p.pullers = defaultPullers
	}
+	if cfg.PullerPauseS == 0 {
+		p.pause = defaultPullerPause
+	} else {
+		p.pause = time.Duration(cfg.PullerPauseS) * time.Second
+	}
+	if cfg.PullerSleepS == 0 {
+		p.sleep = defaultPullerSleep
+	} else {
+		p.sleep = time.Duration(cfg.PullerSleepS) * time.Second
+	}
	return p
}
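With the defaults above, leaving either setting at zero preserves the previous hard-coded behaviour (10 s between pulls, 60 s pause). The zero-means-default conversion used twice here can be read as the small helper below; durationOr is a hypothetical name for illustration, not something this commit adds:

	package model

	import "time"

	// durationOr converts a whole number of seconds to a time.Duration,
	// treating zero as "unset" and substituting the given default.
	func durationOr(seconds int, def time.Duration) time.Duration {
		if seconds == 0 {
			return def
		}
		return time.Duration(seconds) * time.Second
	}

Note that only an exact zero falls back to the default; a negative setting becomes a negative duration, which makes the timer fire immediately when it is later passed to Reset.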
@ -194,19 +204,13 @@ func (p *rwFolder) Serve() {
		case <-p.remoteIndex:
			prevVer = 0
-			p.pullTimer.Reset(shortPullIntv)
+			p.pullTimer.Reset(0)
			l.Debugln(p, "remote index updated, rescheduling pull")

		case <-p.pullTimer.C:
			if !initialScanCompleted {
				l.Debugln(p, "skip (initial)")
-				p.pullTimer.Reset(nextPullIntv)
+				p.pullTimer.Reset(p.sleep)
-				continue
-			}
-			if err := p.model.CheckFolderHealth(p.folder); err != nil {
-				l.Infoln("Skipping folder", p.folder, "pull due to folder error:", err)
-				p.pullTimer.Reset(nextPullIntv)
				continue
			}
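One detail from the hunk above: the remote-index case now resets the pull timer to zero rather than the old one-second shortPullIntv. A time.Timer fires immediately when reset to a non-positive duration, so an incoming index update triggers a pull attempt on the next pass through the select. A standalone illustration of that Timer behaviour (not code from this commit):

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		t := time.NewTimer(time.Hour)
		if !t.Stop() {
			<-t.C // drain the channel if the timer had already fired
		}
		t.Reset(0) // a non-positive duration makes the timer fire immediately
		start := time.Now()
		<-t.C
		fmt.Println("fired after", time.Since(start)) // prints a near-zero duration
	}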
@ -226,7 +230,13 @@ func (p *rwFolder) Serve() {
			curVer, ok := p.model.RemoteLocalVersion(p.folder)
			if !ok || curVer == prevVer {
				l.Debugln(p, "skip (curVer == prevVer)", prevVer, ok)
-				p.pullTimer.Reset(nextPullIntv)
+				p.pullTimer.Reset(p.sleep)
+				continue
+			}
+			if err := p.model.CheckFolderHealth(p.folder); err != nil {
+				l.Infoln("Skipping folder", p.folder, "pull due to folder error:", err)
+				p.pullTimer.Reset(p.sleep)
				continue
			}
@ -260,8 +270,8 @@ func (p *rwFolder) Serve() {
					curVer = lv
				}
				prevVer = curVer
-				l.Debugln(p, "next pull in", nextPullIntv)
+				l.Debugln(p, "next pull in", p.sleep)
-				p.pullTimer.Reset(nextPullIntv)
+				p.pullTimer.Reset(p.sleep)
				break
			}
@ -270,8 +280,8 @@ func (p *rwFolder) Serve() {
				// we're not making it. Probably there are write
				// errors preventing us. Flag this with a warning and
				// wait a bit longer before retrying.
-				l.Infof("Folder %q isn't making progress. Pausing puller for %v.", p.folder, pauseIntv)
+				l.Infof("Folder %q isn't making progress. Pausing puller for %v.", p.folder, p.pause)
-				l.Debugln(p, "next pull in", pauseIntv)
+				l.Debugln(p, "next pull in", p.pause)
				if folderErrors := p.currentErrors(); len(folderErrors) > 0 {
					events.Default.Log(events.FolderErrors, map[string]interface{}{
@ -280,7 +290,7 @@ func (p *rwFolder) Serve() {
					})
				}
-				p.pullTimer.Reset(pauseIntv)
+				p.pullTimer.Reset(p.pause)
				break
			}
		}
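Taken together, the loop now reschedules with p.sleep after a completed or skipped pull and with p.pause once repeated attempts stop making progress. A schematic of that policy only; schedule, pullOnce and maxTries are placeholders, and the real Serve loop carries more state:

	package model

	import "time"

	const maxTries = 10 // placeholder retry limit for illustration

	// schedule sketches the rescheduling policy: retry until an iteration
	// finishes cleanly, then wait sleep; give up after maxTries and wait pause.
	func schedule(timer *time.Timer, sleep, pause time.Duration, pullOnce func() bool) {
		for tries := 1; ; tries++ {
			if pullOnce() {
				timer.Reset(sleep) // finished or nothing left to do
				return
			}
			if tries >= maxTries {
				timer.Reset(pause) // not making progress: back off for longer
				return
			}
		}
	}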