all: Revert simultaneously walk fs and db on scan (fixes #4756) (#4757)

This reverts commit 6d3f9d5154.
This commit is contained in:
Simon Frei
2018-02-14 08:59:46 +01:00
committed by Jakob Borg
parent b57c9b6af5
commit 68c1b2dd47
9 changed files with 261 additions and 680 deletions

View File

@@ -8,9 +8,7 @@ package scanner
import (
"context"
"errors"
"runtime"
"strings"
"sync/atomic"
"time"
"unicode/utf8"
@@ -50,8 +48,8 @@ type Config struct {
Matcher *ignore.Matcher
// Number of hours to keep temporary files for
TempLifetime time.Duration
// Walks over file infos as present in the db before the scan alphabetically.
Have haveWalker
// If CurrentFiler is not nil, it is queried for the current file before rescanning.
CurrentFiler CurrentFiler
// The Filesystem provides an abstraction on top of the actual filesystem.
Filesystem fs.Filesystem
// If IgnorePerms is true, changes to permission bits will not be
@@ -72,28 +70,16 @@ type Config struct {
UseWeakHashes bool
}
type haveWalker interface {
// Walk passes all local file infos from the db which start with prefix
// to out and aborts early if ctx is cancelled.
Walk(prefix string, ctx context.Context, out chan<- protocol.FileInfo)
type CurrentFiler interface {
// CurrentFile returns the file as seen at last scan.
CurrentFile(name string) (protocol.FileInfo, bool)
}
type fsWalkResult struct {
path string
info fs.FileInfo
err error
}
type ScanResult struct {
New protocol.FileInfo
Old protocol.FileInfo
}
func Walk(ctx context.Context, cfg Config) chan ScanResult {
func Walk(ctx context.Context, cfg Config) chan protocol.FileInfo {
w := walker{cfg}
if w.Have == nil {
w.Have = noHaveWalker{}
if w.CurrentFiler == nil {
w.CurrentFiler = noCurrentFiler{}
}
if w.Filesystem == nil {
panic("no filesystem specified")
@@ -111,19 +97,25 @@ type walker struct {
// Walk returns the list of files found in the local folder by scanning the
// file system. Files are blockwise hashed.
func (w *walker) walk(ctx context.Context) chan ScanResult {
func (w *walker) walk(ctx context.Context) chan protocol.FileInfo {
l.Debugln("Walk", w.Subs, w.BlockSize, w.Matcher)
haveChan := make(chan protocol.FileInfo)
haveCtx, haveCancel := context.WithCancel(ctx)
go w.dbWalkerRoutine(haveCtx, haveChan)
toHashChan := make(chan protocol.FileInfo)
finishedChan := make(chan protocol.FileInfo)
fsChan := make(chan fsWalkResult)
go w.fsWalkerRoutine(ctx, fsChan, haveCancel)
toHashChan := make(chan ScanResult)
finishedChan := make(chan ScanResult)
go w.processWalkResults(ctx, fsChan, haveChan, toHashChan, finishedChan)
// A routine which walks the filesystem tree, and sends files which have
// been modified to the counter routine.
go func() {
hashFiles := w.walkAndHashFiles(ctx, toHashChan, finishedChan)
if len(w.Subs) == 0 {
w.Filesystem.Walk(".", hashFiles)
} else {
for _, sub := range w.Subs {
w.Filesystem.Walk(sub, hashFiles)
}
}
close(toHashChan)
}()
// We're not required to emit scan progress events, just kick off hashers,
// and feed inputs directly from the walker.
@@ -147,15 +139,15 @@ func (w *walker) walk(ctx context.Context) chan ScanResult {
// Parallel hasher is stopped by this routine when we close the channel over
// which it receives the files we ask it to hash.
go func() {
var filesToHash []ScanResult
var filesToHash []protocol.FileInfo
var total int64 = 1
for file := range toHashChan {
filesToHash = append(filesToHash, file)
total += file.New.Size
total += file.Size
}
realToHashChan := make(chan ScanResult)
realToHashChan := make(chan protocol.FileInfo)
done := make(chan struct{})
progress := newByteCounter()
@@ -191,7 +183,7 @@ func (w *walker) walk(ctx context.Context) chan ScanResult {
loop:
for _, file := range filesToHash {
l.Debugln("real to hash:", file.New.Name)
l.Debugln("real to hash:", file.Name)
select {
case realToHashChan <- file:
case <-ctx.Done():
@@ -204,49 +196,15 @@ func (w *walker) walk(ctx context.Context) chan ScanResult {
return finishedChan
}
// dbWalkerRoutine walks the db and sends back file infos to be compared to scan results.
func (w *walker) dbWalkerRoutine(ctx context.Context, haveChan chan<- protocol.FileInfo) {
defer close(haveChan)
if len(w.Subs) == 0 {
w.Have.Walk("", ctx, haveChan)
return
}
for _, sub := range w.Subs {
select {
case <-ctx.Done():
return
default:
}
w.Have.Walk(sub, ctx, haveChan)
}
}
// fsWalkerRoutine walks the filesystem tree and sends back file infos and potential
// errors at paths that need to be processed.
func (w *walker) fsWalkerRoutine(ctx context.Context, fsChan chan<- fsWalkResult, haveCancel context.CancelFunc) {
defer close(fsChan)
walkFn := w.createFSWalkFn(ctx, fsChan)
if len(w.Subs) == 0 {
if err := w.Filesystem.Walk(".", walkFn); err != nil {
haveCancel()
}
return
}
for _, sub := range w.Subs {
if err := w.Filesystem.Walk(sub, walkFn); err != nil {
haveCancel()
break
}
}
}
func (w *walker) createFSWalkFn(ctx context.Context, fsChan chan<- fsWalkResult) fs.WalkFunc {
func (w *walker) walkAndHashFiles(ctx context.Context, fchan, dchan chan protocol.FileInfo) fs.WalkFunc {
now := time.Now()
return func(path string, info fs.FileInfo, err error) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Return value used when we are returning early and don't want to
// process the item. For directories, this means do-not-descend.
var skip error // nil
@@ -256,9 +214,7 @@ func (w *walker) createFSWalkFn(ctx context.Context, fsChan chan<- fsWalkResult)
}
if err != nil {
if sendErr := fsWalkError(ctx, fsChan, path, err); sendErr != nil {
return sendErr
}
l.Debugln("error:", path, info, err)
return skip
}
@@ -266,6 +222,12 @@ func (w *walker) createFSWalkFn(ctx context.Context, fsChan chan<- fsWalkResult)
return nil
}
info, err = w.Filesystem.Lstat(path)
// An error here would be weird as we've already gotten to this point, but act on it nonetheless
if err != nil {
return skip
}
if fs.IsTemporary(path) {
l.Debugln("temporary:", path)
if info.IsRegular() && info.ModTime().Add(w.TempLifetime).Before(now) {
@@ -276,170 +238,48 @@ func (w *walker) createFSWalkFn(ctx context.Context, fsChan chan<- fsWalkResult)
}
if fs.IsInternal(path) {
l.Debugln("skip walking (internal):", path)
l.Debugln("ignored (internal):", path)
return skip
}
if w.Matcher.Match(path).IsIgnored() {
l.Debugln("skip walking (patterns):", path)
l.Debugln("ignored (patterns):", path)
return skip
}
if !utf8.ValidString(path) {
if err := fsWalkError(ctx, fsChan, path, errors.New("path isn't a valid utf8 string")); err != nil {
return err
}
l.Warnf("File name %q is not in UTF8 encoding; skipping.", path)
return skip
}
path, shouldSkip := w.normalizePath(path, info)
if shouldSkip {
if err := fsWalkError(ctx, fsChan, path, errors.New("failed to normalize path")); err != nil {
return skip
}
switch {
case info.IsSymlink():
if err := w.walkSymlink(ctx, path, dchan); err != nil {
return err
}
return skip
}
if info.IsDir() {
// under no circumstances shall we descend into a symlink
return fs.SkipDir
}
return nil
select {
case fsChan <- fsWalkResult{
path: path,
info: info,
err: nil,
}:
case <-ctx.Done():
return ctx.Err()
}
case info.IsDir():
err = w.walkDir(ctx, path, info, dchan)
// under no circumstances shall we descend into a symlink
if info.IsSymlink() && info.IsDir() {
l.Debugln("skip walking (symlinked directory):", path)
return skip
case info.IsRegular():
err = w.walkRegular(ctx, path, info, fchan)
}
return err
}
}
func fsWalkError(ctx context.Context, dst chan<- fsWalkResult, path string, err error) error {
select {
case dst <- fsWalkResult{
path: path,
info: nil,
err: err,
}:
case <-ctx.Done():
return ctx.Err()
}
return nil
}
func (w *walker) processWalkResults(ctx context.Context, fsChan <-chan fsWalkResult, haveChan <-chan protocol.FileInfo, toHashChan, finishedChan chan<- ScanResult) {
ctxChan := ctx.Done()
fsRes, fsChanOpen := <-fsChan
currDBFile, haveChanOpen := <-haveChan
for fsChanOpen {
if haveChanOpen {
// File infos below an error walking the filesystem tree
// may be marked as ignored but should not be deleted.
if fsRes.err != nil && (strings.HasPrefix(currDBFile.Name, fsRes.path+string(fs.PathSeparator)) || fsRes.path == ".") {
w.checkIgnoredAndInvalidate(currDBFile, finishedChan, ctxChan)
currDBFile, haveChanOpen = <-haveChan
continue
}
// Delete file infos that were not encountered when
// walking the filesystem tree, except on error (see
// above) or if they are ignored.
if currDBFile.Name < fsRes.path {
w.checkIgnoredAndDelete(currDBFile, finishedChan, ctxChan)
currDBFile, haveChanOpen = <-haveChan
continue
}
}
var oldFile protocol.FileInfo
if haveChanOpen && currDBFile.Name == fsRes.path {
oldFile = currDBFile
currDBFile, haveChanOpen = <-haveChan
}
if fsRes.err != nil {
if fs.IsNotExist(fsRes.err) && !oldFile.IsEmpty() && !oldFile.Deleted {
select {
case finishedChan <- ScanResult{
New: oldFile.DeletedCopy(w.ShortID),
Old: oldFile,
}:
case <-ctx.Done():
return
}
}
fsRes, fsChanOpen = <-fsChan
continue
}
switch {
case fsRes.info.IsDir():
w.walkDir(ctx, fsRes.path, fsRes.info, oldFile, finishedChan)
case fsRes.info.IsSymlink():
w.walkSymlink(ctx, fsRes.path, oldFile, finishedChan)
case fsRes.info.IsRegular():
w.walkRegular(ctx, fsRes.path, fsRes.info, oldFile, toHashChan)
}
fsRes, fsChanOpen = <-fsChan
}
// Filesystem tree walking finished, if there is anything left in the
// db, mark it as deleted, except when it's ignored.
if haveChanOpen {
w.checkIgnoredAndDelete(currDBFile, finishedChan, ctxChan)
for currDBFile = range haveChan {
w.checkIgnoredAndDelete(currDBFile, finishedChan, ctxChan)
}
}
close(toHashChan)
}
func (w *walker) checkIgnoredAndDelete(f protocol.FileInfo, finishedChan chan<- ScanResult, done <-chan struct{}) {
if w.checkIgnoredAndInvalidate(f, finishedChan, done) {
return
}
if !f.Invalid && !f.Deleted {
select {
case finishedChan <- ScanResult{
New: f.DeletedCopy(w.ShortID),
Old: f,
}:
case <-done:
}
}
}
func (w *walker) checkIgnoredAndInvalidate(f protocol.FileInfo, finishedChan chan<- ScanResult, done <-chan struct{}) bool {
if !w.Matcher.Match(f.Name).IsIgnored() {
return false
}
if !f.Invalid {
select {
case finishedChan <- ScanResult{
New: f.InvalidatedCopy(w.ShortID),
Old: f,
}:
case <-done:
}
}
return true
}
func (w *walker) walkRegular(ctx context.Context, relPath string, info fs.FileInfo, cf protocol.FileInfo, toHashChan chan<- ScanResult) {
func (w *walker) walkRegular(ctx context.Context, relPath string, info fs.FileInfo, fchan chan protocol.FileInfo) error {
curMode := uint32(info.Mode())
if runtime.GOOS == "windows" && osutil.IsWindowsExecutable(relPath) {
curMode |= 0111
@@ -454,38 +294,40 @@ func (w *walker) walkRegular(ctx context.Context, relPath string, info fs.FileIn
// - was not a symlink (since it's a file now)
// - was not invalid (since it looks valid now)
// - has the same size as previously
if !cf.IsEmpty() {
permUnchanged := w.IgnorePerms || !cf.HasPermissionBits() || PermsEqual(cf.Permissions, curMode)
if permUnchanged && !cf.IsDeleted() && cf.ModTime().Equal(info.ModTime()) && !cf.IsDirectory() &&
!cf.IsSymlink() && !cf.IsInvalid() && cf.Size == info.Size() {
return
}
cf, ok := w.CurrentFiler.CurrentFile(relPath)
permUnchanged := w.IgnorePerms || !cf.HasPermissionBits() || PermsEqual(cf.Permissions, curMode)
if ok && permUnchanged && !cf.IsDeleted() && cf.ModTime().Equal(info.ModTime()) && !cf.IsDirectory() &&
!cf.IsSymlink() && !cf.IsInvalid() && cf.Size == info.Size() {
return nil
}
if ok {
l.Debugln("rescan:", cf, info.ModTime().Unix(), info.Mode()&fs.ModePerm)
}
f := ScanResult{
New: protocol.FileInfo{
Name: relPath,
Type: protocol.FileInfoTypeFile,
Version: cf.Version.Update(w.ShortID),
Permissions: curMode & uint32(maskModePerm),
NoPermissions: w.IgnorePerms,
ModifiedS: info.ModTime().Unix(),
ModifiedNs: int32(info.ModTime().Nanosecond()),
ModifiedBy: w.ShortID,
Size: info.Size(),
},
Old: cf,
f := protocol.FileInfo{
Name: relPath,
Type: protocol.FileInfoTypeFile,
Version: cf.Version.Update(w.ShortID),
Permissions: curMode & uint32(maskModePerm),
NoPermissions: w.IgnorePerms,
ModifiedS: info.ModTime().Unix(),
ModifiedNs: int32(info.ModTime().Nanosecond()),
ModifiedBy: w.ShortID,
Size: info.Size(),
}
l.Debugln("to hash:", relPath, f)
select {
case toHashChan <- f:
case fchan <- f:
case <-ctx.Done():
return ctx.Err()
}
return nil
}
func (w *walker) walkDir(ctx context.Context, relPath string, info fs.FileInfo, cf protocol.FileInfo, finishedChan chan<- ScanResult) {
func (w *walker) walkDir(ctx context.Context, relPath string, info fs.FileInfo, dchan chan protocol.FileInfo) error {
// A directory is "unchanged", if it
// - exists
// - has the same permissions as previously, unless we are ignoring permissions
@@ -493,41 +335,40 @@ func (w *walker) walkDir(ctx context.Context, relPath string, info fs.FileInfo,
// - was a directory previously (not a file or something else)
// - was not a symlink (since it's a directory now)
// - was not invalid (since it looks valid now)
if !cf.IsEmpty() {
permUnchanged := w.IgnorePerms || !cf.HasPermissionBits() || PermsEqual(cf.Permissions, uint32(info.Mode()))
if permUnchanged && !cf.IsDeleted() && cf.IsDirectory() && !cf.IsSymlink() && !cf.IsInvalid() {
return
}
cf, ok := w.CurrentFiler.CurrentFile(relPath)
permUnchanged := w.IgnorePerms || !cf.HasPermissionBits() || PermsEqual(cf.Permissions, uint32(info.Mode()))
if ok && permUnchanged && !cf.IsDeleted() && cf.IsDirectory() && !cf.IsSymlink() && !cf.IsInvalid() {
return nil
}
f := ScanResult{
New: protocol.FileInfo{
Name: relPath,
Type: protocol.FileInfoTypeDirectory,
Version: cf.Version.Update(w.ShortID),
Permissions: uint32(info.Mode() & maskModePerm),
NoPermissions: w.IgnorePerms,
ModifiedS: info.ModTime().Unix(),
ModifiedNs: int32(info.ModTime().Nanosecond()),
ModifiedBy: w.ShortID,
},
Old: cf,
f := protocol.FileInfo{
Name: relPath,
Type: protocol.FileInfoTypeDirectory,
Version: cf.Version.Update(w.ShortID),
Permissions: uint32(info.Mode() & maskModePerm),
NoPermissions: w.IgnorePerms,
ModifiedS: info.ModTime().Unix(),
ModifiedNs: int32(info.ModTime().Nanosecond()),
ModifiedBy: w.ShortID,
}
l.Debugln("dir:", relPath, f)
select {
case finishedChan <- f:
case dchan <- f:
case <-ctx.Done():
return ctx.Err()
}
return nil
}
// walkSymlink returns nil or an error, if the error is of the nature that
// it should stop the entire walk.
func (w *walker) walkSymlink(ctx context.Context, relPath string, cf protocol.FileInfo, finishedChan chan<- ScanResult) {
func (w *walker) walkSymlink(ctx context.Context, relPath string, dchan chan protocol.FileInfo) error {
// Symlinks are not supported on Windows. We ignore instead of returning
// an error.
if runtime.GOOS == "windows" {
return
return nil
}
// We always rehash symlinks as they have no modtime or
@@ -538,7 +379,7 @@ func (w *walker) walkSymlink(ctx context.Context, relPath string, cf protocol.Fi
target, err := w.Filesystem.ReadSymlink(relPath)
if err != nil {
l.Debugln("readlink error:", relPath, err)
return
return nil
}
// A symlink is "unchanged", if
@@ -547,27 +388,28 @@ func (w *walker) walkSymlink(ctx context.Context, relPath string, cf protocol.Fi
// - it was a symlink
// - it wasn't invalid
// - the target was the same
if !cf.IsEmpty() && !cf.IsDeleted() && cf.IsSymlink() && !cf.IsInvalid() && cf.SymlinkTarget == target {
return
cf, ok := w.CurrentFiler.CurrentFile(relPath)
if ok && !cf.IsDeleted() && cf.IsSymlink() && !cf.IsInvalid() && cf.SymlinkTarget == target {
return nil
}
f := ScanResult{
New: protocol.FileInfo{
Name: relPath,
Type: protocol.FileInfoTypeSymlink,
Version: cf.Version.Update(w.ShortID),
NoPermissions: true, // Symlinks don't have permissions of their own
SymlinkTarget: target,
},
Old: cf,
f := protocol.FileInfo{
Name: relPath,
Type: protocol.FileInfoTypeSymlink,
Version: cf.Version.Update(w.ShortID),
NoPermissions: true, // Symlinks don't have permissions of their own
SymlinkTarget: target,
}
l.Debugln("symlink changedb:", relPath, f)
select {
case finishedChan <- f:
case dchan <- f:
case <-ctx.Done():
return ctx.Err()
}
return nil
}
// normalizePath returns the normalized relative path (possibly after fixing
@@ -690,6 +532,10 @@ func (c *byteCounter) Close() {
close(c.stop)
}
type noHaveWalker struct{}
// A no-op CurrentFiler
func (noHaveWalker) Walk(prefix string, ctx context.Context, out chan<- protocol.FileInfo) {}
type noCurrentFiler struct{}
func (noCurrentFiler) CurrentFile(name string) (protocol.FileInfo, bool) {
return protocol.FileInfo{}, false
}