diff --git a/lib/model/model.go b/lib/model/model.go index e580f319..59798b5a 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -1877,8 +1877,6 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su return err } - defer runner.SchedulePull() - // Clean the list of subitems to ensure that we start at a known // directory, and don't scan subdirectories of things we've already // scanned. @@ -1918,6 +1916,15 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su batch := make([]protocol.FileInfo, 0, maxBatchSizeFiles) batchSizeBytes := 0 + changes := 0 + + // Schedule a pull after scanning, but only if we actually detected any + // changes. + defer func() { + if changes > 0 { + runner.SchedulePull() + } + }() for f := range fchan { if len(batch) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes { @@ -1929,8 +1936,10 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su batch = batch[:0] batchSizeBytes = 0 } + batch = append(batch, f) batchSizeBytes += f.ProtoSize() + changes++ } if err := runner.CheckHealth(); err != nil { @@ -1972,6 +1981,7 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su nf := f.ConvertToInvalidFileInfo(m.id.Short()) batch = append(batch, nf) batchSizeBytes += nf.ProtoSize() + changes++ case !f.IsInvalid() && !f.IsDeleted(): // The file is valid and not deleted. Lets check if it's @@ -1998,6 +2008,7 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su batch = append(batch, nf) batchSizeBytes += nf.ProtoSize() + changes++ } } return true diff --git a/lib/model/rwfolder.go b/lib/model/rwfolder.go index 3cc40690..69c0a6a4 100644 --- a/lib/model/rwfolder.go +++ b/lib/model/rwfolder.go @@ -147,7 +147,6 @@ func (f *sendReceiveFolder) Serve() { f.setState(FolderIdle) }() - var prevSeq int64 var prevIgnoreHash string var success bool pullFailTimer := time.NewTimer(time.Duration(0)) @@ -157,6 +156,8 @@ func (f *sendReceiveFolder) Serve() { f.startWatch() } + initialCompleted := f.initialScanFinished + for { select { case <-f.ctx.Done(): @@ -169,13 +170,13 @@ func (f *sendReceiveFolder) Serve() { default: } - if prevSeq, prevIgnoreHash, success = f.pull(prevSeq, prevIgnoreHash); !success { + if prevIgnoreHash, success = f.pull(prevIgnoreHash); !success { // Pulling failed, try again later. pullFailTimer.Reset(f.pause) } case <-pullFailTimer.C: - if prevSeq, prevIgnoreHash, success = f.pull(prevSeq, prevIgnoreHash); !success { + if prevIgnoreHash, success = f.pull(prevIgnoreHash); !success { // Pulling failed, try again later. pullFailTimer.Reset(f.pause) // Back off from retrying to pull with an upper limit. @@ -184,6 +185,14 @@ func (f *sendReceiveFolder) Serve() { } } + case <-initialCompleted: + // Initial scan has completed, we should do a pull + initialCompleted = nil // never hit this case again + if prevIgnoreHash, success = f.pull(prevIgnoreHash); !success { + // Pulling failed, try again later. + pullFailTimer.Reset(f.pause) + } + // The reason for running the scanner from within the puller is that // this is the easiest way to make sure we are not doing both at the // same time. @@ -222,41 +231,27 @@ func (f *sendReceiveFolder) String() string { return fmt.Sprintf("sendReceiveFolder/%s@%p", f.folderID, f) } -func (f *sendReceiveFolder) pull(prevSeq int64, prevIgnoreHash string) (curSeq int64, curIgnoreHash string, success bool) { +func (f *sendReceiveFolder) pull(prevIgnoreHash string) (curIgnoreHash string, success bool) { select { case <-f.initialScanFinished: default: // Once the initial scan finished, a pull will be scheduled - return prevSeq, prevIgnoreHash, true + return prevIgnoreHash, true } f.model.fmut.RLock() curIgnores := f.model.folderIgnores[f.folderID] f.model.fmut.RUnlock() - curSeq = prevSeq curIgnoreHash = curIgnores.Hash() ignoresChanged := curIgnoreHash != prevIgnoreHash - if ignoresChanged { - // The ignore patterns have changed. We need to re-evaluate if - // there are files we need now that were ignored before. - l.Debugln(f, "ignore patterns have changed, resetting curSeq") - curSeq = 0 - } - - // RemoteSequence() is a fast call, doesn't touch the database. - remoteSeq, ok := f.model.RemoteSequence(f.folderID) - if !ok || remoteSeq == curSeq { - l.Debugln(f, "skip (remoteSeq == curSeq)", curSeq, ok) - return curSeq, curIgnoreHash, true - } if err := f.CheckHealth(); err != nil { l.Debugln("Skipping pull of", f.Description(), "due to folder error:", err) - return curSeq, curIgnoreHash, true + return curIgnoreHash, true } - l.Debugln(f, "pulling", curSeq, remoteSeq) + l.Debugln(f, "pulling") f.setState(FolderSyncing) f.clearErrors() @@ -273,20 +268,6 @@ func (f *sendReceiveFolder) pull(prevSeq int64, prevIgnoreHash string) (curSeq i // No files were changed by the puller, so we are in // sync. Update the local version number. - if lv, ok := f.model.RemoteSequence(f.folderID); ok && lv < remoteSeq { - // There's a corner case where the device we needed - // files from disconnected during the puller - // iteration. The files will have been removed from - // the index, so we've concluded that we don't need - // them, but at the same time we have the old remote sequence - // that includes those files in remoteSeq. So we - // catch the case that this sequence might have - // decreased here. - l.Debugf("%v adjusting remoteSeq from %d to %d", remoteSeq, lv) - remoteSeq = lv - } - curSeq = remoteSeq - f.pause = f.basePause() break @@ -313,7 +294,7 @@ func (f *sendReceiveFolder) pull(prevSeq int64, prevIgnoreHash string) (curSeq i f.setState(FolderIdle) - return curSeq, curIgnoreHash, changed == 0 + return curIgnoreHash, changed == 0 } // pullerIteration runs a single puller iteration for the given folder and diff --git a/test/reconnect_test.go b/test/reconnect_test.go index 12f3afd9..25f499f3 100644 --- a/test/reconnect_test.go +++ b/test/reconnect_test.go @@ -10,27 +10,19 @@ package integration import ( "log" - "sync" "testing" "time" ) func TestReconnectReceiverDuringTransfer(t *testing.T) { - testReconnectDuringTransfer(t, false, true, 0, 0) + testReconnectDuringTransfer(t, false, true) } func TestReconnectSenderDuringTransfer(t *testing.T) { - testReconnectDuringTransfer(t, true, false, 0, 0) + testReconnectDuringTransfer(t, true, false) } -func TestReconnectSenderAndReceiverDuringTransfer(t *testing.T) { - // Give the receiver some time to rot with needed files but - // without any peer. This triggers - // https://github.com/syncthing/syncthing/issues/463 - testReconnectDuringTransfer(t, true, true, 10*time.Second, 0) -} - -func testReconnectDuringTransfer(t *testing.T, ReconnectSender, ReconnectReceiver bool, senderDelay, receiverDelay time.Duration) { +func testReconnectDuringTransfer(t *testing.T, restartSender, restartReceiver bool) { log.Println("Cleaning...") err := removeAll("s1", "s2", "h1/index*", "h2/index*") if err != nil { @@ -38,7 +30,7 @@ func testReconnectDuringTransfer(t *testing.T, ReconnectSender, ReconnectReceive } log.Println("Generating files...") - err = generateFiles("s1", 2500, 20, "../LICENSE") + err = generateFiles("s1", 250, 20, "../LICENSE") if err != nil { t.Fatal(err) } @@ -63,8 +55,9 @@ func testReconnectDuringTransfer(t *testing.T, ReconnectSender, ReconnectReceive if err != nil { t.Fatal(err) } - cfg.Options.MaxRecvKbps = 100 - cfg.Options.MaxSendKbps = 100 + cfg.Options.MaxRecvKbps = 750 + cfg.Options.MaxSendKbps = 750 + cfg.Options.LimitBandwidthInLan = true if err := receiver.PostConfig(cfg); err != nil { t.Fatal(err) } @@ -86,42 +79,22 @@ func testReconnectDuringTransfer(t *testing.T, ReconnectSender, ReconnectReceive // Receiver has made progress prevBytes = recv.InSyncBytes - if ReconnectReceiver { - log.Printf("Pausing receiver...") - receiver.PauseAll() + if restartReceiver { + log.Printf("Stopping receiver...") + receiver.Stop() + receiver = startInstance(t, 2) + receiver.ResumeAll() } - if ReconnectSender { - log.Printf("Pausing sender...") - sender.PauseAll() + if restartSender { + log.Printf("Stopping sender...") + sender.Stop() + sender = startInstance(t, 1) + sender.ResumeAll() } - - var wg sync.WaitGroup - - if ReconnectReceiver { - wg.Add(1) - go func() { - time.Sleep(receiverDelay) - log.Printf("Resuming receiver...") - receiver.ResumeAll() - wg.Done() - }() - } - - if ReconnectSender { - wg.Add(1) - go func() { - time.Sleep(senderDelay) - log.Printf("Resuming sender...") - sender.ResumeAll() - wg.Done() - }() - } - - wg.Wait() } - time.Sleep(time.Second) + time.Sleep(250 * time.Millisecond) } // Reset rate limits @@ -131,6 +104,7 @@ func testReconnectDuringTransfer(t *testing.T, ReconnectSender, ReconnectReceive } cfg.Options.MaxRecvKbps = 0 cfg.Options.MaxSendKbps = 0 + cfg.Options.LimitBandwidthInLan = false if err := receiver.PostConfig(cfg); err != nil { t.Fatal(err) } diff --git a/test/reset_test.go b/test/reset_test.go index e0b1c202..bb9711fe 100644 --- a/test/reset_test.go +++ b/test/reset_test.go @@ -10,7 +10,9 @@ package integration import ( "bytes" + "fmt" "io" + "io/ioutil" "log" "os" "path/filepath" @@ -128,25 +130,15 @@ func TestReset(t *testing.T) { } func createFiles(t *testing.T) int { - // Create eight empty files and directories - files := []string{"f1", "f2", "f3", "f4", "f11", "f12", "f13", "f14"} - dirs := []string{"d1", "d2", "d3", "d4", "d11", "d12", "d13", "d14"} - all := append(files, dirs...) + // Create a few files - for _, file := range files { - fd, err := os.Create(filepath.Join("s1", file)) - if err != nil { - t.Fatal(err) - } - fd.Close() - } - - for _, dir := range dirs { - err := os.Mkdir(filepath.Join("s1", dir), 0755) - if err != nil { + const n = 8 + for i := 0; i < n; i++ { + file := fmt.Sprintf("f%d", i) + if err := ioutil.WriteFile(filepath.Join("s1", file), []byte("data"), 0644); err != nil { t.Fatal(err) } } - return len(all) + return n } diff --git a/test/sync_test.go b/test/sync_test.go index 600dcd3c..ad5d1388 100644 --- a/test/sync_test.go +++ b/test/sync_test.go @@ -20,8 +20,8 @@ import ( ) const ( - longTimeLimit = 5 * time.Minute - shortTimeLimit = 45 * time.Second + longTimeLimit = 1 * time.Minute + shortTimeLimit = 25 * time.Second s12Folder = `¯\_(ツ)_/¯ Räksmörgås 动作 Адрес` // This was renamed to ensure arbitrary folder IDs are fine. )