lib/model: Improve scan scheduling and dir del during pull (fixes #4475 #4476)

GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/4493
Simon Frei
2017-12-07 08:42:03 +00:00
parent 47429d01e8
commit cce634f340
4 changed files with 355 additions and 137 deletions


@@ -60,6 +60,9 @@ var (
activity = newDeviceActivity()
errNoDevice = errors.New("peers who had this file went away, or the file has changed while syncing. will retry later")
errSymlinksUnsupported = errors.New("symlinks not supported")
errDirHasToBeScanned = errors.New("directory contains unexpected files, scheduling scan")
errDirHasIgnored = errors.New("directory contains ignored files (see ignore documentation for (?d) prefix)")
errDirNotEmpty = errors.New("directory is not empty")
)
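
The three new sentinel errors are returned by the deleteDir helper added further down in this diff and end up being surfaced to the user via f.newError. A minimal, hypothetical sketch of the sentinel-error pattern itself; the describe helper and the redeclared variables below are illustrative only, not part of this change:

package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the sentinel errors above, redeclared so this
// sketch compiles on its own.
var (
	errDirHasToBeScanned = errors.New("directory contains unexpected files, scheduling scan")
	errDirHasIgnored     = errors.New("directory contains ignored files (see ignore documentation for (?d) prefix)")
	errDirNotEmpty       = errors.New("directory is not empty")
)

// describe shows how a caller can branch on the sentinel values by
// identity; the real code simply records them as folder errors.
func describe(err error) string {
	switch err {
	case nil:
		return "directory removed"
	case errDirHasToBeScanned:
		return "unexpected content found, a scan was scheduled; delete retried on a later pull"
	case errDirHasIgnored, errDirNotEmpty:
		return "directory kept, error shown to the user"
	default:
		return "filesystem error: " + err.Error()
	}
}

func main() {
	for _, err := range []error{nil, errDirHasToBeScanned, errDirHasIgnored, errDirNotEmpty} {
		fmt.Println(describe(err))
	}
}
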
const (
@@ -92,7 +95,6 @@ type sendReceiveFolder struct {
pause time.Duration
queue *jobQueue
dbUpdates chan dbUpdateJob
pullScheduled chan struct{}
errors map[string]string // path -> error string
@@ -255,13 +257,17 @@ func (f *sendReceiveFolder) pull(prevIgnoreHash string) (curIgnoreHash string, s
f.setState(FolderSyncing)
f.clearErrors()
scanChan := make(chan string)
go f.pullScannerRoutine(scanChan)
var changed int
tries := 0
for {
tries++
changed := f.pullerIteration(curIgnores, ignoresChanged)
changed = f.pullerIteration(curIgnores, ignoresChanged, scanChan)
l.Debugln(f, "changed", changed)
if changed == 0 {
@@ -294,6 +300,8 @@ func (f *sendReceiveFolder) pull(prevIgnoreHash string) (curIgnoreHash string, s
f.setState(FolderIdle)
close(scanChan)
if changed == 0 {
return curIgnoreHash, true
}
@@ -305,23 +313,23 @@ func (f *sendReceiveFolder) pull(prevIgnoreHash string) (curIgnoreHash string, s
// returns the number of items that should have been synced (even those that
// might have failed). One puller iteration handles all files currently
// flagged as needed in the folder.
func (f *sendReceiveFolder) pullerIteration(ignores *ignore.Matcher, ignoresChanged bool) int {
func (f *sendReceiveFolder) pullerIteration(ignores *ignore.Matcher, ignoresChanged bool, scanChan chan<- string) int {
pullChan := make(chan pullBlockState)
copyChan := make(chan copyBlocksState)
finisherChan := make(chan *sharedPullerState)
dbUpdateChan := make(chan dbUpdateJob)
updateWg := sync.NewWaitGroup()
copyWg := sync.NewWaitGroup()
pullWg := sync.NewWaitGroup()
copyWg := sync.NewWaitGroup()
doneWg := sync.NewWaitGroup()
updateWg := sync.NewWaitGroup()
l.Debugln(f, "c", f.Copiers, "p", f.Pullers)
f.dbUpdates = make(chan dbUpdateJob)
updateWg.Add(1)
go func() {
// dbUpdaterRoutine finishes when f.dbUpdates is closed
f.dbUpdaterRoutine()
// dbUpdaterRoutine finishes when dbUpdateChan is closed
f.dbUpdaterRoutine(dbUpdateChan)
updateWg.Done()
}()
@@ -346,7 +354,7 @@ func (f *sendReceiveFolder) pullerIteration(ignores *ignore.Matcher, ignoresChan
doneWg.Add(1)
// finisherRoutine finishes when finisherChan is closed
go func() {
f.finisherRoutine(finisherChan)
f.finisherRoutine(ignores, finisherChan, dbUpdateChan, scanChan)
doneWg.Done()
}()
@@ -388,7 +396,7 @@ func (f *sendReceiveFolder) pullerIteration(ignores *ignore.Matcher, ignoresChan
case ignores.ShouldIgnore(file.Name):
file.Invalidate(f.model.id.Short())
l.Debugln(f, "Handling ignored file", file)
f.dbUpdates <- dbUpdateJob{file, dbUpdateInvalidate}
dbUpdateChan <- dbUpdateJob{file, dbUpdateInvalidate}
case file.IsDeleted():
processDirectly = append(processDirectly, file)
@@ -411,7 +419,7 @@ func (f *sendReceiveFolder) pullerIteration(ignores *ignore.Matcher, ignoresChan
case runtime.GOOS == "windows" && file.IsSymlink():
file.Invalidate(f.model.id.Short())
l.Debugln(f, "Invalidating symlink (unsupported)", file.Name)
f.dbUpdates <- dbUpdateJob{file, dbUpdateInvalidate}
dbUpdateChan <- dbUpdateJob{file, dbUpdateInvalidate}
default:
// Directories, symlinks
@@ -464,11 +472,12 @@ func (f *sendReceiveFolder) pullerIteration(ignores *ignore.Matcher, ignoresChan
case fi.IsDirectory() && !fi.IsSymlink():
l.Debugln(f, "Handling directory", fi.Name)
f.handleDir(fi)
f.handleDir(fi, dbUpdateChan)
case fi.IsSymlink():
l.Debugln("Handling symlink", fi.Name)
l.Debugln(f, "Handling symlink", fi.Name)
f.handleSymlink(fi)
f.handleSymlink(fi, dbUpdateChan)
default:
l.Warnln(fi)
@@ -523,13 +532,33 @@ nextFile:
continue
}
dirName := filepath.Dir(fi.Name)
// Verify that the thing we are handling lives inside a directory,
// and not a symlink or empty space.
if err := osutil.TraversesSymlink(f.fs, filepath.Dir(fi.Name)); err != nil {
if err := osutil.TraversesSymlink(f.fs, dirName); err != nil {
f.newError("traverses q", fi.Name, err)
continue
}
// Issues #114 and #4475: this works around a race condition
// between two devices, where one device removes a directory while
// the other creates a file in it. However it comes about, we end up
// with a directory "foo" carrying the delete bit, but a file
// "foo/bar" that we want to sync. We never create the directory and
// hence fail to create the file, looping on it forever. Break the
// loop by creating the directory and scheduling a scan, where the
// directory will be found and its delete bit removed. The user can
// then clean up as they like...
if _, err := f.fs.Lstat(dirName); fs.IsNotExist(err) {
l.Debugf("%v resurrecting parent directory of %v", f, fi.Name)
if err := f.fs.MkdirAll(dirName, 0755); err != nil {
f.newError("resurrecting parent dir", fi.Name, err)
continue
}
scanChan <- dirName
}
// Check our list of files to be removed for a match, in which case
// we can just do a rename instead.
key := string(fi.Blocks[0].Hash)
@@ -547,7 +576,7 @@ nextFile:
// Remove the pending deletion (as we perform it by renaming)
delete(fileDeletions, candidate.Name)
f.renameFile(desired, fi)
f.renameFile(desired, fi, dbUpdateChan)
f.queue.Done(fileName)
continue nextFile
@@ -555,7 +584,7 @@ nextFile:
}
// Handle the file normally, by copying and pulling, etc.
f.handleFile(fi, copyChan, finisherChan)
f.handleFile(fi, copyChan, finisherChan, dbUpdateChan)
}
// Signal copy and puller routines that we are done with the in data for
@@ -573,24 +602,24 @@ nextFile:
for _, file := range fileDeletions {
l.Debugln(f, "Deleting file", file.Name)
f.deleteFile(file)
f.deleteFile(file, dbUpdateChan)
}
for i := range dirDeletions {
dir := dirDeletions[len(dirDeletions)-i-1]
l.Debugln(f, "Deleting dir", dir.Name)
f.deleteDir(dir, ignores)
f.handleDeleteDir(dir, ignores, dbUpdateChan, scanChan)
}
// Wait for db updates to complete
close(f.dbUpdates)
// Wait for db updates and scan scheduling to complete
close(dbUpdateChan)
updateWg.Wait()
return changed
}
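
The structural change in pullerIteration is that the per-folder f.dbUpdates field is replaced by a dbUpdateChan created for each iteration and passed explicitly to every handler, alongside the new scanChan. A compact, self-contained sketch of that ownership pattern, using a placeholder job type and handler rather than the real Syncthing types:

package main

import (
	"fmt"
	"sync"
)

// dbUpdateJob is a placeholder for the real Syncthing type of the same name.
type dbUpdateJob struct{ name string }

// updater drains jobs until the channel is closed, mirroring dbUpdaterRoutine.
func updater(jobs <-chan dbUpdateJob, wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		fmt.Println("committing", job.name)
	}
}

func main() {
	// One channel per iteration: created here, closed here, never stored
	// on the folder struct.
	dbUpdateChan := make(chan dbUpdateJob)

	var updateWg sync.WaitGroup
	updateWg.Add(1)
	go updater(dbUpdateChan, &updateWg)

	// Handlers receive the channel as a parameter and just send into it.
	for _, name := range []string{"a", "b", "c"} {
		dbUpdateChan <- dbUpdateJob{name: name}
	}

	close(dbUpdateChan) // the iteration is done; let the updater finish
	updateWg.Wait()     // wait for all db updates to be committed
}
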
// handleDir creates or updates the given directory
func (f *sendReceiveFolder) handleDir(file protocol.FileInfo) {
func (f *sendReceiveFolder) handleDir(file protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob) {
// Used in the defer closure below, updated by the function body. Take
// care not to declare another err.
var err error
@@ -658,7 +687,7 @@ func (f *sendReceiveFolder) handleDir(file protocol.FileInfo) {
}
if err = osutil.InWritableDir(mkdir, f.fs, file.Name); err == nil {
f.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir}
dbUpdateChan <- dbUpdateJob{file, dbUpdateHandleDir}
} else {
f.newError("dir mkdir", file.Name, err)
}
@@ -674,16 +703,16 @@ func (f *sendReceiveFolder) handleDir(file protocol.FileInfo) {
// don't handle modification times on directories, because that sucks...)
// It's OK to change mode bits on stuff within non-writable directories.
if f.ignorePermissions(file) {
f.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir}
dbUpdateChan <- dbUpdateJob{file, dbUpdateHandleDir}
} else if err := f.fs.Chmod(file.Name, mode|(fs.FileMode(info.Mode())&retainBits)); err == nil {
f.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir}
dbUpdateChan <- dbUpdateJob{file, dbUpdateHandleDir}
} else {
f.newError("dir chmod", file.Name, err)
}
}
// handleSymlink creates or updates the given symlink
func (f *sendReceiveFolder) handleSymlink(file protocol.FileInfo) {
func (f *sendReceiveFolder) handleSymlink(file protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob) {
// Used in the defer closure below, updated by the function body. Take
// care not to declare another err.
var err error
@@ -736,14 +765,14 @@ func (f *sendReceiveFolder) handleSymlink(file protocol.FileInfo) {
}
if err = osutil.InWritableDir(createLink, f.fs, file.Name); err == nil {
f.dbUpdates <- dbUpdateJob{file, dbUpdateHandleSymlink}
dbUpdateChan <- dbUpdateJob{file, dbUpdateHandleSymlink}
} else {
f.newError("symlink create", file.Name, err)
}
}
// deleteDir attempts to delete the given directory
func (f *sendReceiveFolder) deleteDir(file protocol.FileInfo, matcher *ignore.Matcher) {
// handleDeleteDir attempts to remove a directory that was deleted on a remote
func (f *sendReceiveFolder) handleDeleteDir(file protocol.FileInfo, ignores *ignore.Matcher, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
// Used in the defer closure below, updated by the function body. Take
// care not to declare another err.
var err error
@@ -765,33 +794,18 @@ func (f *sendReceiveFolder) deleteDir(file protocol.FileInfo, matcher *ignore.Ma
})
}()
// Delete any temporary files lying around in the directory
err = f.deleteDir(file.Name, ignores, scanChan)
files, _ := f.fs.DirNames(file.Name)
for _, dirFile := range files {
fullDirFile := filepath.Join(file.Name, dirFile)
if fs.IsTemporary(dirFile) || (matcher != nil && matcher.Match(fullDirFile).IsDeletable()) {
f.fs.RemoveAll(fullDirFile)
}
}
err = osutil.InWritableDir(f.fs.Remove, f.fs, file.Name)
if err == nil || fs.IsNotExist(err) {
// It was removed or it doesn't exist to start with
f.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteDir}
} else if _, serr := f.fs.Lstat(file.Name); serr != nil && !fs.IsPermission(serr) {
// We get an error just looking at the directory, and it's not a
// permission problem. Lets assume the error is in fact some variant
// of "file does not exist" (possibly expressed as some parent being a
// file and not a directory etc) and that the delete is handled.
f.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteDir}
} else {
if err != nil {
f.newError("delete dir", file.Name, err)
return
}
dbUpdateChan <- dbUpdateJob{file, dbUpdateDeleteDir}
}
// deleteFile attempts to delete the given file
func (f *sendReceiveFolder) deleteFile(file protocol.FileInfo) {
func (f *sendReceiveFolder) deleteFile(file protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob) {
// Used in the defer closure below, updated by the function body. Take
// care not to declare another err.
var err error
@@ -830,13 +844,13 @@ func (f *sendReceiveFolder) deleteFile(file protocol.FileInfo) {
if err == nil || fs.IsNotExist(err) {
// It was removed or it doesn't exist to start with
f.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteFile}
dbUpdateChan <- dbUpdateJob{file, dbUpdateDeleteFile}
} else if _, serr := f.fs.Lstat(file.Name); serr != nil && !fs.IsPermission(serr) {
// We get an error just looking at the file, and it's not a permission
// problem. Let's assume the error is in fact some variant of "file
// does not exist" (possibly expressed as some parent being a file and
// not a directory etc) and that the delete is handled.
f.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteFile}
dbUpdateChan <- dbUpdateJob{file, dbUpdateDeleteFile}
} else {
f.newError("delete file", file.Name, err)
}
@@ -844,7 +858,7 @@ func (f *sendReceiveFolder) deleteFile(file protocol.FileInfo) {
// renameFile attempts to rename an existing file to a destination
// and set the right attributes on it.
func (f *sendReceiveFolder) renameFile(source, target protocol.FileInfo) {
func (f *sendReceiveFolder) renameFile(source, target protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob) {
// Used in the defer closure below, updated by the function body. Take
// care not to declare another err.
var err error
@@ -900,7 +914,7 @@ func (f *sendReceiveFolder) renameFile(source, target protocol.FileInfo) {
// of the source and the creation of the target. Fix-up the metadata,
// and update the local index of the target file.
f.dbUpdates <- dbUpdateJob{source, dbUpdateDeleteFile}
dbUpdateChan <- dbUpdateJob{source, dbUpdateDeleteFile}
err = f.shortcutFile(target)
if err != nil {
@@ -909,7 +923,7 @@ func (f *sendReceiveFolder) renameFile(source, target protocol.FileInfo) {
return
}
f.dbUpdates <- dbUpdateJob{target, dbUpdateHandleFile}
dbUpdateChan <- dbUpdateJob{target, dbUpdateHandleFile}
} else {
// We failed the rename so we have a source file that we still need to
// get rid of. Attempt to delete it instead so that we make *some*
@@ -922,7 +936,7 @@ func (f *sendReceiveFolder) renameFile(source, target protocol.FileInfo) {
return
}
f.dbUpdates <- dbUpdateJob{source, dbUpdateDeleteFile}
dbUpdateChan <- dbUpdateJob{source, dbUpdateDeleteFile}
}
}
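
The rename path keeps its two-tier strategy: a successful rename is recorded as a delete of the source plus a metadata fix-up of the target, while a failed rename falls back to deleting the source and letting the target be pulled from scratch. A hypothetical sketch of that rename-or-fallback shape using only the standard library; the real renameFile additionally archives conflicts and goes through shortcutFile:

package main

import (
	"fmt"
	"os"
)

// renameOrFallback tries the cheap path first: reuse the data already on
// disk by renaming it. If that fails, remove the stale source so the
// caller can fetch the target from the network instead.
func renameOrFallback(source, target string) error {
	if err := os.Rename(source, target); err == nil {
		return nil
	}
	return os.Remove(source)
}

func main() {
	src, err := os.CreateTemp("", "rename-src-")
	if err != nil {
		fmt.Println(err)
		return
	}
	src.Close()

	dst := src.Name() + ".renamed"
	fmt.Println("rename-or-fallback:", renameOrFallback(src.Name(), dst))
	os.Remove(dst) // clean up the example file
}
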
@@ -962,7 +976,7 @@ func (f *sendReceiveFolder) renameFile(source, target protocol.FileInfo) {
// handleFile queues the copies and pulls as necessary for a single new or
// changed file.
func (f *sendReceiveFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, finisherChan chan<- *sharedPullerState) {
func (f *sendReceiveFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, finisherChan chan<- *sharedPullerState, dbUpdateChan chan<- dbUpdateJob) {
curFile, hasCurFile := f.model.CurrentFolderFile(f.folderID, file.Name)
have, need := scanner.BlockDiff(curFile.Blocks, file.Blocks)
@@ -994,7 +1008,7 @@ func (f *sendReceiveFolder) handleFile(file protocol.FileInfo, copyChan chan<- c
if err != nil {
f.newError("shortcut", file.Name, err)
} else {
f.dbUpdates <- dbUpdateJob{file, dbUpdateShortcutFile}
dbUpdateChan <- dbUpdateJob{file, dbUpdateShortcutFile}
}
return
@@ -1353,7 +1367,7 @@ func (f *sendReceiveFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *
}
}
func (f *sendReceiveFolder) performFinish(state *sharedPullerState) error {
func (f *sendReceiveFolder) performFinish(ignores *ignore.Matcher, state *sharedPullerState, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) error {
// Set the correct permission bits on the new file
if !f.ignorePermissions(state.file) {
if err := f.fs.Chmod(state.tempName, fs.FileMode(state.file.Permissions&0777)); err != nil {
@@ -1407,14 +1421,7 @@ func (f *sendReceiveFolder) performFinish(state *sharedPullerState) error {
}
if changed {
// Scan() is synchronous (i.e. blocks until the scan is
// completed and returns an error), but a scan can't happen
// while we're in the puller routine. Request the scan in the
// background and it'll be handled when the current pulling
// sweep is complete. As we do retries, we'll queue the scan
// for this file up to ten times, but the last nine of those
// scans will be cheap...
go f.Scan([]string{state.curFile.Name})
scanChan <- state.curFile.Name
return fmt.Errorf("file modified but not rescanned; will try again later")
}
@@ -1424,11 +1431,7 @@ func (f *sendReceiveFolder) performFinish(state *sharedPullerState) error {
// archived for conflicts, only removed (which of course fails for
// non-empty directories).
// TODO: This is the place where we want to remove temporary files
// and future hard ignores before attempting a directory delete.
// Should share code with f.deletDir().
if err = osutil.InWritableDir(f.fs.Remove, f.fs, state.file.Name); err != nil {
if err = f.deleteDir(state.file.Name, ignores, scanChan); err != nil {
return err
}
@@ -1467,11 +1470,11 @@ func (f *sendReceiveFolder) performFinish(state *sharedPullerState) error {
f.fs.Chtimes(state.file.Name, state.file.ModTime(), state.file.ModTime()) // never fails
// Record the updated file in the index
f.dbUpdates <- dbUpdateJob{state.file, dbUpdateHandleFile}
dbUpdateChan <- dbUpdateJob{state.file, dbUpdateHandleFile}
return nil
}
func (f *sendReceiveFolder) finisherRoutine(in <-chan *sharedPullerState) {
func (f *sendReceiveFolder) finisherRoutine(ignores *ignore.Matcher, in <-chan *sharedPullerState, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
for state := range in {
if closed, err := state.finalClose(); closed {
l.Debugln(f, "closing", state.file.Name)
@@ -1479,7 +1482,7 @@ func (f *sendReceiveFolder) finisherRoutine(in <-chan *sharedPullerState) {
f.queue.Done(state.file.Name)
if err == nil {
err = f.performFinish(state)
err = f.performFinish(ignores, state, dbUpdateChan, scanChan)
}
if err != nil {
@@ -1523,7 +1526,7 @@ func (f *sendReceiveFolder) Jobs() ([]string, []string) {
// dbUpdaterRoutine aggregates db updates and commits them in batches no
// larger than 1000 items, delayed by no more than 2 seconds.
func (f *sendReceiveFolder) dbUpdaterRoutine() {
func (f *sendReceiveFolder) dbUpdaterRoutine(dbUpdateChan <-chan dbUpdateJob) {
const maxBatchTime = 2 * time.Second
batch := make([]dbUpdateJob, 0, maxBatchSizeFiles)
@@ -1592,7 +1595,7 @@ func (f *sendReceiveFolder) dbUpdaterRoutine() {
loop:
for {
select {
case job, ok := <-f.dbUpdates:
case job, ok := <-dbUpdateChan:
if !ok {
break loop
}
@@ -1619,6 +1622,25 @@ loop:
}
}
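
dbUpdaterRoutine itself is unchanged apart from reading from the channel parameter: it still commits batches of at most 1000 items and flushes at least every two seconds. A stripped-down sketch of that batch-or-timeout loop, with a print standing in for the real database commit:

package main

import (
	"fmt"
	"time"
)

// batcher collects jobs and commits them when the batch is full or the
// flush interval elapses, mirroring the shape of dbUpdaterRoutine.
func batcher(jobs <-chan string) {
	const (
		maxBatchSize = 1000            // documented batch size limit
		maxBatchTime = 2 * time.Second // documented maximum delay
	)

	batch := make([]string, 0, maxBatchSize)
	tick := time.NewTicker(maxBatchTime)
	defer tick.Stop()

	commit := func() {
		if len(batch) == 0 {
			return
		}
		fmt.Println("committing", len(batch), "updates") // stand-in for the db write
		batch = batch[:0]
	}

loop:
	for {
		select {
		case job, ok := <-jobs:
			if !ok {
				break loop // channel closed: flush the rest and return
			}
			batch = append(batch, job)
			if len(batch) == maxBatchSize {
				commit()
			}
		case <-tick.C:
			commit()
		}
	}
	commit()
}

func main() {
	jobs := make(chan string)
	done := make(chan struct{})
	go func() { batcher(jobs); close(done) }()

	for i := 0; i < 5; i++ {
		jobs <- fmt.Sprintf("file-%d", i)
	}
	close(jobs)
	<-done
}
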
// pullScannerRoutine aggregates paths to be scanned after pulling. The scan is
// scheduled once when scanChan is closed (scanning can not happen during pulling).
func (f *sendReceiveFolder) pullScannerRoutine(scanChan <-chan string) {
toBeScanned := make(map[string]struct{})
for path := range scanChan {
toBeScanned[path] = struct{}{}
}
if len(toBeScanned) != 0 {
scanList := make([]string, 0, len(toBeScanned))
for path := range toBeScanned {
l.Debugln(f, "scheduling scan after pulling for", path)
scanList = append(scanList, path)
}
f.Scan(scanList)
}
}
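
pullScannerRoutine deliberately holds off the actual Scan call until scanChan is closed, i.e. until pulling has finished, and collapses duplicate paths along the way. A small usage sketch with a fake scan function in place of f.Scan:

package main

import (
	"fmt"
	"sort"
)

// aggregateAndScan mirrors pullScannerRoutine: collect unique paths while
// the channel is open, then trigger exactly one scan after it is closed.
func aggregateAndScan(scanChan <-chan string, scan func([]string)) {
	toBeScanned := make(map[string]struct{})
	for path := range scanChan {
		toBeScanned[path] = struct{}{}
	}
	if len(toBeScanned) == 0 {
		return
	}
	scanList := make([]string, 0, len(toBeScanned))
	for path := range toBeScanned {
		scanList = append(scanList, path)
	}
	sort.Strings(scanList) // deterministic output, for the example only
	scan(scanList)
}

func main() {
	scanChan := make(chan string)
	done := make(chan struct{})
	go func() {
		aggregateAndScan(scanChan, func(paths []string) { fmt.Println("scanning", paths) })
		close(done)
	}()

	// Handlers report paths that need rescanning while the pull runs.
	scanChan <- "foo"
	scanChan <- "foo/bar"
	scanChan <- "foo" // duplicates are collapsed

	close(scanChan) // pulling finished; the single scan happens now
	<-done
}
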
func (f *sendReceiveFolder) inConflict(current, replacement protocol.Vector) bool {
if current.Concurrent(replacement) {
// Obvious case
@@ -1732,6 +1754,66 @@ func (f *sendReceiveFolder) IgnoresUpdated() {
f.SchedulePull()
}
// deleteDir attempts to delete a directory. It checks for files/dirs inside
// the directory and removes them if possible, or returns an error if it fails.
func (f *sendReceiveFolder) deleteDir(dir string, ignores *ignore.Matcher, scanChan chan<- string) error {
files, _ := f.fs.DirNames(dir)
toBeDeleted := make([]string, 0, len(files))
hasIgnored := false
hasKnown := false
hasToBeScanned := false
for _, dirFile := range files {
fullDirFile := filepath.Join(dir, dirFile)
if fs.IsTemporary(dirFile) || ignores.Match(fullDirFile).IsDeletable() {
toBeDeleted = append(toBeDeleted, fullDirFile)
} else if ignores != nil && ignores.Match(fullDirFile).IsIgnored() {
hasIgnored = true
} else if cf, ok := f.model.CurrentFolderFile(f.ID, fullDirFile); !ok || cf.IsDeleted() || cf.IsInvalid() {
// Something appeared in the dir that we are either not
// aware of at all, think should be deleted, or consider
// invalid, but that is not currently ignored -> schedule scan
scanChan <- fullDirFile
hasToBeScanned = true
} else {
// Dir contains a file that is valid according to the db and
// not ignored -> something weird is going on
hasKnown = true
}
}
if hasToBeScanned {
return errDirHasToBeScanned
}
if hasIgnored {
return errDirHasIgnored
}
if hasKnown {
return errDirNotEmpty
}
for _, del := range toBeDeleted {
f.fs.RemoveAll(del)
}
err := osutil.InWritableDir(f.fs.Remove, f.fs, dir)
if err == nil || fs.IsNotExist(err) {
// It was removed or it doesn't exist to start with
return nil
}
if _, serr := f.fs.Lstat(dir); serr != nil && !fs.IsPermission(serr) {
// We get an error just looking at the directory, and it's not a
// permission problem. Let's assume the error is in fact some variant
// of "file does not exist" (possibly expressed as some parent being a
// file and not a directory etc) and that the delete is handled.
return nil
}
return err
}
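
In other words, deleteDir sorts the directory contents into four buckets before touching anything: deletable leftovers (temporaries and (?d)-ignored files), plainly ignored files, content unknown to the database (which triggers a scan), and valid known content. A toy classification, with hand-written flags standing in for the real fs.IsTemporary, ignore.Matcher and database lookups:

package main

import "fmt"

// entry is a hypothetical, pre-computed view of one directory member; in
// the real deleteDir these flags come from fs.IsTemporary, the ignore
// matcher and model.CurrentFolderFile.
type entry struct {
	name      string
	deletable bool // temporary file or matched by a (?d) ignore pattern
	ignored   bool // matched by a plain ignore pattern
	known     bool // present and valid in the folder database
}

// classify reproduces the decision order of deleteDir: unexpected content
// wins over ignored content, which wins over known content.
func classify(entries []entry) string {
	var hasIgnored, hasKnown, hasToBeScanned bool
	for _, e := range entries {
		switch {
		case e.deletable:
			// would be removed before the directory itself
		case e.ignored:
			hasIgnored = true
		case !e.known:
			hasToBeScanned = true // schedule a scan for this path
		default:
			hasKnown = true
		}
	}
	switch {
	case hasToBeScanned:
		return "errDirHasToBeScanned"
	case hasIgnored:
		return "errDirHasIgnored"
	case hasKnown:
		return "errDirNotEmpty"
	default:
		return "remove leftovers, then remove the directory"
	}
}

func main() {
	fmt.Println(classify([]entry{{name: "~syncthing~foo.tmp", deletable: true}}))
	fmt.Println(classify([]entry{{name: "unexpected-file"}}))
	fmt.Println(classify([]entry{{name: "still-wanted", known: true}}))
}
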
// A []fileError is sent as part of an event and will be JSON serialized.
type fileError struct {
Path string `json:"path"`