diff --git a/lib/config/wrapper.go b/lib/config/wrapper.go index 85631d30..671f76d3 100644 --- a/lib/config/wrapper.go +++ b/lib/config/wrapper.go @@ -94,6 +94,7 @@ type wrapper struct { path string evLogger events.Logger + waiter Waiter // Latest ongoing config change deviceMap map[protocol.DeviceID]DeviceConfiguration folderMap map[string]FolderConfiguration subs []Committer @@ -109,6 +110,7 @@ func Wrap(path string, cfg Configuration, evLogger events.Logger) Wrapper { cfg: cfg, path: path, evLogger: evLogger, + waiter: noopWaiter{}, // Noop until first config change mut: sync.NewMutex(), } return w @@ -144,7 +146,8 @@ func (w *wrapper) Subscribe(c Committer) { } // Unsubscribe de-registers the given handler from any future calls to -// configuration changes +// configuration changes and only returns after a potential ongoing config +// change is done. func (w *wrapper) Unsubscribe(c Committer) { w.mut.Lock() for i := range w.subs { @@ -155,7 +158,11 @@ func (w *wrapper) Unsubscribe(c Committer) { break } } + waiter := w.waiter w.mut.Unlock() + // Waiting mustn't be done under lock, as the goroutines in notifyListener + // may dead-lock when trying to access lock on config read operations. + waiter.Wait() } // RawCopy returns a copy of the currently wrapped Configuration object. @@ -191,7 +198,9 @@ func (w *wrapper) replaceLocked(to Configuration) (Waiter, error) { w.deviceMap = nil w.folderMap = nil - return w.notifyListeners(from.Copy(), to.Copy()), nil + w.waiter = w.notifyListeners(from.Copy(), to.Copy()) + + return w.waiter, nil } func (w *wrapper) notifyListeners(from, to Configuration) Waiter { diff --git a/lib/connections/service.go b/lib/connections/service.go index 4b880d13..69542d23 100644 --- a/lib/connections/service.go +++ b/lib/connections/service.go @@ -193,6 +193,12 @@ func NewService(cfg config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *t return service } +func (s *service) Stop() { + s.cfg.Unsubscribe(s.limiter) + s.cfg.Unsubscribe(s) + s.Supervisor.Stop() +} + func (s *service) handle(ctx context.Context) { var c internalConn for { diff --git a/lib/model/model.go b/lib/model/model.go index de953f48..ef21da24 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -217,7 +217,6 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio } m.Add(m.progressEmitter) scanLimiter.setCapacity(cfg.Options().MaxConcurrentScans) - cfg.Subscribe(m) return m } @@ -241,9 +240,11 @@ func (m *model) onServe() { } m.newFolder(folderCfg) } + m.cfg.Subscribe(m) } func (m *model) Stop() { + m.cfg.Unsubscribe(m) m.Supervisor.Stop() devs := m.cfg.Devices() ids := make([]protocol.DeviceID, 0, len(devs)) diff --git a/lib/model/progressemitter.go b/lib/model/progressemitter.go index 5c4ce1c4..7d340a82 100644 --- a/lib/model/progressemitter.go +++ b/lib/model/progressemitter.go @@ -23,6 +23,7 @@ import ( type ProgressEmitter struct { suture.Service + cfg config.Wrapper registry map[string]map[string]*sharedPullerState // folder: name: puller interval time.Duration minBlocks int @@ -40,6 +41,7 @@ type ProgressEmitter struct { // DownloadProgress events every interval. func NewProgressEmitter(cfg config.Wrapper, evLogger events.Logger) *ProgressEmitter { t := &ProgressEmitter{ + cfg: cfg, registry: make(map[string]map[string]*sharedPullerState), timer: time.NewTimer(time.Millisecond), sentDownloadStates: make(map[protocol.DeviceID]*sentDownloadState), @@ -51,7 +53,6 @@ func NewProgressEmitter(cfg config.Wrapper, evLogger events.Logger) *ProgressEmi t.Service = util.AsService(t.serve, t.String()) t.CommitConfiguration(config.Configuration{}, cfg.RawCopy()) - cfg.Subscribe(t) return t } @@ -59,6 +60,9 @@ func NewProgressEmitter(cfg config.Wrapper, evLogger events.Logger) *ProgressEmi // serve starts the progress emitter which starts emitting DownloadProgress // events as the progress happens. func (t *ProgressEmitter) serve(ctx context.Context) { + t.cfg.Subscribe(t) + defer t.cfg.Unsubscribe(t) + var lastUpdate time.Time var lastCount, newCount int for { diff --git a/lib/ur/usage_report.go b/lib/ur/usage_report.go index f12c4169..1d45061c 100644 --- a/lib/ur/usage_report.go +++ b/lib/ur/usage_report.go @@ -58,7 +58,6 @@ func New(cfg config.Wrapper, m model.Model, connectionsService connections.Servi forceRun: make(chan struct{}, 1), // Buffered to prevent locking } svc.Service = util.AsService(svc.serve, svc.String()) - cfg.Subscribe(svc) return svc } @@ -385,6 +384,9 @@ func (s *Service) sendUsageReport() error { } func (s *Service) serve(ctx context.Context) { + s.cfg.Subscribe(s) + defer s.cfg.Unsubscribe(s) + t := time.NewTimer(time.Duration(s.cfg.Options().URInitialDelayS) * time.Second) for { select {