diff --git a/cmd/syncthing/gui.go b/cmd/syncthing/gui.go index 2cb08ba1..1846a302 100644 --- a/cmd/syncthing/gui.go +++ b/cmd/syncthing/gui.go @@ -898,12 +898,15 @@ func (s *apiService) postSystemConfig(w http.ResponseWriter, r *http.Request) { } } - // Activate and save + // Activate and save. Wait for the configuration to become active before + // completing the request. - if _, err := s.cfg.Replace(to); err != nil { + if wg, err := s.cfg.Replace(to); err != nil { l.Warnln("Replacing config:", err) http.Error(w, err.Error(), http.StatusInternalServerError) return + } else { + wg.Wait() } if err := s.cfg.Save(); err != nil { diff --git a/cmd/syncthing/mocked_config_test.go b/cmd/syncthing/mocked_config_test.go index 8669adc3..bf97087b 100644 --- a/cmd/syncthing/mocked_config_test.go +++ b/cmd/syncthing/mocked_config_test.go @@ -39,7 +39,7 @@ func (c *mockedConfig) Options() config.OptionsConfiguration { } func (c *mockedConfig) Replace(cfg config.Configuration) (config.Waiter, error) { - return nil, nil + return noopWaiter{}, nil } func (c *mockedConfig) Subscribe(cm config.Committer) {} @@ -53,11 +53,11 @@ func (c *mockedConfig) Devices() map[protocol.DeviceID]config.DeviceConfiguratio } func (c *mockedConfig) SetDevice(config.DeviceConfiguration) (config.Waiter, error) { - return nil, nil + return noopWaiter{}, nil } func (c *mockedConfig) SetDevices([]config.DeviceConfiguration) (config.Waiter, error) { - return nil, nil + return noopWaiter{}, nil } func (c *mockedConfig) Save() error { @@ -67,3 +67,7 @@ func (c *mockedConfig) Save() error { func (c *mockedConfig) RequiresRestart() bool { return false } + +type noopWaiter struct{} + +func (noopWaiter) Wait() {} diff --git a/lib/model/folder.go b/lib/model/folder.go index add212e6..41db4b68 100644 --- a/lib/model/folder.go +++ b/lib/model/folder.go @@ -11,6 +11,7 @@ import ( "errors" "fmt" "math/rand" + "sync/atomic" "time" "github.com/syncthing/syncthing/lib/config" @@ -89,6 +90,9 @@ func newFolder(model *Model, cfg config.FolderConfiguration) folder { } func (f *folder) Serve() { + atomic.AddInt32(&f.model.foldersRunning, 1) + defer atomic.AddInt32(&f.model.foldersRunning, -1) + l.Debugln(f, "starting") defer l.Debugln(f, "exiting") diff --git a/lib/model/model.go b/lib/model/model.go index 7b11283c..8eb49c8b 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -20,6 +20,7 @@ import ( "runtime" "sort" "strings" + stdsync "sync" "time" "github.com/syncthing/syncthing/lib/config" @@ -94,6 +95,7 @@ type Model struct { clientName string clientVersion string + fmut sync.RWMutex // protects the below folderCfgs map[string]config.FolderConfiguration // folder -> cfg folderFiles map[string]*db.FileSet // folder -> files deviceStatRefs map[protocol.DeviceID]*stats.DeviceStatisticsReference // deviceID -> statsRef @@ -101,14 +103,16 @@ type Model struct { folderRunners map[string]service // folder -> puller or scanner folderRunnerTokens map[string][]suture.ServiceToken // folder -> tokens for puller or scanner folderStatRefs map[string]*stats.FolderStatisticsReference // folder -> statsRef - fmut sync.RWMutex // protects the above + folderRestartMuts syncMutexMap // folder -> restart mutex + pmut sync.RWMutex // protects the below conn map[protocol.DeviceID]connections.Connection closed map[protocol.DeviceID]chan struct{} helloMessages map[protocol.DeviceID]protocol.HelloResult deviceDownloads map[protocol.DeviceID]*deviceDownloadState remotePausedFolders map[protocol.DeviceID][]string // deviceID -> folders - pmut sync.RWMutex // protects the above + + foldersRunning int32 // for testing only } type folderFactory func(*Model, config.FolderConfiguration, versioner.Versioner, fs.Filesystem) service @@ -372,11 +376,26 @@ func (m *Model) tearDownFolderLocked(cfg config.FolderConfiguration) { func (m *Model) RestartFolder(from, to config.FolderConfiguration) { if len(to.ID) == 0 { - panic("cannot add empty folder id") + panic("bug: cannot restart empty folder ID") } + if to.ID != from.ID { + panic(fmt.Sprintf("bug: folder restart cannot change ID %q -> %q", from.ID, to.ID)) + } + + // This mutex protects the entirety of the restart operation, preventing + // there from being more than one folder restart operation in progress + // at any given time. The usual fmut/pmut stuff doesn't cover this, + // because those locks are released while we are waiting for the folder + // to shut down (and must be so because the folder might need them as + // part of its operations before shutting down). + restartMut := m.folderRestartMuts.Get(to.ID) + restartMut.Lock() + defer restartMut.Unlock() m.fmut.Lock() m.pmut.Lock() + defer m.fmut.Unlock() + defer m.pmut.Unlock() m.tearDownFolderLocked(from) if to.Paused { @@ -386,9 +405,6 @@ func (m *Model) RestartFolder(from, to config.FolderConfiguration) { folderType := m.startFolderLocked(to.ID) l.Infoln("Restarted folder", to.Description(), fmt.Sprintf("(%s)", folderType)) } - - m.pmut.Unlock() - m.fmut.Unlock() } func (m *Model) UsageReportingStats(version int, preview bool) map[string]interface{} { @@ -2976,3 +2992,13 @@ func (b *fileInfoBatch) reset() { b.infos = b.infos[:0] b.size = 0 } + +// syncMutexMap is a type safe wrapper for a sync.Map that holds mutexes +type syncMutexMap struct { + inner stdsync.Map +} + +func (m *syncMutexMap) Get(key string) sync.Mutex { + v, _ := m.inner.LoadOrStore(key, sync.NewMutex()) + return v.(sync.Mutex) +} diff --git a/lib/model/model_test.go b/lib/model/model_test.go index 817b150f..7bb78315 100644 --- a/lib/model/model_test.go +++ b/lib/model/model_test.go @@ -20,6 +20,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "testing" "time" @@ -3850,6 +3851,59 @@ func addFakeConn(m *Model, dev protocol.DeviceID) *fakeConnection { return fc } +func TestFolderRestartZombies(t *testing.T) { + // This is for issue 5233, where multiple concurrent folder restarts + // would leave more than one folder runner alive. + + wrapper := createTmpWrapper(defaultCfg.Copy()) + folderCfg, _ := wrapper.Folder("default") + folderCfg.FilesystemType = fs.FilesystemTypeFake + wrapper.SetFolder(folderCfg) + + db := db.OpenMemory() + m := NewModel(wrapper, protocol.LocalDeviceID, "syncthing", "dev", db, nil) + m.AddFolder(folderCfg) + m.StartFolder("default") + + m.ServeBackground() + defer m.Stop() + + // Make sure the folder is up and running, because we want to count it. + m.ScanFolder("default") + + // Check how many running folders we have running before the test. + if r := atomic.LoadInt32(&m.foldersRunning); r != 1 { + t.Error("Expected one running folder, not", r) + } + + // Run a few parallel configuration changers for one second. Each waits + // for the commit to complete, but there are many of them. + var wg sync.WaitGroup + for i := 0; i < 25; i++ { + wg.Add(1) + go func() { + defer wg.Done() + t0 := time.Now() + for time.Since(t0) < time.Second { + cfg := folderCfg.Copy() + cfg.MaxConflicts = rand.Int() // safe change that should cause a folder restart + w, err := wrapper.SetFolder(cfg) + if err != nil { + panic(err) + } + w.Wait() + } + }() + } + + // Wait for the above to complete and check how many folders we have + // running now. It should not have increased. + wg.Wait() + if r := atomic.LoadInt32(&m.foldersRunning); r != 1 { + t.Error("Expected one running folder, not", r) + } +} + type fakeAddr struct{} func (fakeAddr) Network() string {