diff --git a/lib/model/fakeconns_test.go b/lib/model/fakeconns_test.go index aa254dd1..b19939b5 100644 --- a/lib/model/fakeconns_test.go +++ b/lib/model/fakeconns_test.go @@ -33,7 +33,7 @@ type fakeConnection struct { folder string model *model indexFn func(string, []protocol.FileInfo) - requestFn func(folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) + requestFn func(ctx context.Context, folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) closeFn func(error) mut sync.Mutex } @@ -82,11 +82,11 @@ func (f *fakeConnection) IndexUpdate(folder string, fs []protocol.FileInfo) erro return nil } -func (f *fakeConnection) Request(folder, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) { +func (f *fakeConnection) Request(ctx context.Context, folder, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) { f.mut.Lock() defer f.mut.Unlock() if f.requestFn != nil { - return f.requestFn(folder, name, offset, size, hash, fromTemporary) + return f.requestFn(ctx, folder, name, offset, size, hash, fromTemporary) } return f.fileData[name], nil } diff --git a/lib/model/folder.go b/lib/model/folder.go index f5d6c7c5..1055c62c 100644 --- a/lib/model/folder.go +++ b/lib/model/folder.go @@ -15,6 +15,8 @@ import ( "sync/atomic" "time" + "github.com/pkg/errors" + "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/db" "github.com/syncthing/syncthing/lib/events" @@ -278,7 +280,7 @@ func (f *folder) getHealthError() error { dbPath := locations.Get(locations.Database) if usage, err := fs.NewFilesystem(fs.FilesystemTypeBasic, dbPath).Usage("."); err == nil { if err = config.CheckFreeSpace(f.model.cfg.Options().MinHomeDiskFree, usage); err != nil { - return fmt.Errorf("insufficient space on disk for database (%v): %v", dbPath, err) + return errors.Wrapf(err, "insufficient space on disk for database (%v)", dbPath) } } @@ -297,7 +299,7 @@ func (f *folder) scanSubdirs(subDirs []string) error { oldHash := f.ignores.Hash() if err := f.ignores.Load(".stignore"); err != nil && !fs.IsNotExist(err) { - err = fmt.Errorf("loading ignores: %v", err) + err = errors.Wrap(err, "loading ignores") f.setError(err) return err } diff --git a/lib/model/folder_sendrecv.go b/lib/model/folder_sendrecv.go index b72670e9..7d3051c0 100644 --- a/lib/model/folder_sendrecv.go +++ b/lib/model/folder_sendrecv.go @@ -104,7 +104,8 @@ type sendReceiveFolder struct { queue *jobQueue - pullErrors map[string]string // path -> error string + pullErrors map[string]string // errors for most recent/current iteration + oldPullErrors map[string]string // errors from previous iterations for log filtering only pullErrorsMut sync.Mutex } @@ -169,7 +170,7 @@ func (f *sendReceiveFolder) pull() bool { } }() if err := f.ignores.Load(".stignore"); err != nil && !fs.IsNotExist(err) { - err = fmt.Errorf("loading ignores: %v", err) + err = errors.Wrap(err, "loading ignores") f.setError(err) return false } @@ -210,9 +211,10 @@ func (f *sendReceiveFolder) pull() bool { } f.pullErrorsMut.Lock() - hasPullErrs := len(f.pullErrors) > 0 + pullErrNum := len(f.pullErrors) f.pullErrorsMut.Unlock() - if hasPullErrs { + if pullErrNum > 0 { + l.Infof("%v: Failed to sync %v items", f.Description(), pullErrNum) f.evLogger.Log(events.FolderErrors, map[string]interface{}{ "folder": f.folderID, "errors": f.Errors(), @@ -227,6 +229,11 @@ func (f *sendReceiveFolder) pull() bool { // might have failed). One puller iteration handles all files currently // flagged as needed in the folder. func (f *sendReceiveFolder) pullerIteration(scanChan chan<- string) int { + f.pullErrorsMut.Lock() + f.oldPullErrors = f.pullErrors + f.pullErrors = make(map[string]string) + f.pullErrorsMut.Unlock() + pullChan := make(chan pullBlockState) copyChan := make(chan copyBlocksState) finisherChan := make(chan *sharedPullerState) @@ -269,9 +276,6 @@ func (f *sendReceiveFolder) pullerIteration(scanChan chan<- string) int { doneWg.Done() }() - // Clear out all previous errors - f.clearPullErrors() - changed, fileDeletions, dirDeletions, err := f.processNeeded(dbUpdateChan, copyChan, scanChan) // Signal copy and puller routines that we are done with the in data for @@ -294,6 +298,10 @@ func (f *sendReceiveFolder) pullerIteration(scanChan chan<- string) int { close(dbUpdateChan) updateWg.Wait() + f.pullErrorsMut.Lock() + f.oldPullErrors = nil + f.pullErrorsMut.Unlock() + return changed } @@ -1435,7 +1443,7 @@ func (f *sendReceiveFolder) pullBlock(state pullBlockState, out chan<- *sharedPu select { case <-f.ctx.Done(): state.fail(errors.Wrap(f.ctx.Err(), "folder stopped")) - return + break default: } @@ -1458,7 +1466,7 @@ func (f *sendReceiveFolder) pullBlock(state pullBlockState, out chan<- *sharedPu // leastBusy can select another device when someone else asks. activity.using(selected) var buf []byte - buf, lastError = f.model.requestGlobal(selected.ID, f.folderID, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash, state.block.WeakHash, selected.FromTemporary) + buf, lastError = f.model.requestGlobal(f.ctx, selected.ID, f.folderID, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash, state.block.WeakHash, selected.FromTemporary) activity.done(selected) if lastError != nil { l.Debugln("request:", f.folderID, state.file.Name, state.block.Offset, state.block.Size, "returned error:", lastError) @@ -1757,6 +1765,11 @@ func (f *sendReceiveFolder) moveForConflict(name, lastModBy string, scanChan cha } func (f *sendReceiveFolder) newPullError(path string, err error) { + if errors.Cause(err) == f.ctx.Err() { + // Error because the folder stopped - no point logging/tracking + return + } + f.pullErrorsMut.Lock() defer f.pullErrorsMut.Unlock() @@ -1767,18 +1780,19 @@ func (f *sendReceiveFolder) newPullError(path string, err error) { return } - l.Infof("Puller (folder %s, item %q): %v", f.Description(), path, err) - // Establish context to differentiate from errors while scanning. // Use "syncing" as opposed to "pulling" as the latter might be used // for errors occurring specificly in the puller routine. - f.pullErrors[path] = fmt.Sprintln("syncing:", err) -} + errStr := fmt.Sprintln("syncing:", err) + f.pullErrors[path] = errStr -func (f *sendReceiveFolder) clearPullErrors() { - f.pullErrorsMut.Lock() - f.pullErrors = make(map[string]string) - f.pullErrorsMut.Unlock() + if oldErr, ok := f.oldPullErrors[path]; ok && oldErr == errStr { + l.Debugf("Repeat error on puller (folder %s, item %q): %v", f.Description(), path, err) + delete(f.oldPullErrors, path) // Potential repeats are now caught by f.pullErrors itself + return + } + + l.Infof("Puller (folder %s, item %q): %v", f.Description(), path, err) } func (f *sendReceiveFolder) Errors() []FileError { diff --git a/lib/model/model.go b/lib/model/model.go index 00e8590d..4df65ea7 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -8,6 +8,7 @@ package model import ( "bytes" + "context" "encoding/json" "errors" "fmt" @@ -431,18 +432,13 @@ func (m *model) stopFolder(cfg config.FolderConfiguration, err error) { tokens := m.folderRunnerTokens[cfg.ID] m.fmut.RUnlock() - // Close connections to affected devices - // Must happen before stopping the folder service to abort ongoing - // transmissions and thus allow timely service termination. - w := m.closeConns(cfg.DeviceIDs(), err) - for _, id := range tokens { m.RemoveAndWait(id, 0) } // Wait for connections to stop to ensure that no more calls to methods // expecting this folder to exist happen (e.g. .IndexUpdate). - w.Wait() + m.closeConns(cfg.DeviceIDs(), err).Wait() } // Need to hold lock on m.fmut when calling this. @@ -2103,7 +2099,7 @@ func (s *indexSender) sendIndexTo() error { return err } -func (m *model) requestGlobal(deviceID protocol.DeviceID, folder, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) { +func (m *model) requestGlobal(ctx context.Context, deviceID protocol.DeviceID, folder, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) { m.pmut.RLock() nc, ok := m.conn[deviceID] m.pmut.RUnlock() @@ -2114,7 +2110,7 @@ func (m *model) requestGlobal(deviceID protocol.DeviceID, folder, name string, o l.Debugf("%v REQ(out): %s: %q / %q o=%d s=%d h=%x wh=%x ft=%t", m, deviceID, folder, name, offset, size, hash, weakHash, fromTemporary) - return nc.Request(folder, name, offset, size, hash, weakHash, fromTemporary) + return nc.Request(ctx, folder, name, offset, size, hash, weakHash, fromTemporary) } func (m *model) ScanFolders() map[string]error { diff --git a/lib/model/model_test.go b/lib/model/model_test.go index 76002aa2..ecb714d6 100644 --- a/lib/model/model_test.go +++ b/lib/model/model_test.go @@ -8,6 +8,7 @@ package model import ( "bytes" + "context" "encoding/json" "fmt" "io" @@ -254,7 +255,7 @@ func BenchmarkRequestOut(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - data, err := m.requestGlobal(device1, "default", files[i%n].Name, 0, 32, nil, 0, false) + data, err := m.requestGlobal(context.Background(), device1, "default", files[i%n].Name, 0, 32, nil, 0, false) if err != nil { b.Error(err) } diff --git a/lib/model/requests_test.go b/lib/model/requests_test.go index f8365062..38c95bd0 100644 --- a/lib/model/requests_test.go +++ b/lib/model/requests_test.go @@ -8,6 +8,7 @@ package model import ( "bytes" + "context" "errors" "io/ioutil" "os" @@ -136,7 +137,7 @@ func TestSymlinkTraversalWrite(t *testing.T) { } } } - fc.requestFn = func(folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) { + fc.requestFn = func(_ context.Context, folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) { if name != "symlink" && strings.HasPrefix(name, "symlink") { badReq <- name } @@ -411,7 +412,7 @@ func pullInvalidIgnored(t *testing.T, ft config.FolderType) { } // Make sure pulling doesn't interfere, as index updates are racy and // thus we cannot distinguish between scan and pull results. - fc.requestFn = func(folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) { + fc.requestFn = func(_ context.Context, folder, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) { return nil, nil } fc.mut.Unlock() @@ -996,7 +997,7 @@ func TestNeedFolderFiles(t *testing.T) { errPreventSync := errors.New("you aren't getting any of this") fc.mut.Lock() - fc.requestFn = func(string, string, int64, int, []byte, bool) ([]byte, error) { + fc.requestFn = func(context.Context, string, string, int64, int, []byte, bool) ([]byte, error) { return nil, errPreventSync } fc.mut.Unlock() diff --git a/lib/protocol/benchmark_test.go b/lib/protocol/benchmark_test.go index 308d7cc1..72b76f89 100644 --- a/lib/protocol/benchmark_test.go +++ b/lib/protocol/benchmark_test.go @@ -3,6 +3,7 @@ package protocol import ( + "context" "crypto/tls" "encoding/binary" "net" @@ -80,9 +81,9 @@ func benchmarkRequestsConnPair(b *testing.B, conn0, conn1 net.Conn) { // Use c0 and c1 for each alternating request, so we get as much // data flowing in both directions. if i%2 == 0 { - buf, err = c0.Request("folder", "file", int64(i), 128<<10, nil, 0, false) + buf, err = c0.Request(context.Background(), "folder", "file", int64(i), 128<<10, nil, 0, false) } else { - buf, err = c1.Request("folder", "file", int64(i), 128<<10, nil, 0, false) + buf, err = c1.Request(context.Background(), "folder", "file", int64(i), 128<<10, nil, 0, false) } if err != nil { diff --git a/lib/protocol/protocol.go b/lib/protocol/protocol.go index 7ffae79a..3f1ef3d1 100644 --- a/lib/protocol/protocol.go +++ b/lib/protocol/protocol.go @@ -3,6 +3,7 @@ package protocol import ( + "context" "crypto/sha256" "encoding/binary" "errors" @@ -136,7 +137,7 @@ type Connection interface { Name() string Index(folder string, files []FileInfo) error IndexUpdate(folder string, files []FileInfo) error - Request(folder string, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) + Request(ctx context.Context, folder string, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) ClusterConfig(config ClusterConfig) DownloadProgress(folder string, updates []FileDownloadProgressUpdate) Statistics() Statistics @@ -255,7 +256,7 @@ func (c *rawConnection) Index(folder string, idx []FileInfo) error { default: } c.idxMut.Lock() - c.send(&Index{ + c.send(context.TODO(), &Index{ Folder: folder, Files: idx, }, nil) @@ -271,7 +272,7 @@ func (c *rawConnection) IndexUpdate(folder string, idx []FileInfo) error { default: } c.idxMut.Lock() - c.send(&IndexUpdate{ + c.send(context.TODO(), &IndexUpdate{ Folder: folder, Files: idx, }, nil) @@ -280,7 +281,7 @@ func (c *rawConnection) IndexUpdate(folder string, idx []FileInfo) error { } // Request returns the bytes for the specified block after fetching them from the connected peer. -func (c *rawConnection) Request(folder string, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) { +func (c *rawConnection) Request(ctx context.Context, folder string, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) { c.nextIDMut.Lock() id := c.nextID c.nextID++ @@ -294,7 +295,7 @@ func (c *rawConnection) Request(folder string, name string, offset int64, size i c.awaiting[id] = rc c.awaitingMut.Unlock() - ok := c.send(&Request{ + ok := c.send(ctx, &Request{ ID: id, Folder: folder, Name: name, @@ -308,11 +309,15 @@ func (c *rawConnection) Request(folder string, name string, offset int64, size i return nil, ErrClosed } - res, ok := <-rc - if !ok { - return nil, ErrClosed + select { + case res, ok := <-rc: + if !ok { + return nil, ErrClosed + } + return res.val, res.err + case <-ctx.Done(): + return nil, ctx.Err() } - return res.val, res.err } // ClusterConfig sends the cluster configuration message to the peer. @@ -336,14 +341,14 @@ func (c *rawConnection) Closed() bool { // DownloadProgress sends the progress updates for the files that are currently being downloaded. func (c *rawConnection) DownloadProgress(folder string, updates []FileDownloadProgressUpdate) { - c.send(&DownloadProgress{ + c.send(context.TODO(), &DownloadProgress{ Folder: folder, Updates: updates, }, nil) } func (c *rawConnection) ping() bool { - return c.send(&Ping{}, nil) + return c.send(context.Background(), &Ping{}, nil) } func (c *rawConnection) readerLoop() { @@ -613,14 +618,14 @@ func checkFilename(name string) error { func (c *rawConnection) handleRequest(req Request) { res, err := c.receiver.Request(c.id, req.Folder, req.Name, req.Size, req.Offset, req.Hash, req.WeakHash, req.FromTemporary) if err != nil { - c.send(&Response{ + c.send(context.Background(), &Response{ ID: req.ID, Code: errorToCode(err), }, nil) return } done := make(chan struct{}) - c.send(&Response{ + c.send(context.Background(), &Response{ ID: req.ID, Data: res.Data(), Code: errorToCode(nil), @@ -639,12 +644,13 @@ func (c *rawConnection) handleResponse(resp Response) { c.awaitingMut.Unlock() } -func (c *rawConnection) send(msg message, done chan struct{}) bool { +func (c *rawConnection) send(ctx context.Context, msg message, done chan struct{}) bool { select { case c.outbox <- asyncMessage{msg, done}: return true case <-c.preventSends: case <-c.closed: + case <-ctx.Done(): } if done != nil { close(done) diff --git a/lib/protocol/protocol_test.go b/lib/protocol/protocol_test.go index 4f973c61..3e048271 100644 --- a/lib/protocol/protocol_test.go +++ b/lib/protocol/protocol_test.go @@ -4,6 +4,7 @@ package protocol import ( "bytes" + "context" "crypto/sha256" "encoding/hex" "encoding/json" @@ -77,7 +78,7 @@ func TestClose(t *testing.T) { c0.Index("default", nil) c0.Index("default", nil) - if _, err := c0.Request("default", "foo", 0, 0, nil, 0, false); err == nil { + if _, err := c0.Request(context.Background(), "default", "foo", 0, 0, nil, 0, false); err == nil { t.Error("Request should return an error") } } @@ -194,7 +195,7 @@ func TestClusterConfigFirst(t *testing.T) { c.ClusterConfig(ClusterConfig{}) done := make(chan struct{}) - if ok := c.send(&Ping{}, done); !ok { + if ok := c.send(context.Background(), &Ping{}, done); !ok { t.Fatal("send ping after cluster config returned false") } select { diff --git a/lib/protocol/wireformat.go b/lib/protocol/wireformat.go index cf465d5c..7da80a12 100644 --- a/lib/protocol/wireformat.go +++ b/lib/protocol/wireformat.go @@ -3,6 +3,7 @@ package protocol import ( + "context" "path/filepath" "golang.org/x/text/unicode/norm" @@ -34,7 +35,7 @@ func (c wireFormatConnection) IndexUpdate(folder string, fs []FileInfo) error { return c.Connection.IndexUpdate(folder, myFs) } -func (c wireFormatConnection) Request(folder, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) { +func (c wireFormatConnection) Request(ctx context.Context, folder string, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) { name = norm.NFC.String(filepath.ToSlash(name)) - return c.Connection.Request(folder, name, offset, size, hash, weakHash, fromTemporary) + return c.Connection.Request(ctx, folder, name, offset, size, hash, weakHash, fromTemporary) }