diff --git a/cmd/syncthing/gui.go b/cmd/syncthing/gui.go index 8ce194fd..7e91f9c7 100644 --- a/cmd/syncthing/gui.go +++ b/cmd/syncthing/gui.go @@ -155,7 +155,7 @@ func (s *apiSvc) Serve() { postRestMux.HandleFunc("/rest/db/prio", s.postDBPrio) // folder file [perpage] [page] postRestMux.HandleFunc("/rest/db/ignores", s.postDBIgnores) // folder postRestMux.HandleFunc("/rest/db/override", s.postDBOverride) // folder - postRestMux.HandleFunc("/rest/db/scan", s.postDBScan) // folder [sub...] + postRestMux.HandleFunc("/rest/db/scan", s.postDBScan) // folder [sub...] [delay] postRestMux.HandleFunc("/rest/system/config", s.postSystemConfig) // postRestMux.HandleFunc("/rest/system/discovery", s.postSystemDiscovery) // device addr postRestMux.HandleFunc("/rest/system/error", s.postSystemError) // @@ -779,14 +779,21 @@ func (s *apiSvc) postDBScan(w http.ResponseWriter, r *http.Request) { err := s.model.ScanFolderSubs(folder, subs) if err != nil { http.Error(w, err.Error(), 500) + return } } else { errors := s.model.ScanFolders() if len(errors) > 0 { http.Error(w, "Error scanning folders", 500) json.NewEncoder(w).Encode(errors) + return } } + nextStr := qs.Get("next") + next, err := strconv.Atoi(nextStr) + if err == nil { + s.model.DelayScan(folder, time.Duration(next)*time.Second) + } } func (s *apiSvc) postDBPrio(w http.ResponseWriter, r *http.Request) { diff --git a/internal/model/.model.go.swp b/internal/model/.model.go.swp new file mode 100644 index 00000000..2ab0c78a Binary files /dev/null and b/internal/model/.model.go.swp differ diff --git a/internal/model/model.go b/internal/model/model.go index fc77f550..a5bb03e8 100644 --- a/internal/model/model.go +++ b/internal/model/model.go @@ -49,6 +49,7 @@ type service interface { Stop() Jobs() ([]string, []string) // In progress, Queued BringToFront(string) + DelayScan(d time.Duration) setState(state folderState) setError(err error) @@ -1322,6 +1323,16 @@ nextSub: return nil } +func (m *Model) DelayScan(folder string, next time.Duration) { + m.fmut.Lock() + runner, ok := m.folderRunners[folder] + m.fmut.Unlock() + if !ok { + return + } + runner.DelayScan(next) +} + // numHashers returns the number of hasher routines to use for a given folder, // taking into account configuration and available CPU cores. func (m *Model) numHashers(folder string) int { diff --git a/internal/model/rofolder.go b/internal/model/rofolder.go index ac867273..76c928dd 100644 --- a/internal/model/rofolder.go +++ b/internal/model/rofolder.go @@ -19,6 +19,8 @@ type roFolder struct { folder string intv time.Duration + timer *time.Timer + tmut sync.Mutex // protects timer model *Model stop chan struct{} } @@ -31,6 +33,8 @@ func newROFolder(model *Model, folder string, interval time.Duration) *roFolder }, folder: folder, intv: interval, + timer: time.NewTimer(time.Millisecond), + tmut: sync.NewMutex(), model: model, stop: make(chan struct{}), } @@ -42,13 +46,18 @@ func (s *roFolder) Serve() { defer l.Debugln(s, "exiting") } - timer := time.NewTimer(time.Millisecond) - defer timer.Stop() + defer func() { + s.tmut.Lock() + s.timer.Stop() + s.tmut.Unlock() + }() reschedule := func() { // Sleep a random time between 3/4 and 5/4 of the configured interval. sleepNanos := (s.intv.Nanoseconds()*3 + rand.Int63n(2*s.intv.Nanoseconds())) / 4 - timer.Reset(time.Duration(sleepNanos) * time.Nanosecond) + s.tmut.Lock() + s.timer.Reset(time.Duration(sleepNanos) * time.Nanosecond) + s.tmut.Unlock() } initialScanCompleted := false @@ -57,7 +66,7 @@ func (s *roFolder) Serve() { case <-s.stop: return - case <-timer.C: + case <-s.timer.C: if err := s.model.CheckFolderHealth(s.folder); err != nil { l.Infoln("Skipping folder", s.folder, "scan due to folder error:", err) reschedule() @@ -105,3 +114,9 @@ func (s *roFolder) BringToFront(string) {} func (s *roFolder) Jobs() ([]string, []string) { return nil, nil } + +func (s *roFolder) DelayScan(next time.Duration) { + s.tmut.Lock() + s.timer.Reset(next) + s.tmut.Unlock() +} diff --git a/internal/model/rwfolder.go b/internal/model/rwfolder.go index fdb7b5cc..07849483 100644 --- a/internal/model/rwfolder.go +++ b/internal/model/rwfolder.go @@ -74,6 +74,9 @@ type rwFolder struct { stop chan struct{} queue *jobQueue dbUpdates chan protocol.FileInfo + scanTimer *time.Timer + pullTimer *time.Timer + tmut sync.Mutex // protects scanTimer and pullTimer } func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFolder { @@ -96,8 +99,11 @@ func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFo shortID: shortID, order: cfg.Order, - stop: make(chan struct{}), - queue: newJobQueue(), + stop: make(chan struct{}), + queue: newJobQueue(), + pullTimer: time.NewTimer(checkPullIntv), + scanTimer: time.NewTimer(time.Millisecond), // The first scan should be done immediately. + tmut: sync.NewMutex(), } } @@ -109,12 +115,11 @@ func (p *rwFolder) Serve() { defer l.Debugln(p, "exiting") } - pullTimer := time.NewTimer(checkPullIntv) - scanTimer := time.NewTimer(time.Millisecond) // The first scan should be done immediately. - defer func() { - pullTimer.Stop() - scanTimer.Stop() + p.tmut.Lock() + p.pullTimer.Stop() + p.scanTimer.Stop() + p.tmut.Unlock() // TODO: Should there be an actual FolderStopped state? p.setState(FolderIdle) }() @@ -135,7 +140,9 @@ func (p *rwFolder) Serve() { if debug { l.Debugln(p, "next rescan in", intv) } - scanTimer.Reset(intv) + p.tmut.Lock() + p.scanTimer.Reset(intv) + p.tmut.Unlock() } // We don't start pulling files until a scan has been completed. @@ -151,12 +158,14 @@ func (p *rwFolder) Serve() { // information is available. Before that though, I'd like to build a // repeatable benchmark of how long it takes to sync a change from // device A to device B, so we have something to work against. - case <-pullTimer.C: + case <-p.pullTimer.C: if !initialScanCompleted { if debug { l.Debugln(p, "skip (initial)") } - pullTimer.Reset(nextPullIntv) + p.tmut.Lock() + p.pullTimer.Reset(nextPullIntv) + p.tmut.Unlock() continue } @@ -180,7 +189,9 @@ func (p *rwFolder) Serve() { if debug { l.Debugln(p, "skip (curVer == prevVer)", prevVer) } - pullTimer.Reset(checkPullIntv) + p.tmut.Lock() + p.pullTimer.Reset(checkPullIntv) + p.tmut.Unlock() continue } @@ -218,7 +229,9 @@ func (p *rwFolder) Serve() { if debug { l.Debugln(p, "next pull in", nextPullIntv) } - pullTimer.Reset(nextPullIntv) + p.tmut.Lock() + p.pullTimer.Reset(nextPullIntv) + p.tmut.Unlock() break } @@ -231,7 +244,9 @@ func (p *rwFolder) Serve() { if debug { l.Debugln(p, "next pull in", pauseIntv) } - pullTimer.Reset(pauseIntv) + p.tmut.Lock() + p.pullTimer.Reset(pauseIntv) + p.tmut.Unlock() break } } @@ -240,7 +255,7 @@ func (p *rwFolder) Serve() { // The reason for running the scanner from within the puller is that // this is the easiest way to make sure we are not doing both at the // same time. - case <-scanTimer.C: + case <-p.scanTimer.C: if err := p.model.CheckFolderHealth(p.folder); err != nil { l.Infoln("Skipping folder", p.folder, "scan due to folder error:", err) rescheduleScan() @@ -1165,6 +1180,12 @@ func (p *rwFolder) Jobs() ([]string, []string) { return p.queue.Jobs() } +func (p *rwFolder) DelayScan(next time.Duration) { + p.tmut.Lock() + p.scanTimer.Reset(next) + p.tmut.Unlock() +} + // dbUpdaterRoutine aggregates db updates and commits them in batches no // larger than 1000 items, and no more delayed than 2 seconds. func (p *rwFolder) dbUpdaterRoutine() { diff --git a/test/delay_scan_test.go b/test/delay_scan_test.go new file mode 100644 index 00000000..081100aa --- /dev/null +++ b/test/delay_scan_test.go @@ -0,0 +1,91 @@ +// Copyright (C) 2014 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at http://mozilla.org/MPL/2.0/. + +// +build integration + +package integration + +import ( + "io/ioutil" + "log" + "sync" + "testing" + "time" +) + +func TestDelayScan(t *testing.T) { + log.Println("Cleaning...") + err := removeAll("s1", "h1/index*") + if err != nil { + t.Fatal(err) + } + + log.Println("Generating files...") + err = generateFiles("s1", 50, 18, "../LICENSE") + if err != nil { + t.Fatal(err) + } + + log.Println("Generating .stignore...") + err = ioutil.WriteFile("s1/.stignore", []byte("some ignore data\n"), 0644) + if err != nil { + t.Fatal(err) + } + + log.Println("Starting up...") + st := syncthingProcess{ // id1 + instance: "1", + argv: []string{"-home", "h1"}, + port: 8081, + apiKey: apiKey, + } + err = st.start() + if err != nil { + t.Fatal(err) + } + + // Wait for one scan to succeed, or up to 20 seconds... + // This is to let startup, UPnP etc complete. + for i := 0; i < 20; i++ { + err := st.rescan("default") + if err != nil { + time.Sleep(time.Second) + continue + } + break + } + + // Wait for UPnP and stuff + time.Sleep(10 * time.Second) + + var wg sync.WaitGroup + log.Println("Starting scans...") + for j := 0; j < 20; j++ { + j := j + wg.Add(1) + go func() { + defer wg.Done() + err := st.rescanNext("default", time.Duration(1)*time.Second) + log.Println(j) + if err != nil { + log.Println(err) + t.Fatal(err) + } + }() + } + + wg.Wait() + log.Println("Scans done") + time.Sleep(2 * time.Second) + + // This is where the real test is currently, since stop() checks for data + // race output in the log. + log.Println("Stopping...") + _, err = st.stop() + if err != nil { + t.Fatal(err) + } +} diff --git a/test/syncthingprocess.go b/test/syncthingprocess.go index 94e77eaa..18ccb0c1 100644 --- a/test/syncthingprocess.go +++ b/test/syncthingprocess.go @@ -20,6 +20,7 @@ import ( "net/http" "os" "os/exec" + "strconv" "time" "github.com/syncthing/protocol" @@ -322,6 +323,19 @@ func (p *syncthingProcess) rescan(folder string) error { return nil } +func (p *syncthingProcess) rescanNext(folder string, next time.Duration) error { + resp, err := p.post("/rest/db/scan?folder="+folder+"&next="+strconv.Itoa(int(next.Seconds())), nil) + if err != nil { + return err + } + data, _ := ioutil.ReadAll(resp.Body) + resp.Body.Close() + if resp.StatusCode != 200 { + return fmt.Errorf("Rescan %q: status code %d: %s", folder, resp.StatusCode, data) + } + return nil +} + func (p *syncthingProcess) reset(folder string) error { resp, err := p.post("/rest/system/reset?folder="+folder, nil) if err != nil {