diff --git a/lib/api/api.go b/lib/api/api.go index 9198a04c..64efbf3e 100644 --- a/lib/api/api.go +++ b/lib/api/api.go @@ -28,6 +28,10 @@ import ( "time" metrics "github.com/rcrowley/go-metrics" + "github.com/thejerf/suture" + "github.com/vitrun/qart/qr" + "golang.org/x/crypto/bcrypt" + "github.com/syncthing/syncthing/lib/build" "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/connections" @@ -44,9 +48,7 @@ import ( "github.com/syncthing/syncthing/lib/tlsutil" "github.com/syncthing/syncthing/lib/upgrade" "github.com/syncthing/syncthing/lib/ur" - "github.com/thejerf/suture" - "github.com/vitrun/qart/qr" - "golang.org/x/crypto/bcrypt" + "github.com/syncthing/syncthing/lib/util" ) // matches a bcrypt hash and not too much else @@ -60,6 +62,8 @@ const ( ) type service struct { + suture.Service + id protocol.DeviceID cfg config.Wrapper statics *staticsServer @@ -102,7 +106,7 @@ type Service interface { } func New(id protocol.DeviceID, cfg config.Wrapper, assetDir, tlsDefaultCommonName string, m model.Model, defaultSub, diskSub events.BufferedSubscription, discoverer discover.CachingMux, connectionsService connections.Service, urService *ur.Service, fss model.FolderSummaryService, errors, systemLog logger.Recorder, cpu Rater, contr Controller, noUpgrade bool) Service { - return &service{ + s := &service{ id: id, cfg: cfg, statics: newStaticsServer(cfg.GUI().Theme, assetDir), @@ -123,10 +127,11 @@ func New(id protocol.DeviceID, cfg config.Wrapper, assetDir, tlsDefaultCommonNam contr: contr, noUpgrade: noUpgrade, tlsDefaultCommonName: tlsDefaultCommonName, - stop: make(chan struct{}), configChanged: make(chan struct{}), startedOnce: make(chan struct{}), } + s.Service = util.AsService(s.serve) + return s } func (s *service) WaitForStart() error { @@ -190,7 +195,7 @@ func sendJSON(w http.ResponseWriter, jsonObject interface{}) { fmt.Fprintf(w, "%s\n", bs) } -func (s *service) Serve() { +func (s *service) serve(stop chan struct{}) { listener, err := s.getListener(s.cfg.GUI()) if err != nil { select { @@ -360,7 +365,7 @@ func (s *service) Serve() { // Wait for stop, restart or error signals select { - case <-s.stop: + case <-stop: // Shutting down permanently l.Debugln("shutting down (stop)") case <-s.configChanged: @@ -378,17 +383,11 @@ func (s *service) Complete() bool { select { case <-s.startedOnce: return s.startupErr != nil - case <-s.stop: - return true default: } return false } -func (s *service) Stop() { - close(s.stop) -} - func (s *service) String() string { return fmt.Sprintf("api.service@%p", s) } diff --git a/lib/beacon/multicast.go b/lib/beacon/multicast.go index 0bc91a7e..befc5598 100644 --- a/lib/beacon/multicast.go +++ b/lib/beacon/multicast.go @@ -14,6 +14,8 @@ import ( "github.com/thejerf/suture" "golang.org/x/net/ipv6" + + "github.com/syncthing/syncthing/lib/util" ) type Multicast struct { @@ -45,15 +47,15 @@ func NewMulticast(addr string) *Multicast { m.mr = &multicastReader{ addr: addr, outbox: m.outbox, - stop: make(chan struct{}), } + m.mr.Service = util.AsService(m.mr.serve) m.Add(m.mr) m.mw = &multicastWriter{ addr: addr, inbox: m.inbox, - stop: make(chan struct{}), } + m.mw.Service = util.AsService(m.mw.serve) m.Add(m.mw) return m @@ -76,13 +78,13 @@ func (m *Multicast) Error() error { } type multicastWriter struct { + suture.Service addr string inbox <-chan []byte errorHolder - stop chan struct{} } -func (w *multicastWriter) Serve() { +func (w *multicastWriter) serve(stop chan struct{}) { l.Debugln(w, "starting") defer l.Debugln(w, "stopping") @@ -106,7 +108,14 @@ func (w *multicastWriter) Serve() { HopLimit: 1, } - for bs := range w.inbox { + for { + var bs []byte + select { + case bs = <-w.inbox: + case <-stop: + return + } + intfs, err := net.Interfaces() if err != nil { l.Debugln(err) @@ -130,6 +139,12 @@ func (w *multicastWriter) Serve() { l.Debugf("sent %d bytes to %v on %s", len(bs), gaddr, intf.Name) success++ + + select { + case <-stop: + return + default: + } } if success > 0 { @@ -141,22 +156,18 @@ func (w *multicastWriter) Serve() { } } -func (w *multicastWriter) Stop() { - close(w.stop) -} - func (w *multicastWriter) String() string { return fmt.Sprintf("multicastWriter@%p", w) } type multicastReader struct { + suture.Service addr string outbox chan<- recv errorHolder - stop chan struct{} } -func (r *multicastReader) Serve() { +func (r *multicastReader) serve(stop chan struct{}) { l.Debugln(r, "starting") defer l.Debugln(r, "stopping") @@ -213,16 +224,14 @@ func (r *multicastReader) Serve() { copy(c, bs) select { case r.outbox <- recv{c, addr}: + case <-stop: + return default: l.Debugln("dropping message") } } } -func (r *multicastReader) Stop() { - close(r.stop) -} - func (r *multicastReader) String() string { return fmt.Sprintf("multicastReader@%p", r) } diff --git a/lib/connections/quic_listen.go b/lib/connections/quic_listen.go index d81aabde..4f908315 100644 --- a/lib/connections/quic_listen.go +++ b/lib/connections/quic_listen.go @@ -23,6 +23,7 @@ import ( "github.com/syncthing/syncthing/lib/connections/registry" "github.com/syncthing/syncthing/lib/nat" "github.com/syncthing/syncthing/lib/stun" + "github.com/syncthing/syncthing/lib/util" ) func init() { @@ -33,6 +34,7 @@ func init() { } type quicListener struct { + util.ServiceWithError nat atomic.Value onAddressesChangedNotifier @@ -40,12 +42,10 @@ type quicListener struct { uri *url.URL cfg config.Wrapper tlsCfg *tls.Config - stop chan struct{} conns chan internalConn factory listenerFactory address *url.URL - err error mut sync.Mutex } @@ -77,20 +77,13 @@ func (t *quicListener) OnExternalAddressChanged(address *stun.Host, via string) } } -func (t *quicListener) Serve() { - t.mut.Lock() - t.err = nil - t.mut.Unlock() - +func (t *quicListener) serve(stop chan struct{}) error { network := strings.Replace(t.uri.Scheme, "quic", "udp", -1) packetConn, err := net.ListenPacket(network, t.uri.Host) if err != nil { - t.mut.Lock() - t.err = err - t.mut.Unlock() l.Infoln("Listen (BEP/quic):", err) - return + return err } defer func() { _ = packetConn.Close() }() @@ -105,11 +98,8 @@ func (t *quicListener) Serve() { listener, err := quic.Listen(conn, t.tlsCfg, quicConfig) if err != nil { - t.mut.Lock() - t.err = err - t.mut.Unlock() l.Infoln("Listen (BEP/quic):", err) - return + return err } l.Infof("QUIC listener (%v) starting", packetConn.LocalAddr()) @@ -118,7 +108,7 @@ func (t *quicListener) Serve() { // Accept is forever, so handle stops externally. go func() { select { - case <-t.stop: + case <-stop: _ = listener.Close() } }() @@ -128,11 +118,11 @@ func (t *quicListener) Serve() { session, err := listener.Accept() select { - case <-t.stop: + case <-stop: if err == nil { _ = session.Close() } - return + return nil default: } if err != nil { @@ -150,7 +140,7 @@ func (t *quicListener) Serve() { select { case <-ok: return - case <-t.stop: + case <-stop: _ = session.Close() case <-time.After(10 * time.Second): l.Debugln("timed out waiting for AcceptStream on", session.RemoteAddr()) @@ -170,10 +160,6 @@ func (t *quicListener) Serve() { } } -func (t *quicListener) Stop() { - close(t.stop) -} - func (t *quicListener) URI() *url.URL { return t.uri } @@ -192,13 +178,6 @@ func (t *quicListener) LANAddresses() []*url.URL { return []*url.URL{t.uri} } -func (t *quicListener) Error() error { - t.mut.Lock() - err := t.err - t.mut.Unlock() - return err -} - func (t *quicListener) String() string { return t.uri.String() } @@ -227,9 +206,9 @@ func (f *quicListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls. cfg: cfg, tlsCfg: tlsCfg, conns: conns, - stop: make(chan struct{}), factory: f, } + l.ServiceWithError = util.AsServiceWithError(l.serve) l.nat.Store(stun.NATUnknown) return l } diff --git a/lib/connections/service.go b/lib/connections/service.go index 02b6b15b..e861562b 100644 --- a/lib/connections/service.go +++ b/lib/connections/service.go @@ -184,16 +184,22 @@ func NewService(cfg config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *t // the common handling regardless of whether the connection was // incoming or outgoing. - service.Add(serviceFunc(service.connect)) - service.Add(serviceFunc(service.handle)) + service.Add(util.AsService(service.connect)) + service.Add(util.AsService(service.handle)) service.Add(service.listenerSupervisor) return service } -func (s *service) handle() { -next: - for c := range s.conns { +func (s *service) handle(stop chan struct{}) { + var c internalConn + for { + select { + case <-stop: + return + case c = <-s.conns: + } + cs := c.ConnectionState() // We should have negotiated the next level protocol "bep/1.0" as part @@ -298,7 +304,7 @@ next: // config. Warn instead of Info. l.Warnf("Bad certificate from %s at %s: %v", remoteID, c, err) c.Close() - continue next + continue } // Wrap the connection in rate limiters. The limiter itself will @@ -313,11 +319,11 @@ next: l.Infof("Established secure connection to %s at %s", remoteID, c) s.model.AddConnection(modelConn, hello) - continue next + continue } } -func (s *service) connect() { +func (s *service) connect(stop chan struct{}) { nextDial := make(map[string]time.Time) // Used as delay for the first few connection attempts, increases @@ -465,11 +471,16 @@ func (s *service) connect() { if initialRampup < sleep { l.Debugln("initial rampup; sleep", initialRampup, "and update to", initialRampup*2) - time.Sleep(initialRampup) + sleep = initialRampup initialRampup *= 2 } else { l.Debugln("sleep until next dial", sleep) - time.Sleep(sleep) + } + + select { + case <-time.After(sleep): + case <-stop: + return } } } diff --git a/lib/connections/structs.go b/lib/connections/structs.go index 8cffbaff..00d5eaff 100644 --- a/lib/connections/structs.go +++ b/lib/connections/structs.go @@ -191,13 +191,6 @@ type Model interface { GetHello(protocol.DeviceID) protocol.HelloIntf } -// serviceFunc wraps a function to create a suture.Service without stop -// functionality. -type serviceFunc func() - -func (f serviceFunc) Serve() { f() } -func (f serviceFunc) Stop() {} - type onAddressesChangedNotifier struct { callbacks []func(genericListener) } diff --git a/lib/connections/tcp_listen.go b/lib/connections/tcp_listen.go index 65573f0b..d958358f 100644 --- a/lib/connections/tcp_listen.go +++ b/lib/connections/tcp_listen.go @@ -16,6 +16,7 @@ import ( "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/dialer" "github.com/syncthing/syncthing/lib/nat" + "github.com/syncthing/syncthing/lib/util" ) func init() { @@ -26,43 +27,32 @@ func init() { } type tcpListener struct { + util.ServiceWithError onAddressesChangedNotifier uri *url.URL cfg config.Wrapper tlsCfg *tls.Config - stop chan struct{} conns chan internalConn factory listenerFactory natService *nat.Service mapping *nat.Mapping - err error mut sync.RWMutex } -func (t *tcpListener) Serve() { - t.mut.Lock() - t.err = nil - t.mut.Unlock() - +func (t *tcpListener) serve(stop chan struct{}) error { tcaddr, err := net.ResolveTCPAddr(t.uri.Scheme, t.uri.Host) if err != nil { - t.mut.Lock() - t.err = err - t.mut.Unlock() l.Infoln("Listen (BEP/tcp):", err) - return + return err } listener, err := net.ListenTCP(t.uri.Scheme, tcaddr) if err != nil { - t.mut.Lock() - t.err = err - t.mut.Unlock() l.Infoln("Listen (BEP/tcp):", err) - return + return err } defer listener.Close() @@ -86,14 +76,14 @@ func (t *tcpListener) Serve() { listener.SetDeadline(time.Now().Add(time.Second)) conn, err := listener.Accept() select { - case <-t.stop: + case <-stop: if err == nil { conn.Close() } t.mut.Lock() t.mapping = nil t.mut.Unlock() - return + return nil default: } if err != nil { @@ -104,7 +94,7 @@ func (t *tcpListener) Serve() { if acceptFailures > maxAcceptFailures { // Return to restart the listener, because something // seems permanently damaged. - return + return err } // Slightly increased delay for each failure. @@ -137,10 +127,6 @@ func (t *tcpListener) Serve() { } } -func (t *tcpListener) Stop() { - close(t.stop) -} - func (t *tcpListener) URI() *url.URL { return t.uri } @@ -174,13 +160,6 @@ func (t *tcpListener) LANAddresses() []*url.URL { return []*url.URL{t.uri} } -func (t *tcpListener) Error() error { - t.mut.RLock() - err := t.err - t.mut.RUnlock() - return err -} - func (t *tcpListener) String() string { return t.uri.String() } @@ -196,15 +175,16 @@ func (t *tcpListener) NATType() string { type tcpListenerFactory struct{} func (f *tcpListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls.Config, conns chan internalConn, natService *nat.Service) genericListener { - return &tcpListener{ + l := &tcpListener{ uri: fixupPort(uri, config.DefaultTCPPort), cfg: cfg, tlsCfg: tlsCfg, conns: conns, natService: natService, - stop: make(chan struct{}), factory: f, } + l.ServiceWithError = util.AsServiceWithError(l.serve) + return l } func (tcpListenerFactory) Valid(_ config.Configuration) error { diff --git a/lib/discover/global.go b/lib/discover/global.go index cf59472d..86738c39 100644 --- a/lib/discover/global.go +++ b/lib/discover/global.go @@ -19,19 +19,22 @@ import ( stdsync "sync" "time" + "github.com/thejerf/suture" + "github.com/syncthing/syncthing/lib/dialer" "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/protocol" + "github.com/syncthing/syncthing/lib/util" ) type globalClient struct { + suture.Service server string addrList AddressLister announceClient httpClient queryClient httpClient noAnnounce bool noLookup bool - stop chan struct{} errorHolder } @@ -122,8 +125,8 @@ func NewGlobal(server string, cert tls.Certificate, addrList AddressLister) (Fin queryClient: queryClient, noAnnounce: opts.noAnnounce, noLookup: opts.noLookup, - stop: make(chan struct{}), } + cl.Service = util.AsService(cl.serve) if !opts.noAnnounce { // If we are supposed to annonce, it's an error until we've done so. cl.setError(errors.New("not announced")) @@ -183,11 +186,11 @@ func (c *globalClient) String() string { return "global@" + c.server } -func (c *globalClient) Serve() { +func (c *globalClient) serve(stop chan struct{}) { if c.noAnnounce { // We're configured to not do announcements, only lookups. To maintain // the same interface, we just pause here if Serve() is run. - <-c.stop + <-stop return } @@ -207,7 +210,7 @@ func (c *globalClient) Serve() { case <-timer.C: c.sendAnnouncement(timer) - case <-c.stop: + case <-stop: return } } @@ -276,10 +279,6 @@ func (c *globalClient) sendAnnouncement(timer *time.Timer) { timer.Reset(defaultReannounceInterval) } -func (c *globalClient) Stop() { - close(c.stop) -} - func (c *globalClient) Cache() map[protocol.DeviceID]CacheEntry { // The globalClient doesn't do caching return nil diff --git a/lib/model/folder.go b/lib/model/folder.go index d8c69d88..fcb18fef 100644 --- a/lib/model/folder.go +++ b/lib/model/folder.go @@ -28,6 +28,8 @@ import ( "github.com/syncthing/syncthing/lib/stats" "github.com/syncthing/syncthing/lib/sync" "github.com/syncthing/syncthing/lib/watchaggregator" + + "github.com/thejerf/suture" ) // scanLimiter limits the number of concurrent scans. A limit of zero means no limit. @@ -36,6 +38,7 @@ var scanLimiter = newByteSemaphore(0) var errWatchNotStarted = errors.New("not started") type folder struct { + suture.Service stateTracker config.FolderConfiguration *stats.FolderStatisticsReference @@ -54,7 +57,6 @@ type folder struct { scanNow chan rescanRequest scanDelay chan time.Duration initialScanFinished chan struct{} - stopped chan struct{} scanErrors []FileError scanErrorsMut sync.Mutex @@ -98,7 +100,6 @@ func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg conf scanNow: make(chan rescanRequest), scanDelay: make(chan time.Duration), initialScanFinished: make(chan struct{}), - stopped: make(chan struct{}), scanErrorsMut: sync.NewMutex(), pullScheduled: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a pull if we're busy when it comes. @@ -109,7 +110,7 @@ func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg conf } } -func (f *folder) Serve() { +func (f *folder) serve(_ chan struct{}) { atomic.AddInt32(&f.model.foldersRunning, 1) defer atomic.AddInt32(&f.model.foldersRunning, -1) @@ -119,7 +120,6 @@ func (f *folder) Serve() { defer func() { f.scanTimer.Stop() f.setState(FolderIdle) - close(f.stopped) }() pause := f.basePause() @@ -256,7 +256,7 @@ func (f *folder) Delay(next time.Duration) { func (f *folder) Stop() { f.cancel() - <-f.stopped + f.Service.Stop() } // CheckHealth checks the folder for common errors, updates the folder state diff --git a/lib/model/folder_sendonly.go b/lib/model/folder_sendonly.go index 23f12961..5b144951 100644 --- a/lib/model/folder_sendonly.go +++ b/lib/model/folder_sendonly.go @@ -12,6 +12,7 @@ import ( "github.com/syncthing/syncthing/lib/fs" "github.com/syncthing/syncthing/lib/ignore" "github.com/syncthing/syncthing/lib/protocol" + "github.com/syncthing/syncthing/lib/util" "github.com/syncthing/syncthing/lib/versioner" ) @@ -28,6 +29,7 @@ func newSendOnlyFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, folder: newFolder(model, fset, ignores, cfg), } f.folder.puller = f + f.folder.Service = util.AsService(f.serve) return f } diff --git a/lib/model/folder_sendrecv.go b/lib/model/folder_sendrecv.go index c7b9d50a..31011e7f 100644 --- a/lib/model/folder_sendrecv.go +++ b/lib/model/folder_sendrecv.go @@ -28,6 +28,7 @@ import ( "github.com/syncthing/syncthing/lib/scanner" "github.com/syncthing/syncthing/lib/sha256" "github.com/syncthing/syncthing/lib/sync" + "github.com/syncthing/syncthing/lib/util" "github.com/syncthing/syncthing/lib/versioner" "github.com/syncthing/syncthing/lib/weakhash" ) @@ -116,6 +117,7 @@ func newSendReceiveFolder(model *model, fset *db.FileSet, ignores *ignore.Matche pullErrorsMut: sync.NewMutex(), } f.folder.puller = f + f.folder.Service = util.AsService(f.serve) if f.Copiers == 0 { f.Copiers = defaultCopiers diff --git a/lib/model/folder_summary.go b/lib/model/folder_summary.go index 4848de99..d9dd07f5 100644 --- a/lib/model/folder_summary.go +++ b/lib/model/folder_summary.go @@ -11,11 +11,13 @@ import ( "strings" "time" + "github.com/thejerf/suture" + "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/sync" - "github.com/thejerf/suture" + "github.com/syncthing/syncthing/lib/util" ) const minSummaryInterval = time.Minute @@ -34,7 +36,6 @@ type folderSummaryService struct { cfg config.Wrapper model Model id protocol.DeviceID - stop chan struct{} immediate chan string // For keeping track of folders to recalculate for @@ -54,24 +55,18 @@ func NewFolderSummaryService(cfg config.Wrapper, m Model, id protocol.DeviceID) cfg: cfg, model: m, id: id, - stop: make(chan struct{}), immediate: make(chan string), folders: make(map[string]struct{}), foldersMut: sync.NewMutex(), lastEventReqMut: sync.NewMutex(), } - service.Add(serviceFunc(service.listenForUpdates)) - service.Add(serviceFunc(service.calculateSummaries)) + service.Add(util.AsService(service.listenForUpdates)) + service.Add(util.AsService(service.calculateSummaries)) return service } -func (c *folderSummaryService) Stop() { - c.Supervisor.Stop() - close(c.stop) -} - func (c *folderSummaryService) String() string { return fmt.Sprintf("FolderSummaryService@%p", c) } @@ -148,7 +143,7 @@ func (c *folderSummaryService) OnEventRequest() { // listenForUpdates subscribes to the event bus and makes note of folders that // need their data recalculated. -func (c *folderSummaryService) listenForUpdates() { +func (c *folderSummaryService) listenForUpdates(stop chan struct{}) { sub := events.Default.Subscribe(events.LocalIndexUpdated | events.RemoteIndexUpdated | events.StateChanged | events.RemoteDownloadProgress | events.DeviceConnected | events.FolderWatchStateChanged | events.DownloadProgress) defer events.Default.Unsubscribe(sub) @@ -158,7 +153,7 @@ func (c *folderSummaryService) listenForUpdates() { select { case ev := <-sub.C(): c.processUpdate(ev) - case <-c.stop: + case <-stop: return } } @@ -237,7 +232,7 @@ func (c *folderSummaryService) processUpdate(ev events.Event) { // calculateSummaries periodically recalculates folder summaries and // completion percentage, and sends the results on the event bus. -func (c *folderSummaryService) calculateSummaries() { +func (c *folderSummaryService) calculateSummaries(stop chan struct{}) { const pumpInterval = 2 * time.Second pump := time.NewTimer(pumpInterval) @@ -258,7 +253,7 @@ func (c *folderSummaryService) calculateSummaries() { case folder := <-c.immediate: c.sendSummary(folder) - case <-c.stop: + case <-stop: return } } @@ -319,10 +314,3 @@ func (c *folderSummaryService) sendSummary(folder string) { events.Default.Log(events.FolderCompletion, comp) } } - -// serviceFunc wraps a function to create a suture.Service without stop -// functionality. -type serviceFunc func() - -func (f serviceFunc) Serve() { f() } -func (f serviceFunc) Stop() {} diff --git a/lib/model/model.go b/lib/model/model.go index 73917ddf..e8b149db 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -32,6 +32,7 @@ import ( "github.com/syncthing/syncthing/lib/stats" "github.com/syncthing/syncthing/lib/sync" "github.com/syncthing/syncthing/lib/upgrade" + "github.com/syncthing/syncthing/lib/util" "github.com/syncthing/syncthing/lib/versioner" "github.com/thejerf/suture" ) @@ -1169,19 +1170,19 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon } } - // The token isn't tracked as the service stops when the connection - // terminates and is automatically removed from supervisor (by - // implementing suture.IsCompletable). - m.Add(&indexSender{ + is := &indexSender{ conn: conn, connClosed: closed, folder: folder.ID, fset: fs, prevSequence: startSequence, dropSymlinks: dropSymlinks, - stop: make(chan struct{}), - stopped: make(chan struct{}), - }) + } + is.Service = util.AsService(is.serve) + // The token isn't tracked as the service stops when the connection + // terminates and is automatically removed from supervisor (by + // implementing suture.IsCompletable). + m.Add(is) } m.pmut.Lock() @@ -1896,6 +1897,7 @@ func (m *model) deviceWasSeen(deviceID protocol.DeviceID) { } type indexSender struct { + suture.Service conn protocol.Connection folder string dev string @@ -1903,13 +1905,9 @@ type indexSender struct { prevSequence int64 dropSymlinks bool connClosed chan struct{} - stop chan struct{} - stopped chan struct{} } -func (s *indexSender) Serve() { - defer close(s.stopped) - +func (s *indexSender) serve(stop chan struct{}) { var err error l.Debugf("Starting indexSender for %s to %s at %s (slv=%d)", s.folder, s.dev, s.conn, s.prevSequence) @@ -1930,7 +1928,7 @@ func (s *indexSender) Serve() { for err == nil { select { - case <-s.stop: + case <-stop: return case <-s.connClosed: return @@ -1943,7 +1941,7 @@ func (s *indexSender) Serve() { // sending for. if s.fset.Sequence(protocol.LocalDeviceID) <= s.prevSequence { select { - case <-s.stop: + case <-stop: return case <-s.connClosed: return @@ -1963,11 +1961,6 @@ func (s *indexSender) Serve() { } } -func (s *indexSender) Stop() { - close(s.stop) - <-s.stopped -} - // Complete implements the suture.IsCompletable interface. When Serve terminates // before Stop is called, the supervisor will check for this method and if it // returns true removes the service instead of restarting it. Here it always diff --git a/lib/model/progressemitter.go b/lib/model/progressemitter.go index d905c6a6..27c0f4ae 100644 --- a/lib/model/progressemitter.go +++ b/lib/model/progressemitter.go @@ -10,13 +10,18 @@ import ( "fmt" "time" + "github.com/thejerf/suture" + "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/sync" + "github.com/syncthing/syncthing/lib/util" ) type ProgressEmitter struct { + suture.Service + registry map[string]map[string]*sharedPullerState // folder: name: puller interval time.Duration minBlocks int @@ -27,15 +32,12 @@ type ProgressEmitter struct { mut sync.Mutex timer *time.Timer - - stop chan struct{} } // NewProgressEmitter creates a new progress emitter which emits // DownloadProgress events every interval. func NewProgressEmitter(cfg config.Wrapper) *ProgressEmitter { t := &ProgressEmitter{ - stop: make(chan struct{}), registry: make(map[string]map[string]*sharedPullerState), timer: time.NewTimer(time.Millisecond), sentDownloadStates: make(map[protocol.DeviceID]*sentDownloadState), @@ -43,6 +45,7 @@ func NewProgressEmitter(cfg config.Wrapper) *ProgressEmitter { foldersByConns: make(map[protocol.DeviceID][]string), mut: sync.NewMutex(), } + t.Service = util.AsService(t.serve) t.CommitConfiguration(config.Configuration{}, cfg.RawCopy()) cfg.Subscribe(t) @@ -50,14 +53,14 @@ func NewProgressEmitter(cfg config.Wrapper) *ProgressEmitter { return t } -// Serve starts the progress emitter which starts emitting DownloadProgress +// serve starts the progress emitter which starts emitting DownloadProgress // events as the progress happens. -func (t *ProgressEmitter) Serve() { +func (t *ProgressEmitter) serve(stop chan struct{}) { var lastUpdate time.Time var lastCount, newCount int for { select { - case <-t.stop: + case <-stop: l.Debugln("progress emitter: stopping") return case <-t.timer.C: @@ -212,11 +215,6 @@ func (t *ProgressEmitter) CommitConfiguration(from, to config.Configuration) boo return true } -// Stop stops the emitter. -func (t *ProgressEmitter) Stop() { - t.stop <- struct{}{} -} - // Register a puller with the emitter which will start broadcasting pullers // progress. func (t *ProgressEmitter) Register(s *sharedPullerState) { diff --git a/lib/relay/client/client.go b/lib/relay/client/client.go index 22489f58..9ece6088 100644 --- a/lib/relay/client/client.go +++ b/lib/relay/client/client.go @@ -9,6 +9,10 @@ import ( "time" "github.com/syncthing/syncthing/lib/relay/protocol" + "github.com/syncthing/syncthing/lib/sync" + "github.com/syncthing/syncthing/lib/util" + + "github.com/thejerf/suture" ) type relayClientFactory func(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation, timeout time.Duration) RelayClient @@ -22,8 +26,7 @@ var ( ) type RelayClient interface { - Serve() - Stop() + suture.Service Error() error Latency() time.Duration String() string @@ -39,3 +42,42 @@ func NewClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol. return factory(uri, certs, invitations, timeout), nil } + +type commonClient struct { + util.ServiceWithError + + invitations chan protocol.SessionInvitation + closeInvitationsOnFinish bool + mut sync.RWMutex +} + +func newCommonClient(invitations chan protocol.SessionInvitation, serve func(chan struct{}) error) commonClient { + c := commonClient{ + invitations: invitations, + mut: sync.NewRWMutex(), + } + newServe := func(stop chan struct{}) error { + defer c.cleanup() + return serve(stop) + } + c.ServiceWithError = util.AsServiceWithError(newServe) + if c.invitations == nil { + c.closeInvitationsOnFinish = true + c.invitations = make(chan protocol.SessionInvitation) + } + return c +} + +func (c *commonClient) cleanup() { + c.mut.Lock() + if c.closeInvitationsOnFinish { + close(c.invitations) + } + c.mut.Unlock() +} + +func (c *commonClient) Invitations() chan protocol.SessionInvitation { + c.mut.RLock() + defer c.mut.RUnlock() + return c.invitations +} diff --git a/lib/relay/client/dynamic.go b/lib/relay/client/dynamic.go index 17400b78..7c07c2b5 100644 --- a/lib/relay/client/dynamic.go +++ b/lib/relay/client/dynamic.go @@ -14,45 +14,29 @@ import ( "github.com/syncthing/syncthing/lib/osutil" "github.com/syncthing/syncthing/lib/rand" "github.com/syncthing/syncthing/lib/relay/protocol" - "github.com/syncthing/syncthing/lib/sync" ) type dynamicClient struct { - pooladdr *url.URL - certs []tls.Certificate - invitations chan protocol.SessionInvitation - closeInvitationsOnFinish bool - timeout time.Duration + commonClient + + pooladdr *url.URL + certs []tls.Certificate + timeout time.Duration - mut sync.RWMutex - err error client RelayClient - stop chan struct{} } func newDynamicClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation, timeout time.Duration) RelayClient { - closeInvitationsOnFinish := false - if invitations == nil { - closeInvitationsOnFinish = true - invitations = make(chan protocol.SessionInvitation) - } - return &dynamicClient{ - pooladdr: uri, - certs: certs, - invitations: invitations, - closeInvitationsOnFinish: closeInvitationsOnFinish, - timeout: timeout, - - mut: sync.NewRWMutex(), + c := &dynamicClient{ + pooladdr: uri, + certs: certs, + timeout: timeout, } + c.commonClient = newCommonClient(invitations, c.serve) + return c } -func (c *dynamicClient) Serve() { - defer c.cleanup() - c.mut.Lock() - c.stop = make(chan struct{}) - c.mut.Unlock() - +func (c *dynamicClient) serve(stop chan struct{}) error { uri := *c.pooladdr // Trim off the `dynamic+` prefix @@ -63,8 +47,7 @@ func (c *dynamicClient) Serve() { data, err := http.Get(uri.String()) if err != nil { l.Debugln(c, "failed to lookup dynamic relays", err) - c.setError(err) - return + return err } var ann dynamicAnnouncement @@ -72,8 +55,7 @@ func (c *dynamicClient) Serve() { data.Body.Close() if err != nil { l.Debugln(c, "failed to lookup dynamic relays", err) - c.setError(err) - return + return err } var addrs []string @@ -87,22 +69,26 @@ func (c *dynamicClient) Serve() { addrs = append(addrs, ruri.String()) } + defer func() { + c.mut.RLock() + if c.client != nil { + c.client.Stop() + } + c.mut.RUnlock() + }() + for _, addr := range relayAddressesOrder(addrs) { select { - case <-c.stop: + case <-stop: l.Debugln(c, "stopping") - c.setError(nil) - return + return nil default: ruri, err := url.Parse(addr) if err != nil { l.Debugln(c, "skipping relay", addr, err) continue } - client, err := NewClient(ruri, c.certs, c.invitations, c.timeout) - if err != nil { - continue - } + client := newStaticClient(ruri, c.certs, c.invitations, c.timeout) c.mut.Lock() c.client = client c.mut.Unlock() @@ -115,24 +101,14 @@ func (c *dynamicClient) Serve() { } } l.Debugln(c, "could not find a connectable relay") - c.setError(fmt.Errorf("could not find a connectable relay")) -} - -func (c *dynamicClient) Stop() { - c.mut.RLock() - defer c.mut.RUnlock() - close(c.stop) - if c.client == nil { - return - } - c.client.Stop() + return fmt.Errorf("could not find a connectable relay") } func (c *dynamicClient) Error() error { c.mut.RLock() defer c.mut.RUnlock() if c.client == nil { - return c.err + return c.Error() } return c.client.Error() } @@ -159,28 +135,6 @@ func (c *dynamicClient) URI() *url.URL { return c.client.URI() } -func (c *dynamicClient) Invitations() chan protocol.SessionInvitation { - c.mut.RLock() - inv := c.invitations - c.mut.RUnlock() - return inv -} - -func (c *dynamicClient) cleanup() { - c.mut.Lock() - if c.closeInvitationsOnFinish { - close(c.invitations) - c.invitations = make(chan protocol.SessionInvitation) - } - c.mut.Unlock() -} - -func (c *dynamicClient) setError(err error) { - c.mut.Lock() - c.err = err - c.mut.Unlock() -} - // This is the announcement received from the relay server; // {"relays": [{"url": "relay://10.20.30.40:5060"}, ...]} type dynamicAnnouncement struct { diff --git a/lib/relay/client/static.go b/lib/relay/client/static.go index 67e5ef18..dc023aa5 100644 --- a/lib/relay/client/static.go +++ b/lib/relay/client/static.go @@ -12,88 +12,54 @@ import ( "github.com/syncthing/syncthing/lib/dialer" syncthingprotocol "github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/relay/protocol" - "github.com/syncthing/syncthing/lib/sync" ) type staticClient struct { - uri *url.URL - invitations chan protocol.SessionInvitation + commonClient - closeInvitationsOnFinish bool + uri *url.URL config *tls.Config messageTimeout time.Duration connectTimeout time.Duration - stop chan struct{} - stopped chan struct{} - stopMut sync.RWMutex - conn *tls.Conn - mut sync.RWMutex - err error connected bool latency time.Duration } func newStaticClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation, timeout time.Duration) RelayClient { - closeInvitationsOnFinish := false - if invitations == nil { - closeInvitationsOnFinish = true - invitations = make(chan protocol.SessionInvitation) - } - - stopped := make(chan struct{}) - close(stopped) // not yet started, don't block on Stop() - return &staticClient{ - uri: uri, - invitations: invitations, - - closeInvitationsOnFinish: closeInvitationsOnFinish, + c := &staticClient{ + uri: uri, config: configForCerts(certs), messageTimeout: time.Minute * 2, connectTimeout: timeout, - - stop: make(chan struct{}), - stopped: stopped, - stopMut: sync.NewRWMutex(), - - mut: sync.NewRWMutex(), } + c.commonClient = newCommonClient(invitations, c.serve) + return c } -func (c *staticClient) Serve() { - defer c.cleanup() - c.stopMut.Lock() - c.stop = make(chan struct{}) - c.stopped = make(chan struct{}) - c.stopMut.Unlock() - defer close(c.stopped) - +func (c *staticClient) serve(stop chan struct{}) error { if err := c.connect(); err != nil { l.Infof("Could not connect to relay %s: %s", c.uri, err) - c.setError(err) - return + return err } l.Debugln(c, "connected", c.conn.RemoteAddr()) + defer c.disconnect() if err := c.join(); err != nil { - c.conn.Close() l.Infof("Could not join relay %s: %s", c.uri, err) - c.setError(err) - return + return err } if err := c.conn.SetDeadline(time.Time{}); err != nil { - c.conn.Close() l.Infoln("Relay set deadline:", err) - c.setError(err) - return + return err } l.Infof("Joined relay %s://%s", c.uri.Scheme, c.uri.Host) @@ -106,12 +72,10 @@ func (c *staticClient) Serve() { messages := make(chan interface{}) errors := make(chan error, 1) - go messageReader(c.conn, messages, errors) + go messageReader(c.conn, messages, errors, stop) timeout := time.NewTimer(c.messageTimeout) - c.stopMut.RLock() - defer c.stopMut.RUnlock() for { select { case message := <-messages: @@ -122,11 +86,9 @@ func (c *staticClient) Serve() { case protocol.Ping: if err := protocol.WriteMessage(c.conn, protocol.Pong{}); err != nil { l.Infoln("Relay write:", err) - c.setError(err) - c.disconnect() - } else { - l.Debugln(c, "sent pong") + return err } + l.Debugln(c, "sent pong") case protocol.SessionInvitation: ip := net.IP(msg.Address) @@ -137,52 +99,28 @@ func (c *staticClient) Serve() { case protocol.RelayFull: l.Infof("Disconnected from relay %s due to it becoming full.", c.uri) - c.setError(fmt.Errorf("Relay full")) - c.disconnect() + return fmt.Errorf("relay full") default: l.Infoln("Relay: protocol error: unexpected message %v", msg) - c.setError(fmt.Errorf("protocol error: unexpected message %v", msg)) - c.disconnect() + return fmt.Errorf("protocol error: unexpected message %v", msg) } - case <-c.stop: + case <-stop: l.Debugln(c, "stopping") - c.setError(nil) - c.disconnect() + return nil - // We always exit via this branch of the select, to make sure the - // the reader routine exits. case err := <-errors: - close(errors) - close(messages) - c.mut.Lock() - if c.connected { - c.conn.Close() - c.connected = false - l.Infof("Disconnecting from relay %s due to error: %s", c.uri, err) - c.err = err - } else { - c.err = nil - } - c.mut.Unlock() - return + l.Infof("Disconnecting from relay %s due to error: %s", c.uri, err) + return err case <-timeout.C: l.Debugln(c, "timed out") - c.disconnect() - c.setError(fmt.Errorf("timed out")) + return fmt.Errorf("timed out") } } } -func (c *staticClient) Stop() { - c.stopMut.RLock() - close(c.stop) - <-c.stopped - c.stopMut.RUnlock() -} - func (c *staticClient) StatusOK() bool { c.mut.RLock() con := c.connected @@ -205,22 +143,6 @@ func (c *staticClient) URI() *url.URL { return c.uri } -func (c *staticClient) Invitations() chan protocol.SessionInvitation { - c.mut.RLock() - inv := c.invitations - c.mut.RUnlock() - return inv -} - -func (c *staticClient) cleanup() { - c.mut.Lock() - if c.closeInvitationsOnFinish { - close(c.invitations) - c.invitations = make(chan protocol.SessionInvitation) - } - c.mut.Unlock() -} - func (c *staticClient) connect() error { if c.uri.Scheme != "relay" { return fmt.Errorf("Unsupported relay schema: %v", c.uri.Scheme) @@ -261,19 +183,6 @@ func (c *staticClient) disconnect() { c.conn.Close() } -func (c *staticClient) setError(err error) { - c.mut.Lock() - c.err = err - c.mut.Unlock() -} - -func (c *staticClient) Error() error { - c.mut.RLock() - err := c.err - c.mut.RUnlock() - return err -} - func (c *staticClient) join() error { if err := protocol.WriteMessage(c.conn, protocol.JoinRelayRequest{}); err != nil { return err @@ -332,13 +241,17 @@ func performHandshakeAndValidation(conn *tls.Conn, uri *url.URL) error { return nil } -func messageReader(conn net.Conn, messages chan<- interface{}, errors chan<- error) { +func messageReader(conn net.Conn, messages chan<- interface{}, errors chan<- error, stop chan struct{}) { for { msg, err := protocol.ReadMessage(conn) if err != nil { errors <- err return } - messages <- msg + select { + case messages <- msg: + case <-stop: + return + } } } diff --git a/lib/stun/stun.go b/lib/stun/stun.go index bcf18e75..f2b299ce 100644 --- a/lib/stun/stun.go +++ b/lib/stun/stun.go @@ -13,7 +13,10 @@ import ( "github.com/AudriusButkevicius/pfilter" "github.com/ccding/go-stun/stun" + "github.com/thejerf/suture" + "github.com/syncthing/syncthing/lib/config" + "github.com/syncthing/syncthing/lib/util" ) const stunRetryInterval = 5 * time.Minute @@ -56,6 +59,8 @@ type Subscriber interface { } type Service struct { + suture.Service + name string cfg config.Wrapper subscriber Subscriber @@ -66,8 +71,6 @@ type Service struct { natType NATType addr *Host - - stop chan struct{} } func New(cfg config.Wrapper, subscriber Subscriber, conn net.PacketConn) (*Service, net.PacketConn) { @@ -88,7 +91,7 @@ func New(cfg config.Wrapper, subscriber Subscriber, conn net.PacketConn) (*Servi client.SetSoftwareName("") // Explicitly unset this, seems to freak some servers out. // Return the service and the other conn to the client - return &Service{ + s := &Service{ name: "Stun@" + conn.LocalAddr().Network() + "://" + conn.LocalAddr().String(), cfg: cfg, @@ -100,16 +103,17 @@ func New(cfg config.Wrapper, subscriber Subscriber, conn net.PacketConn) (*Servi natType: NATUnknown, addr: nil, - stop: make(chan struct{}), - }, otherDataConn + } + s.Service = util.AsService(s.serve) + return s, otherDataConn } func (s *Service) Stop() { - close(s.stop) + s.Service.Stop() _ = s.stunConn.Close() } -func (s *Service) Serve() { +func (s *Service) serve(stop chan struct{}) { for { disabled: s.setNATType(NATUnknown) @@ -117,7 +121,7 @@ func (s *Service) Serve() { if s.cfg.Options().IsStunDisabled() { select { - case <-s.stop: + case <-stop: return case <-time.After(time.Second): continue @@ -130,12 +134,12 @@ func (s *Service) Serve() { // This blocks until we hit an exit condition or there are issues with the STUN server. // This returns a boolean signifying if a different STUN server should be tried (oppose to the whole thing // shutting down and this winding itself down. - if !s.runStunForServer(addr) { + if !s.runStunForServer(addr, stop) { // Check exit conditions. // Have we been asked to stop? select { - case <-s.stop: + case <-stop: return default: } @@ -163,7 +167,7 @@ func (s *Service) Serve() { } } -func (s *Service) runStunForServer(addr string) (tryNext bool) { +func (s *Service) runStunForServer(addr string, stop chan struct{}) (tryNext bool) { l.Debugf("Running stun for %s via %s", s, addr) // Resolve the address, so that in case the server advertises two @@ -201,10 +205,10 @@ func (s *Service) runStunForServer(addr string) (tryNext bool) { return false } - return s.stunKeepAlive(addr, extAddr) + return s.stunKeepAlive(addr, extAddr, stop) } -func (s *Service) stunKeepAlive(addr string, extAddr *Host) (tryNext bool) { +func (s *Service) stunKeepAlive(addr string, extAddr *Host, stop chan struct{}) (tryNext bool) { var err error nextSleep := time.Duration(s.cfg.Options().StunKeepaliveStartS) * time.Second @@ -247,7 +251,7 @@ func (s *Service) stunKeepAlive(addr string, extAddr *Host) (tryNext bool) { select { case <-time.After(sleepFor): - case <-s.stop: + case <-stop: l.Debugf("%s stopping, aborting stun", s) return false } diff --git a/lib/ur/usage_report.go b/lib/ur/usage_report.go index 643d08c2..8020ae02 100644 --- a/lib/ur/usage_report.go +++ b/lib/ur/usage_report.go @@ -17,7 +17,6 @@ import ( "runtime" "sort" "strings" - "sync" "time" "github.com/syncthing/syncthing/lib/build" @@ -28,6 +27,9 @@ import ( "github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/scanner" "github.com/syncthing/syncthing/lib/upgrade" + "github.com/syncthing/syncthing/lib/util" + + "github.com/thejerf/suture" ) // Current version number of the usage report, for acceptance purposes. If @@ -38,14 +40,12 @@ const Version = 3 var StartTime = time.Now() type Service struct { + suture.Service cfg config.Wrapper model model.Model connectionsService connections.Service noUpgrade bool forceRun chan struct{} - stop chan struct{} - stopped chan struct{} - stopMut sync.RWMutex } func New(cfg config.Wrapper, m model.Model, connectionsService connections.Service, noUpgrade bool) *Service { @@ -54,11 +54,9 @@ func New(cfg config.Wrapper, m model.Model, connectionsService connections.Servi model: m, connectionsService: connectionsService, noUpgrade: noUpgrade, - forceRun: make(chan struct{}), - stop: make(chan struct{}), - stopped: make(chan struct{}), + forceRun: make(chan struct{}, 1), // Buffered to prevent locking } - close(svc.stopped) // Not yet running, dont block on Stop() + svc.Service = util.AsService(svc.serve) cfg.Subscribe(svc) return svc } @@ -385,20 +383,11 @@ func (s *Service) sendUsageReport() error { return err } -func (s *Service) Serve() { - s.stopMut.Lock() - s.stop = make(chan struct{}) - s.stopped = make(chan struct{}) - s.stopMut.Unlock() +func (s *Service) serve(stop chan struct{}) { t := time.NewTimer(time.Duration(s.cfg.Options().URInitialDelayS) * time.Second) - s.stopMut.RLock() - defer func() { - close(s.stopped) - s.stopMut.RUnlock() - }() for { select { - case <-s.stop: + case <-stop: return case <-s.forceRun: t.Reset(0) @@ -422,23 +411,16 @@ func (s *Service) VerifyConfiguration(from, to config.Configuration) error { func (s *Service) CommitConfiguration(from, to config.Configuration) bool { if from.Options.URAccepted != to.Options.URAccepted || from.Options.URUniqueID != to.Options.URUniqueID || from.Options.URURL != to.Options.URURL { - s.stopMut.RLock() select { case s.forceRun <- struct{}{}: - case <-s.stop: + default: + // s.forceRun is one buffered, so even though nothing + // was sent, a run will still happen after this point. } - s.stopMut.RUnlock() } return true } -func (s *Service) Stop() { - s.stopMut.RLock() - close(s.stop) - <-s.stopped - s.stopMut.RUnlock() -} - func (*Service) String() string { return "ur.Service" } diff --git a/lib/util/utils.go b/lib/util/utils.go index c2d9296f..8044ddf5 100644 --- a/lib/util/utils.go +++ b/lib/util/utils.go @@ -12,6 +12,10 @@ import ( "reflect" "strconv" "strings" + + "github.com/syncthing/syncthing/lib/sync" + + "github.com/thejerf/suture" ) type defaultParser interface { @@ -170,3 +174,73 @@ func Address(network, host string) string { } return u.String() } + +// AsService wraps the given function to implement suture.Service by calling +// that function on serve and closing the passed channel when Stop is called. +func AsService(fn func(stop chan struct{})) suture.Service { + return AsServiceWithError(func(stop chan struct{}) error { + fn(stop) + return nil + }) +} + +type ServiceWithError interface { + suture.Service + Error() error +} + +// AsServiceWithError does the same as AsService, except that it keeps track +// of an error returned by the given function. +func AsServiceWithError(fn func(stop chan struct{}) error) ServiceWithError { + s := &service{ + serve: fn, + stop: make(chan struct{}), + stopped: make(chan struct{}), + mut: sync.NewMutex(), + } + close(s.stopped) // not yet started, don't block on Stop() + return s +} + +type service struct { + serve func(stop chan struct{}) error + stop chan struct{} + stopped chan struct{} + err error + mut sync.Mutex +} + +func (s *service) Serve() { + s.mut.Lock() + select { + case <-s.stop: + s.mut.Unlock() + return + default: + } + s.err = nil + s.stopped = make(chan struct{}) + s.mut.Unlock() + + var err error + defer func() { + s.mut.Lock() + s.err = err + close(s.stopped) + s.mut.Unlock() + }() + err = s.serve(s.stop) +} + +func (s *service) Stop() { + s.mut.Lock() + close(s.stop) + s.mut.Unlock() + <-s.stopped +} + +func (s *service) Error() error { + s.mut.Lock() + defer s.mut.Unlock() + return s.err +} diff --git a/lib/versioner/staggered.go b/lib/versioner/staggered.go index c2068e88..3d708b05 100644 --- a/lib/versioner/staggered.go +++ b/lib/versioner/staggered.go @@ -11,8 +11,11 @@ import ( "strconv" "time" + "github.com/thejerf/suture" + "github.com/syncthing/syncthing/lib/fs" "github.com/syncthing/syncthing/lib/sync" + "github.com/syncthing/syncthing/lib/util" ) func init() { @@ -26,13 +29,13 @@ type Interval struct { } type Staggered struct { + suture.Service cleanInterval int64 folderFs fs.Filesystem versionsFs fs.Filesystem interval [4]Interval mutex sync.Mutex - stop chan struct{} testCleanDone chan struct{} } @@ -61,14 +64,14 @@ func NewStaggered(folderID string, folderFs fs.Filesystem, params map[string]str {604800, maxAge}, // next year -> 1 week between versions }, mutex: sync.NewMutex(), - stop: make(chan struct{}), } + s.Service = util.AsService(s.serve) l.Debugf("instantiated %#v", s) return s } -func (v *Staggered) Serve() { +func (v *Staggered) serve(stop chan struct{}) { v.clean() if v.testCleanDone != nil { close(v.testCleanDone) @@ -80,16 +83,12 @@ func (v *Staggered) Serve() { select { case <-tck.C: v.clean() - case <-v.stop: + case <-stop: return } } } -func (v *Staggered) Stop() { - close(v.stop) -} - func (v *Staggered) clean() { l.Debugln("Versioner clean: Waiting for lock on", v.versionsFs) v.mutex.Lock() diff --git a/lib/versioner/trashcan.go b/lib/versioner/trashcan.go index f364c364..d1438a5b 100644 --- a/lib/versioner/trashcan.go +++ b/lib/versioner/trashcan.go @@ -11,7 +11,10 @@ import ( "strconv" "time" + "github.com/thejerf/suture" + "github.com/syncthing/syncthing/lib/fs" + "github.com/syncthing/syncthing/lib/util" ) func init() { @@ -20,10 +23,10 @@ func init() { } type Trashcan struct { + suture.Service folderFs fs.Filesystem versionsFs fs.Filesystem cleanoutDays int - stop chan struct{} } func NewTrashcan(folderID string, folderFs fs.Filesystem, params map[string]string) Versioner { @@ -34,8 +37,8 @@ func NewTrashcan(folderID string, folderFs fs.Filesystem, params map[string]stri folderFs: folderFs, versionsFs: fsFromParams(folderFs, params), cleanoutDays: cleanoutDays, - stop: make(chan struct{}), } + s.Service = util.AsService(s.serve) l.Debugf("instantiated %#v", s) return s @@ -49,7 +52,7 @@ func (t *Trashcan) Archive(filePath string) error { }) } -func (t *Trashcan) Serve() { +func (t *Trashcan) serve(stop chan struct{}) { l.Debugln(t, "starting") defer l.Debugln(t, "stopping") @@ -59,7 +62,7 @@ func (t *Trashcan) Serve() { for { select { - case <-t.stop: + case <-stop: return case <-timer.C: @@ -75,10 +78,6 @@ func (t *Trashcan) Serve() { } } -func (t *Trashcan) Stop() { - close(t.stop) -} - func (t *Trashcan) String() string { return fmt.Sprintf("trashcan@%p", t) }