cmd/syncthing, lib/relay: Fixes regarding stopping of services (#5293)

This commit is contained in:
Simon Frei 2018-11-07 11:05:07 +01:00 committed by Jakob Borg
parent d510e3cca3
commit 603da2dce2
2 changed files with 33 additions and 7 deletions

View File

@ -17,6 +17,7 @@ import (
"runtime" "runtime"
"sort" "sort"
"strings" "strings"
"sync"
"time" "time"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
@ -326,6 +327,8 @@ type usageReportingService struct {
connectionsService *connections.Service connectionsService *connections.Service
forceRun chan struct{} forceRun chan struct{}
stop chan struct{} stop chan struct{}
stopped chan struct{}
stopMut sync.RWMutex
} }
func newUsageReportingService(cfg *config.Wrapper, model *model.Model, connectionsService *connections.Service) *usageReportingService { func newUsageReportingService(cfg *config.Wrapper, model *model.Model, connectionsService *connections.Service) *usageReportingService {
@ -335,7 +338,9 @@ func newUsageReportingService(cfg *config.Wrapper, model *model.Model, connectio
connectionsService: connectionsService, connectionsService: connectionsService,
forceRun: make(chan struct{}), forceRun: make(chan struct{}),
stop: make(chan struct{}), stop: make(chan struct{}),
stopped: make(chan struct{}),
} }
close(svc.stopped) // Not yet running, dont block on Stop()
cfg.Subscribe(svc) cfg.Subscribe(svc)
return svc return svc
} }
@ -359,8 +364,16 @@ func (s *usageReportingService) sendUsageReport() error {
} }
func (s *usageReportingService) Serve() { func (s *usageReportingService) Serve() {
s.stopMut.Lock()
s.stop = make(chan struct{}) s.stop = make(chan struct{})
s.stopped = make(chan struct{})
s.stopMut.Unlock()
t := time.NewTimer(time.Duration(s.cfg.Options().URInitialDelayS) * time.Second) t := time.NewTimer(time.Duration(s.cfg.Options().URInitialDelayS) * time.Second)
s.stopMut.RLock()
defer func() {
close(s.stopped)
s.stopMut.RUnlock()
}()
for { for {
select { select {
case <-s.stop: case <-s.stop:
@ -387,14 +400,21 @@ func (s *usageReportingService) VerifyConfiguration(from, to config.Configuratio
func (s *usageReportingService) CommitConfiguration(from, to config.Configuration) bool { func (s *usageReportingService) CommitConfiguration(from, to config.Configuration) bool {
if from.Options.URAccepted != to.Options.URAccepted || from.Options.URUniqueID != to.Options.URUniqueID || from.Options.URURL != to.Options.URURL { if from.Options.URAccepted != to.Options.URAccepted || from.Options.URUniqueID != to.Options.URUniqueID || from.Options.URURL != to.Options.URURL {
s.forceRun <- struct{}{} s.stopMut.RLock()
select {
case s.forceRun <- struct{}{}:
case <-s.stop:
}
s.stopMut.RUnlock()
} }
return true return true
} }
func (s *usageReportingService) Stop() { func (s *usageReportingService) Stop() {
s.stopMut.RLock()
close(s.stop) close(s.stop)
close(s.forceRun) <-s.stopped
s.stopMut.RUnlock()
} }
func (usageReportingService) String() string { func (usageReportingService) String() string {

View File

@ -28,6 +28,7 @@ type staticClient struct {
stop chan struct{} stop chan struct{}
stopped chan struct{} stopped chan struct{}
stopMut sync.RWMutex
conn *tls.Conn conn *tls.Conn
@ -44,6 +45,8 @@ func newStaticClient(uri *url.URL, certs []tls.Certificate, invitations chan pro
invitations = make(chan protocol.SessionInvitation) invitations = make(chan protocol.SessionInvitation)
} }
stopped := make(chan struct{})
close(stopped) // not yet started, don't block on Stop()
return &staticClient{ return &staticClient{
uri: uri, uri: uri,
invitations: invitations, invitations: invitations,
@ -56,7 +59,8 @@ func newStaticClient(uri *url.URL, certs []tls.Certificate, invitations chan pro
connectTimeout: timeout, connectTimeout: timeout,
stop: make(chan struct{}), stop: make(chan struct{}),
stopped: make(chan struct{}), stopped: stopped,
stopMut: sync.NewRWMutex(),
mut: sync.NewRWMutex(), mut: sync.NewRWMutex(),
} }
@ -64,8 +68,10 @@ func newStaticClient(uri *url.URL, certs []tls.Certificate, invitations chan pro
func (c *staticClient) Serve() { func (c *staticClient) Serve() {
defer c.cleanup() defer c.cleanup()
c.stopMut.Lock()
c.stop = make(chan struct{}) c.stop = make(chan struct{})
c.stopped = make(chan struct{}) c.stopped = make(chan struct{})
c.stopMut.Unlock()
defer close(c.stopped) defer close(c.stopped)
if err := c.connect(); err != nil { if err := c.connect(); err != nil {
@ -104,6 +110,8 @@ func (c *staticClient) Serve() {
timeout := time.NewTimer(c.messageTimeout) timeout := time.NewTimer(c.messageTimeout)
c.stopMut.RLock()
defer c.stopMut.RUnlock()
for { for {
select { select {
case message := <-messages: case message := <-messages:
@ -169,12 +177,10 @@ func (c *staticClient) Serve() {
} }
func (c *staticClient) Stop() { func (c *staticClient) Stop() {
if c.stop == nil { c.stopMut.RLock()
return
}
close(c.stop) close(c.stop)
<-c.stopped <-c.stopped
c.stopMut.RUnlock()
} }
func (c *staticClient) StatusOK() bool { func (c *staticClient) StatusOK() bool {