From 2d9fcf6828b4470bbd0b2ba66d99f08321f83b1c Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Fri, 26 Jun 2015 13:31:30 +0200 Subject: [PATCH] Collect puller errors, send FolderErrors event --- internal/events/events.go | 3 ++ internal/model/rwfolder.go | 83 ++++++++++++++++++++++++++++++++- internal/model/rwfolder_test.go | 37 ++++++++++----- 3 files changed, 110 insertions(+), 13 deletions(-) diff --git a/internal/events/events.go b/internal/events/events.go index 8b293f40..05ef7b8e 100644 --- a/internal/events/events.go +++ b/internal/events/events.go @@ -35,6 +35,7 @@ const ( DownloadProgress FolderSummary FolderCompletion + FolderErrors AllEvents = (1 << iota) - 1 ) @@ -75,6 +76,8 @@ func (t EventType) String() string { return "FolderSummary" case FolderCompletion: return "FolderCompletion" + case FolderErrors: + return "FolderErrors" default: return "Unknown" } diff --git a/internal/model/rwfolder.go b/internal/model/rwfolder.go index e4468c5b..27d43c04 100644 --- a/internal/model/rwfolder.go +++ b/internal/model/rwfolder.go @@ -13,6 +13,7 @@ import ( "math/rand" "os" "path/filepath" + "sort" "time" "github.com/syncthing/protocol" @@ -92,6 +93,9 @@ type rwFolder struct { delayScan chan time.Duration scanNow chan rescanRequest remoteIndex chan struct{} // An index update was received, we should re-evaluate needs + + errors map[string]string // path -> error string + errorsMut sync.Mutex } func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFolder { @@ -121,6 +125,8 @@ func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFo delayScan: make(chan time.Duration), scanNow: make(chan rescanRequest), remoteIndex: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a notification if we're busy doing a pull when it comes. + + errorsMut: sync.NewMutex(), } } @@ -216,8 +222,11 @@ func (p *rwFolder) Serve() { if debug { l.Debugln(p, "pulling", prevVer, curVer) } + p.setState(FolderSyncing) + p.clearErrors() tries := 0 + for { tries++ @@ -256,10 +265,18 @@ func (p *rwFolder) Serve() { // we're not making it. Probably there are write // errors preventing us. Flag this with a warning and // wait a bit longer before retrying. - l.Warnf("Folder %q isn't making progress - check logs for possible root cause. Pausing puller for %v.", p.folder, pauseIntv) + l.Infof("Folder %q isn't making progress. Pausing puller for %v.", p.folder, pauseIntv) if debug { l.Debugln(p, "next pull in", pauseIntv) } + + if folderErrors := p.currentErrors(); len(folderErrors) > 0 { + events.Default.Log(events.FolderErrors, map[string]interface{}{ + "folder": p.folder, + "errors": folderErrors, + }) + } + p.pullTimer.Reset(pauseIntv) break } @@ -612,6 +629,7 @@ func (p *rwFolder) handleDir(file protocol.FileInfo) { err = osutil.InWritableDir(osutil.Remove, realName) if err != nil { l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err) + p.newError(file.Name, err) return } fallthrough @@ -633,12 +651,14 @@ func (p *rwFolder) handleDir(file protocol.FileInfo) { p.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir} } else { l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err) + p.newError(file.Name, err) } return // Weird error when stat()'ing the dir. Probably won't work to do // anything else with it if we can't even stat() it. case err != nil: l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err) + p.newError(file.Name, err) return } @@ -652,6 +672,7 @@ func (p *rwFolder) handleDir(file protocol.FileInfo) { p.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir} } else { l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err) + p.newError(file.Name, err) } } @@ -698,6 +719,7 @@ func (p *rwFolder) deleteDir(file protocol.FileInfo) { p.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteDir} } else { l.Infof("Puller (folder %q, dir %q): delete: %v", p.folder, file.Name, err) + p.newError(file.Name, err) } } @@ -746,6 +768,7 @@ func (p *rwFolder) deleteFile(file protocol.FileInfo) { p.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteFile} } else { l.Infof("Puller (folder %q, file %q): delete: %v", p.folder, file.Name, err) + p.newError(file.Name, err) } } @@ -808,6 +831,7 @@ func (p *rwFolder) renameFile(source, target protocol.FileInfo) { err = p.shortcutFile(target) if err != nil { l.Infof("Puller (folder %q, file %q): rename from %q metadata: %v", p.folder, target.Name, source.Name, err) + p.newError(target.Name, err) return } @@ -820,6 +844,7 @@ func (p *rwFolder) renameFile(source, target protocol.FileInfo) { err = osutil.InWritableDir(osutil.Remove, from) if err != nil { l.Infof("Puller (folder %q, file %q): delete %q after failed rename: %v", p.folder, target.Name, source.Name, err) + p.newError(target.Name, err) return } @@ -900,6 +925,7 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks if err != nil { l.Infoln("Puller: shortcut:", err) + p.newError(file.Name, err) } else { p.dbUpdates <- dbUpdateJob{file, dbUpdateShortcutFile} } @@ -988,6 +1014,7 @@ func (p *rwFolder) shortcutFile(file protocol.FileInfo) error { if !p.ignorePermissions(file) { if err := os.Chmod(realName, os.FileMode(file.Flags&0777)); err != nil { l.Infof("Puller (folder %q, file %q): shortcut: chmod: %v", p.folder, file.Name, err) + p.newError(file.Name, err) return err } } @@ -998,6 +1025,7 @@ func (p *rwFolder) shortcutFile(file protocol.FileInfo) error { info, err := os.Stat(realName) if err != nil { l.Infof("Puller (folder %q, file %q): shortcut: unable to stat file: %v", p.folder, file.Name, err) + p.newError(file.Name, err) return err } @@ -1018,6 +1046,7 @@ func (p *rwFolder) shortcutSymlink(file protocol.FileInfo) (err error) { err = symlinks.ChangeType(filepath.Join(p.dir, file.Name), file.Flags) if err != nil { l.Infof("Puller (folder %q, file %q): symlink shortcut: %v", p.folder, file.Name, err) + p.newError(file.Name, err) } return } @@ -1255,6 +1284,7 @@ func (p *rwFolder) finisherRoutine(in <-chan *sharedPullerState) { if err != nil { l.Infoln("Puller: final:", err) + p.newError(state.file.Name, err) } events.Default.Log(events.ItemFinished, map[string]interface{}{ "folder": p.folder, @@ -1402,3 +1432,54 @@ func moveForConflict(name string) error { } return err } + +func (p *rwFolder) newError(path string, err error) { + p.errorsMut.Lock() + defer p.errorsMut.Unlock() + + // We might get more than one error report for a file (i.e. error on + // Write() followed by Close()); we keep the first error as that is + // probably closer to the root cause. + if _, ok := p.errors[path]; ok { + return + } + + p.errors[path] = err.Error() +} + +func (p *rwFolder) clearErrors() { + p.errorsMut.Lock() + p.errors = make(map[string]string) + p.errorsMut.Unlock() +} + +func (p *rwFolder) currentErrors() []fileError { + p.errorsMut.Lock() + errors := make([]fileError, 0, len(p.errors)) + for path, err := range p.errors { + errors = append(errors, fileError{path, err}) + } + sort.Sort(fileErrorList(errors)) + p.errorsMut.Unlock() + return errors +} + +// A []fileError is sent as part of an event and will be JSON serialized. +type fileError struct { + Path string `json:"path"` + Err string `json:"error"` +} + +type fileErrorList []fileError + +func (l fileErrorList) Len() int { + return len(l) +} + +func (l fileErrorList) Less(a, b int) bool { + return l[a].Path < l[b].Path +} + +func (l fileErrorList) Swap(a, b int) { + l[a], l[b] = l[b], l[a] +} diff --git a/internal/model/rwfolder_test.go b/internal/model/rwfolder_test.go index 53df1e3a..2f7aaa11 100644 --- a/internal/model/rwfolder_test.go +++ b/internal/model/rwfolder_test.go @@ -14,6 +14,7 @@ import ( "github.com/syncthing/protocol" "github.com/syncthing/syncthing/internal/scanner" + "github.com/syncthing/syncthing/internal/sync" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/storage" @@ -73,9 +74,11 @@ func TestHandleFile(t *testing.T) { m.updateLocals("default", []protocol.FileInfo{existingFile}) p := rwFolder{ - folder: "default", - dir: "testdata", - model: m, + folder: "default", + dir: "testdata", + model: m, + errors: make(map[string]string), + errorsMut: sync.NewMutex(), } copyChan := make(chan copyBlocksState, 1) @@ -127,9 +130,11 @@ func TestHandleFileWithTemp(t *testing.T) { m.updateLocals("default", []protocol.FileInfo{existingFile}) p := rwFolder{ - folder: "default", - dir: "testdata", - model: m, + folder: "default", + dir: "testdata", + model: m, + errors: make(map[string]string), + errorsMut: sync.NewMutex(), } copyChan := make(chan copyBlocksState, 1) @@ -198,9 +203,11 @@ func TestCopierFinder(t *testing.T) { } p := rwFolder{ - folder: "default", - dir: "testdata", - model: m, + folder: "default", + dir: "testdata", + model: m, + errors: make(map[string]string), + errorsMut: sync.NewMutex(), } copyChan := make(chan copyBlocksState) @@ -332,9 +339,11 @@ func TestLastResortPulling(t *testing.T) { } p := rwFolder{ - folder: "default", - dir: "testdata", - model: m, + folder: "default", + dir: "testdata", + model: m, + errors: make(map[string]string), + errorsMut: sync.NewMutex(), } copyChan := make(chan copyBlocksState) @@ -390,6 +399,8 @@ func TestDeregisterOnFailInCopy(t *testing.T) { model: m, queue: newJobQueue(), progressEmitter: emitter, + errors: make(map[string]string), + errorsMut: sync.NewMutex(), } // queue.Done should be called by the finisher routine @@ -477,6 +488,8 @@ func TestDeregisterOnFailInPull(t *testing.T) { model: m, queue: newJobQueue(), progressEmitter: emitter, + errors: make(map[string]string), + errorsMut: sync.NewMutex(), } // queue.Done should be called by the finisher routine