lib/model: Refactor encapsulation of the folder scanning
GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/3017
This commit is contained in:
parent
4bf3e7485b
commit
af0bc95de5
@ -86,7 +86,7 @@ type modelIntf interface {
|
|||||||
DelayScan(folder string, next time.Duration)
|
DelayScan(folder string, next time.Duration)
|
||||||
ScanFolder(folder string) error
|
ScanFolder(folder string) error
|
||||||
ScanFolders() map[string]error
|
ScanFolders() map[string]error
|
||||||
ScanFolderSubs(folder string, subs []string) error
|
ScanFolderSubdirs(folder string, subs []string) error
|
||||||
BringToFront(folder, file string)
|
BringToFront(folder, file string)
|
||||||
ConnectedTo(deviceID protocol.DeviceID) bool
|
ConnectedTo(deviceID protocol.DeviceID) bool
|
||||||
GlobalSize(folder string) (nfiles, deleted int, bytes int64)
|
GlobalSize(folder string) (nfiles, deleted int, bytes int64)
|
||||||
@ -1071,7 +1071,7 @@ func (s *apiService) postDBScan(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
subs := qs["sub"]
|
subs := qs["sub"]
|
||||||
err = s.model.ScanFolderSubs(folder, subs)
|
err = s.model.ScanFolderSubdirs(folder, subs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.Error(w, err.Error(), 500)
|
http.Error(w, err.Error(), 500)
|
||||||
return
|
return
|
||||||
|
|||||||
@ -85,7 +85,7 @@ func (m *mockedModel) ScanFolders() map[string]error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockedModel) ScanFolderSubs(folder string, subs []string) error {
|
func (m *mockedModel) ScanFolderSubdirs(folder string, subs []string) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -10,7 +10,7 @@ import "time"
|
|||||||
|
|
||||||
type folder struct {
|
type folder struct {
|
||||||
stateTracker
|
stateTracker
|
||||||
scan folderscan
|
scan folderScanner
|
||||||
model *Model
|
model *Model
|
||||||
stop chan struct{}
|
stop chan struct{}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,49 +0,0 @@
|
|||||||
// Copyright (C) 2016 The Syncthing Authors.
|
|
||||||
//
|
|
||||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
|
||||||
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
|
||||||
// You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
||||||
|
|
||||||
package model
|
|
||||||
|
|
||||||
import (
|
|
||||||
"math/rand"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
type rescanRequest struct {
|
|
||||||
subdirs []string
|
|
||||||
err chan error
|
|
||||||
}
|
|
||||||
|
|
||||||
// bundle all folder scan activity
|
|
||||||
type folderscan struct {
|
|
||||||
interval time.Duration
|
|
||||||
timer *time.Timer
|
|
||||||
now chan rescanRequest
|
|
||||||
delay chan time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *folderscan) reschedule() {
|
|
||||||
if s.interval == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
// Sleep a random time between 3/4 and 5/4 of the configured interval.
|
|
||||||
sleepNanos := (s.interval.Nanoseconds()*3 + rand.Int63n(2*s.interval.Nanoseconds())) / 4
|
|
||||||
interval := time.Duration(sleepNanos) * time.Nanosecond
|
|
||||||
l.Debugln(s, "next rescan in", interval)
|
|
||||||
s.timer.Reset(interval)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *folderscan) Scan(subdirs []string) error {
|
|
||||||
req := rescanRequest{
|
|
||||||
subdirs: subdirs,
|
|
||||||
err: make(chan error),
|
|
||||||
}
|
|
||||||
s.now <- req
|
|
||||||
return <-req.err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *folderscan) Delay(next time.Duration) {
|
|
||||||
s.delay <- next
|
|
||||||
}
|
|
||||||
63
lib/model/folderscanner.go
Normal file
63
lib/model/folderscanner.go
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
// Copyright (C) 2016 The Syncthing Authors.
|
||||||
|
//
|
||||||
|
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||||
|
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
||||||
|
// You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||||
|
|
||||||
|
package model
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/syncthing/syncthing/lib/config"
|
||||||
|
"math/rand"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type rescanRequest struct {
|
||||||
|
subdirs []string
|
||||||
|
err chan error
|
||||||
|
}
|
||||||
|
|
||||||
|
// bundle all folder scan activity
|
||||||
|
type folderScanner struct {
|
||||||
|
interval time.Duration
|
||||||
|
timer *time.Timer
|
||||||
|
now chan rescanRequest
|
||||||
|
delay chan time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func newFolderScanner(config config.FolderConfiguration) folderScanner {
|
||||||
|
return folderScanner{
|
||||||
|
interval: time.Duration(config.RescanIntervalS) * time.Second,
|
||||||
|
timer: time.NewTimer(time.Millisecond), // The first scan should be done immediately.
|
||||||
|
now: make(chan rescanRequest),
|
||||||
|
delay: make(chan time.Duration),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *folderScanner) Reschedule() {
|
||||||
|
if f.interval == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Sleep a random time between 3/4 and 5/4 of the configured interval.
|
||||||
|
sleepNanos := (f.interval.Nanoseconds()*3 + rand.Int63n(2*f.interval.Nanoseconds())) / 4
|
||||||
|
interval := time.Duration(sleepNanos) * time.Nanosecond
|
||||||
|
l.Debugln(f, "next rescan in", interval)
|
||||||
|
f.timer.Reset(interval)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *folderScanner) Scan(subdirs []string) error {
|
||||||
|
req := rescanRequest{
|
||||||
|
subdirs: subdirs,
|
||||||
|
err: make(chan error),
|
||||||
|
}
|
||||||
|
f.now <- req
|
||||||
|
return <-req.err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *folderScanner) Delay(next time.Duration) {
|
||||||
|
f.delay <- next
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *folderScanner) HasNoInterval() bool {
|
||||||
|
return f.interval == 0
|
||||||
|
}
|
||||||
@ -46,6 +46,13 @@ type stateTracker struct {
|
|||||||
changed time.Time
|
changed time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newStateTracker(id string) stateTracker {
|
||||||
|
return stateTracker{
|
||||||
|
folderID: id,
|
||||||
|
mut: sync.NewMutex(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// setState sets the new folder state, for states other than FolderError.
|
// setState sets the new folder state, for states other than FolderError.
|
||||||
func (s *stateTracker) setState(newState folderState) {
|
func (s *stateTracker) setState(newState folderState) {
|
||||||
if newState == FolderError {
|
if newState == FolderError {
|
||||||
|
|||||||
@ -47,18 +47,18 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type service interface {
|
type service interface {
|
||||||
Serve()
|
|
||||||
Stop()
|
|
||||||
Jobs() ([]string, []string) // In progress, Queued
|
|
||||||
BringToFront(string)
|
BringToFront(string)
|
||||||
DelayScan(d time.Duration)
|
DelayScan(d time.Duration)
|
||||||
IndexUpdated() // Remote index was updated notification
|
IndexUpdated() // Remote index was updated notification
|
||||||
|
Jobs() ([]string, []string) // In progress, Queued
|
||||||
Scan(subs []string) error
|
Scan(subs []string) error
|
||||||
|
Serve()
|
||||||
|
Stop()
|
||||||
|
|
||||||
setState(state folderState)
|
|
||||||
setError(err error)
|
|
||||||
clearError()
|
|
||||||
getState() (folderState, time.Time, error)
|
getState() (folderState, time.Time, error)
|
||||||
|
setState(state folderState)
|
||||||
|
clearError()
|
||||||
|
setError(err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type Availability struct {
|
type Availability struct {
|
||||||
@ -1431,10 +1431,10 @@ func (m *Model) ScanFolders() map[string]error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Model) ScanFolder(folder string) error {
|
func (m *Model) ScanFolder(folder string) error {
|
||||||
return m.ScanFolderSubs(folder, nil)
|
return m.ScanFolderSubdirs(folder, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Model) ScanFolderSubs(folder string, subs []string) error {
|
func (m *Model) ScanFolderSubdirs(folder string, subs []string) error {
|
||||||
m.fmut.Lock()
|
m.fmut.Lock()
|
||||||
runner, ok := m.folderRunners[folder]
|
runner, ok := m.folderRunners[folder]
|
||||||
m.fmut.Unlock()
|
m.fmut.Unlock()
|
||||||
@ -1449,13 +1449,13 @@ func (m *Model) ScanFolderSubs(folder string, subs []string) error {
|
|||||||
return runner.Scan(subs)
|
return runner.Scan(subs)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Model) internalScanFolderSubdirs(folder string, subs []string) error {
|
func (m *Model) internalScanFolderSubdirs(folder string, subDirs []string) error {
|
||||||
for i, sub := range subs {
|
for i, sub := range subDirs {
|
||||||
sub = osutil.NativeFilename(sub)
|
sub = osutil.NativeFilename(sub)
|
||||||
if p := filepath.Clean(filepath.Join(folder, sub)); !strings.HasPrefix(p, folder) {
|
if p := filepath.Clean(filepath.Join(folder, sub)); !strings.HasPrefix(p, folder) {
|
||||||
return errors.New("invalid subpath")
|
return errors.New("invalid subpath")
|
||||||
}
|
}
|
||||||
subs[i] = sub
|
subDirs[i] = sub
|
||||||
}
|
}
|
||||||
|
|
||||||
m.fmut.Lock()
|
m.fmut.Lock()
|
||||||
@ -1488,7 +1488,7 @@ func (m *Model) internalScanFolderSubdirs(folder string, subs []string) error {
|
|||||||
// Clean the list of subitems to ensure that we start at a known
|
// Clean the list of subitems to ensure that we start at a known
|
||||||
// directory, and don't scan subdirectories of things we've already
|
// directory, and don't scan subdirectories of things we've already
|
||||||
// scanned.
|
// scanned.
|
||||||
subs = unifySubs(subs, func(f string) bool {
|
subDirs = unifySubs(subDirs, func(f string) bool {
|
||||||
_, ok := fs.Get(protocol.LocalDeviceID, f)
|
_, ok := fs.Get(protocol.LocalDeviceID, f)
|
||||||
return ok
|
return ok
|
||||||
})
|
})
|
||||||
@ -1503,7 +1503,7 @@ func (m *Model) internalScanFolderSubdirs(folder string, subs []string) error {
|
|||||||
fchan, err := scanner.Walk(scanner.Config{
|
fchan, err := scanner.Walk(scanner.Config{
|
||||||
Folder: folderCfg.ID,
|
Folder: folderCfg.ID,
|
||||||
Dir: folderCfg.Path(),
|
Dir: folderCfg.Path(),
|
||||||
Subs: subs,
|
Subs: subDirs,
|
||||||
Matcher: ignores,
|
Matcher: ignores,
|
||||||
BlockSize: protocol.BlockSize,
|
BlockSize: protocol.BlockSize,
|
||||||
TempNamer: defTempNamer,
|
TempNamer: defTempNamer,
|
||||||
@ -1556,15 +1556,15 @@ func (m *Model) internalScanFolderSubdirs(folder string, subs []string) error {
|
|||||||
m.updateLocalsFromScanning(folder, batch)
|
m.updateLocalsFromScanning(folder, batch)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(subs) == 0 {
|
if len(subDirs) == 0 {
|
||||||
// If we have no specific subdirectories to traverse, set it to one
|
// If we have no specific subdirectories to traverse, set it to one
|
||||||
// empty prefix so we traverse the entire folder contents once.
|
// empty prefix so we traverse the entire folder contents once.
|
||||||
subs = []string{""}
|
subDirs = []string{""}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Do a scan of the database for each prefix, to check for deleted files.
|
// Do a scan of the database for each prefix, to check for deleted files.
|
||||||
batch = batch[:0]
|
batch = batch[:0]
|
||||||
for _, sub := range subs {
|
for _, sub := range subDirs {
|
||||||
var iterError error
|
var iterError error
|
||||||
|
|
||||||
fs.WithPrefixedHaveTruncated(protocol.LocalDeviceID, sub, func(fi db.FileIntf) bool {
|
fs.WithPrefixedHaveTruncated(protocol.LocalDeviceID, sub, func(fi db.FileIntf) bool {
|
||||||
|
|||||||
@ -1402,7 +1402,7 @@ func TestIssue3028(t *testing.T) {
|
|||||||
|
|
||||||
os.Remove("testdata/testrm")
|
os.Remove("testdata/testrm")
|
||||||
os.Remove("testdata/testrm2")
|
os.Remove("testdata/testrm2")
|
||||||
m.ScanFolderSubs("default", []string{"testrm", "testrm2"})
|
m.ScanFolderSubdirs("default", []string{"testrm", "testrm2"})
|
||||||
|
|
||||||
// Verify that the number of files decreased by two and the number of
|
// Verify that the number of files decreased by two and the number of
|
||||||
// deleted files increases by two
|
// deleted files increases by two
|
||||||
|
|||||||
@ -8,10 +8,8 @@ package model
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/syncthing/syncthing/lib/config"
|
"github.com/syncthing/syncthing/lib/config"
|
||||||
"github.com/syncthing/syncthing/lib/sync"
|
|
||||||
"github.com/syncthing/syncthing/lib/versioner"
|
"github.com/syncthing/syncthing/lib/versioner"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -23,19 +21,11 @@ type roFolder struct {
|
|||||||
folder
|
folder
|
||||||
}
|
}
|
||||||
|
|
||||||
func newROFolder(model *Model, cfg config.FolderConfiguration, ver versioner.Versioner) service {
|
func newROFolder(model *Model, config config.FolderConfiguration, ver versioner.Versioner) service {
|
||||||
return &roFolder{
|
return &roFolder{
|
||||||
folder: folder{
|
folder: folder{
|
||||||
stateTracker: stateTracker{
|
stateTracker: newStateTracker(config.ID),
|
||||||
folderID: cfg.ID,
|
scan: newFolderScanner(config),
|
||||||
mut: sync.NewMutex(),
|
|
||||||
},
|
|
||||||
scan: folderscan{
|
|
||||||
interval: time.Duration(cfg.RescanIntervalS) * time.Second,
|
|
||||||
timer: time.NewTimer(time.Millisecond),
|
|
||||||
now: make(chan rescanRequest),
|
|
||||||
delay: make(chan time.Duration),
|
|
||||||
},
|
|
||||||
stop: make(chan struct{}),
|
stop: make(chan struct{}),
|
||||||
model: model,
|
model: model,
|
||||||
},
|
},
|
||||||
@ -59,7 +49,7 @@ func (f *roFolder) Serve() {
|
|||||||
case <-f.scan.timer.C:
|
case <-f.scan.timer.C:
|
||||||
if err := f.model.CheckFolderHealth(f.folderID); err != nil {
|
if err := f.model.CheckFolderHealth(f.folderID); err != nil {
|
||||||
l.Infoln("Skipping folder", f.folderID, "scan due to folder error:", err)
|
l.Infoln("Skipping folder", f.folderID, "scan due to folder error:", err)
|
||||||
f.scan.reschedule()
|
f.scan.Reschedule()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -71,7 +61,7 @@ func (f *roFolder) Serve() {
|
|||||||
// the same one as returned by CheckFolderHealth, though
|
// the same one as returned by CheckFolderHealth, though
|
||||||
// duplicate set is handled by setError.
|
// duplicate set is handled by setError.
|
||||||
f.setError(err)
|
f.setError(err)
|
||||||
f.scan.reschedule()
|
f.scan.Reschedule()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -80,11 +70,11 @@ func (f *roFolder) Serve() {
|
|||||||
initialScanCompleted = true
|
initialScanCompleted = true
|
||||||
}
|
}
|
||||||
|
|
||||||
if f.scan.interval == 0 {
|
if f.scan.HasNoInterval() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
f.scan.reschedule()
|
f.scan.Reschedule()
|
||||||
|
|
||||||
case req := <-f.scan.now:
|
case req := <-f.scan.now:
|
||||||
req.err <- f.scanSubdirsIfHealthy(req.subdirs)
|
req.err <- f.scanSubdirsIfHealthy(req.subdirs)
|
||||||
|
|||||||
@ -107,16 +107,8 @@ type rwFolder struct {
|
|||||||
func newRWFolder(model *Model, cfg config.FolderConfiguration, ver versioner.Versioner) service {
|
func newRWFolder(model *Model, cfg config.FolderConfiguration, ver versioner.Versioner) service {
|
||||||
f := &rwFolder{
|
f := &rwFolder{
|
||||||
folder: folder{
|
folder: folder{
|
||||||
stateTracker: stateTracker{
|
stateTracker: newStateTracker(cfg.ID),
|
||||||
folderID: cfg.ID,
|
scan: newFolderScanner(cfg),
|
||||||
mut: sync.NewMutex(),
|
|
||||||
},
|
|
||||||
scan: folderscan{
|
|
||||||
interval: time.Duration(cfg.RescanIntervalS) * time.Second,
|
|
||||||
timer: time.NewTimer(time.Millisecond), // The first scan should be done immediately.
|
|
||||||
now: make(chan rescanRequest),
|
|
||||||
delay: make(chan time.Duration),
|
|
||||||
},
|
|
||||||
stop: make(chan struct{}),
|
stop: make(chan struct{}),
|
||||||
model: model,
|
model: model,
|
||||||
},
|
},
|
||||||
@ -297,7 +289,7 @@ func (f *rwFolder) Serve() {
|
|||||||
// same time.
|
// same time.
|
||||||
case <-f.scan.timer.C:
|
case <-f.scan.timer.C:
|
||||||
err := f.scanSubdirsIfHealthy(nil)
|
err := f.scanSubdirsIfHealthy(nil)
|
||||||
f.scan.reschedule()
|
f.scan.Reschedule()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|||||||
@ -69,9 +69,7 @@ func setUpModel(file protocol.FileInfo) *Model {
|
|||||||
func setUpRwFolder(model *Model) rwFolder {
|
func setUpRwFolder(model *Model) rwFolder {
|
||||||
return rwFolder{
|
return rwFolder{
|
||||||
folder: folder{
|
folder: folder{
|
||||||
stateTracker: stateTracker{
|
stateTracker: newStateTracker("default"),
|
||||||
folderID: "default",
|
|
||||||
},
|
|
||||||
model: model,
|
model: model,
|
||||||
},
|
},
|
||||||
dir: "testdata",
|
dir: "testdata",
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user