vendor: Update everything
GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/4620
This commit is contained in:
220
vendor/github.com/thejerf/suture/supervisor.go
generated
vendored
220
vendor/github.com/thejerf/suture/supervisor.go
generated
vendored
@@ -19,6 +19,26 @@ const (
|
||||
type supervisorID uint32
|
||||
type serviceID uint32
|
||||
|
||||
type (
|
||||
// BadStopLogger is called when a service fails to properly stop
|
||||
BadStopLogger func(*Supervisor, Service, string)
|
||||
|
||||
// FailureLogger is called when a service fails
|
||||
FailureLogger func(
|
||||
supervisor *Supervisor,
|
||||
service Service,
|
||||
serviceName string,
|
||||
currentFailures float64,
|
||||
failureThreshold float64,
|
||||
restarting bool,
|
||||
error interface{},
|
||||
stacktrace []byte,
|
||||
)
|
||||
|
||||
// BackoffLogger is called when the supervisor enters or exits backoff mode
|
||||
BackoffLogger func(s *Supervisor, entering bool)
|
||||
)
|
||||
|
||||
var currentSupervisorIDL sync.Mutex
|
||||
var currentSupervisorID uint32
|
||||
|
||||
@@ -26,6 +46,10 @@ var currentSupervisorID uint32
|
||||
// if you pass a ServiceToken from the wrong Supervisor.
|
||||
var ErrWrongSupervisor = errors.New("wrong supervisor for this service token, no service removed")
|
||||
|
||||
// ErrTimeout is returned when an attempt to RemoveAndWait for a service to
|
||||
// stop has timed out.
|
||||
var ErrTimeout = errors.New("waiting for service to stop has timed out")
|
||||
|
||||
// ServiceToken is an opaque identifier that can be used to terminate a service that
|
||||
// has been Add()ed to a Supervisor.
|
||||
type ServiceToken struct {
|
||||
@@ -62,6 +86,9 @@ new goroutine. You do not want to just:
|
||||
because that will briefly create a race condition as it starts up, if you
|
||||
try to .Add() services immediately afterward.
|
||||
|
||||
The various Log function should only be modified while the Supervisor is
|
||||
not running, to prevent race conditions.
|
||||
|
||||
*/
|
||||
type Supervisor struct {
|
||||
Name string
|
||||
@@ -82,16 +109,9 @@ type Supervisor struct {
|
||||
liveness chan struct{}
|
||||
resumeTimer <-chan time.Time
|
||||
|
||||
// The testing uses the ability to grab these individual logging functions
|
||||
// and get inside of suture's handling at a deep level.
|
||||
// If you ever come up with some need to get into these, submit a pull
|
||||
// request to make them public and some smidge of justification, and
|
||||
// I'll happily do it.
|
||||
// But since I've now changed the signature on these once, I'm glad I
|
||||
// didn't start with them public... :)
|
||||
logBadStop func(*Supervisor, Service, string)
|
||||
logFailure func(supervisor *Supervisor, service Service, serviceName string, currentFailures float64, failureThreshold float64, restarting bool, error interface{}, stacktrace []byte)
|
||||
logBackoff func(*Supervisor, bool)
|
||||
LogBadStop BadStopLogger
|
||||
LogFailure FailureLogger
|
||||
LogBackoff BackoffLogger
|
||||
|
||||
// avoid a dependency on github.com/thejerf/abtime by just implementing
|
||||
// a minimal chunk.
|
||||
@@ -110,6 +130,9 @@ type Spec struct {
|
||||
FailureThreshold float64
|
||||
FailureBackoff time.Duration
|
||||
Timeout time.Duration
|
||||
LogBadStop BadStopLogger
|
||||
LogFailure FailureLogger
|
||||
LogBackoff BackoffLogger
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -204,27 +227,63 @@ func New(name string, spec Spec) (s *Supervisor) {
|
||||
s.resumeTimer = make(chan time.Time)
|
||||
|
||||
// set up the default logging handlers
|
||||
s.logBadStop = func(supervisor *Supervisor, service Service, name string) {
|
||||
s.log(fmt.Sprintf("%s: Service %s failed to terminate in a timely manner", supervisor.Name, name))
|
||||
}
|
||||
s.logFailure = func(supervisor *Supervisor, service Service, serviceName string, failures float64, threshold float64, restarting bool, err interface{}, st []byte) {
|
||||
var errString string
|
||||
|
||||
e, canError := err.(error)
|
||||
if canError {
|
||||
errString = e.Error()
|
||||
} else {
|
||||
errString = fmt.Sprintf("%#v", err)
|
||||
if spec.LogBadStop == nil {
|
||||
s.LogBadStop = func(sup *Supervisor, _ Service, name string) {
|
||||
s.log(fmt.Sprintf(
|
||||
"%s: Service %s failed to terminate in a timely manner",
|
||||
sup.Name,
|
||||
name,
|
||||
))
|
||||
}
|
||||
|
||||
s.log(fmt.Sprintf("%s: Failed service '%s' (%f failures of %f), restarting: %#v, error: %s, stacktrace: %s", supervisor.Name, serviceName, failures, threshold, restarting, errString, string(st)))
|
||||
} else {
|
||||
s.LogBadStop = spec.LogBadStop
|
||||
}
|
||||
s.logBackoff = func(s *Supervisor, entering bool) {
|
||||
if entering {
|
||||
s.log("Entering the backoff state.")
|
||||
} else {
|
||||
s.log("Exiting backoff state.")
|
||||
|
||||
if spec.LogFailure == nil {
|
||||
s.LogFailure = func(
|
||||
sup *Supervisor,
|
||||
_ Service,
|
||||
svcName string,
|
||||
f float64,
|
||||
thresh float64,
|
||||
restarting bool,
|
||||
err interface{},
|
||||
st []byte,
|
||||
) {
|
||||
var errString string
|
||||
|
||||
e, canError := err.(error)
|
||||
if canError {
|
||||
errString = e.Error()
|
||||
} else {
|
||||
errString = fmt.Sprintf("%#v", err)
|
||||
}
|
||||
|
||||
s.log(fmt.Sprintf(
|
||||
"%s: Failed service '%s' (%f failures of %f), restarting: %#v, error: %s, stacktrace: %s",
|
||||
sup.Name,
|
||||
svcName,
|
||||
f,
|
||||
thresh,
|
||||
restarting,
|
||||
errString,
|
||||
string(st),
|
||||
))
|
||||
}
|
||||
} else {
|
||||
s.LogFailure = spec.LogFailure
|
||||
}
|
||||
|
||||
if spec.LogBackoff == nil {
|
||||
s.LogBackoff = func(s *Supervisor, entering bool) {
|
||||
if entering {
|
||||
s.log("Entering the backoff state.")
|
||||
} else {
|
||||
s.log("Exiting backoff state.")
|
||||
}
|
||||
}
|
||||
} else {
|
||||
s.LogBackoff = spec.LogBackoff
|
||||
}
|
||||
|
||||
return
|
||||
@@ -259,7 +318,8 @@ to terminate the service.
|
||||
As a special behavior, if the service added is itself a supervisor, the
|
||||
supervisor being added will copy the Log function from the Supervisor it
|
||||
is being added to. This allows factoring out providing a Supervisor
|
||||
from its logging.
|
||||
from its logging. This unconditionally overwrites the child Supervisor's
|
||||
logging functions.
|
||||
|
||||
*/
|
||||
func (s *Supervisor) Add(service Service) ServiceToken {
|
||||
@@ -268,9 +328,9 @@ func (s *Supervisor) Add(service Service) ServiceToken {
|
||||
}
|
||||
|
||||
if supervisor, isSupervisor := service.(*Supervisor); isSupervisor {
|
||||
supervisor.logBadStop = s.logBadStop
|
||||
supervisor.logFailure = s.logFailure
|
||||
supervisor.logBackoff = s.logBackoff
|
||||
supervisor.LogBadStop = s.LogBadStop
|
||||
supervisor.LogFailure = s.LogFailure
|
||||
supervisor.LogBackoff = s.LogBackoff
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
@@ -354,7 +414,7 @@ func (s *Supervisor) Serve() {
|
||||
|
||||
msg.response <- id
|
||||
case removeService:
|
||||
s.removeService(msg.id, s.control)
|
||||
s.removeService(msg.id, msg.notification, s.control)
|
||||
case serviceTerminated:
|
||||
delete(s.servicesShuttingDown, msg.id)
|
||||
case stopSupervisor:
|
||||
@@ -383,7 +443,7 @@ func (s *Supervisor) Serve() {
|
||||
s.state = normal
|
||||
s.Unlock()
|
||||
s.failures = 0
|
||||
s.logBackoff(s, false)
|
||||
s.LogBackoff(s, false)
|
||||
for _, id := range s.restartQueue {
|
||||
namedService, present := s.services[id]
|
||||
if present {
|
||||
@@ -395,6 +455,17 @@ func (s *Supervisor) Serve() {
|
||||
}
|
||||
}
|
||||
|
||||
// Stop stops the Supervisor.
|
||||
//
|
||||
// This function will not return until either all Services have stopped, or
|
||||
// they timeout after the timeout value given to the Supervisor at creation.
|
||||
func (s *Supervisor) Stop() {
|
||||
done := make(chan struct{})
|
||||
if s.sendControl(stopSupervisor{done}) {
|
||||
<-done
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Supervisor) handleFailedService(id serviceID, err interface{}, stacktrace []byte) {
|
||||
now := s.getNow()
|
||||
|
||||
@@ -411,7 +482,7 @@ func (s *Supervisor) handleFailedService(id serviceID, err interface{}, stacktra
|
||||
s.Lock()
|
||||
s.state = paused
|
||||
s.Unlock()
|
||||
s.logBackoff(s, true)
|
||||
s.LogBackoff(s, true)
|
||||
s.resumeTimer = s.getAfterChan(s.failureBackoff)
|
||||
}
|
||||
|
||||
@@ -430,12 +501,12 @@ func (s *Supervisor) handleFailedService(id serviceID, err interface{}, stacktra
|
||||
s.Unlock()
|
||||
if curState == normal {
|
||||
s.runService(failedService.Service, id)
|
||||
s.logFailure(s, failedService.Service, failedService.name, s.failures, s.failureThreshold, true, err, stacktrace)
|
||||
s.LogFailure(s, failedService.Service, failedService.name, s.failures, s.failureThreshold, true, err, stacktrace)
|
||||
} else {
|
||||
// FIXME: When restarting, check that the service still
|
||||
// exists (it may have been stopped in the meantime)
|
||||
s.restartQueue = append(s.restartQueue, id)
|
||||
s.logFailure(s, failedService.Service, failedService.name, s.failures, s.failureThreshold, false, err, stacktrace)
|
||||
s.LogFailure(s, failedService.Service, failedService.name, s.failures, s.failureThreshold, false, err, stacktrace)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -457,7 +528,7 @@ func (s *Supervisor) runService(service Service, id serviceID) {
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *Supervisor) removeService(id serviceID, removedChan chan supervisorMessage) {
|
||||
func (s *Supervisor) removeService(id serviceID, notificationChan chan struct{}, removedChan chan supervisorMessage) {
|
||||
namedService, present := s.services[id]
|
||||
if present {
|
||||
delete(s.services, id)
|
||||
@@ -467,16 +538,23 @@ func (s *Supervisor) removeService(id serviceID, removedChan chan supervisorMess
|
||||
go func() {
|
||||
namedService.Service.Stop()
|
||||
successChan <- true
|
||||
if notificationChan != nil {
|
||||
notificationChan <- struct{}{}
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-successChan:
|
||||
// Life is good!
|
||||
case <-s.getAfterChan(s.timeout):
|
||||
s.logBadStop(s, namedService.Service, namedService.name)
|
||||
s.LogBadStop(s, namedService.Service, namedService.name)
|
||||
}
|
||||
removedChan <- serviceTerminated{id}
|
||||
}()
|
||||
} else {
|
||||
if notificationChan != nil {
|
||||
notificationChan <- struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -496,15 +574,18 @@ func (s *Supervisor) stopSupervisor() {
|
||||
}
|
||||
|
||||
timeout := s.getAfterChan(s.timeout)
|
||||
SHUTTING_DOWN_SERVICES:
|
||||
for len(s.servicesShuttingDown) > 0 {
|
||||
select {
|
||||
case id := <-notifyDone:
|
||||
delete(s.servicesShuttingDown, id)
|
||||
case <-timeout:
|
||||
for _, namedService := range s.servicesShuttingDown {
|
||||
s.logBadStop(s, namedService.Service, namedService.name)
|
||||
s.LogBadStop(s, namedService.Service, namedService.name)
|
||||
}
|
||||
return
|
||||
|
||||
// failed remove statements will log the errors.
|
||||
break SHUTTING_DOWN_SERVICES
|
||||
}
|
||||
}
|
||||
|
||||
@@ -521,24 +602,72 @@ func (s *Supervisor) sendControl(sm supervisorMessage) bool {
|
||||
select {
|
||||
case s.control <- sm:
|
||||
return true
|
||||
case _, _ = (<-s.liveness):
|
||||
case _, _ = <-s.liveness:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
Remove will remove the given service from the Supervisor, and attempt to Stop() it.
|
||||
The ServiceID token comes from the Add() call.
|
||||
The ServiceID token comes from the Add() call. This returns without waiting
|
||||
for the service to stop.
|
||||
*/
|
||||
func (s *Supervisor) Remove(id ServiceToken) error {
|
||||
sID := supervisorID(id.id >> 32)
|
||||
if sID != s.id {
|
||||
return ErrWrongSupervisor
|
||||
}
|
||||
s.sendControl(removeService{serviceID(id.id & 0xffffffff)})
|
||||
// no meaningful error handling if this is false
|
||||
_ = s.sendControl(removeService{serviceID(id.id & 0xffffffff), nil})
|
||||
return nil
|
||||
}
|
||||
|
||||
/*
|
||||
RemoveAndWait will remove the given service from the Supervisor and attempt
|
||||
to Stop() it. It will wait up to the given timeout value for the service to
|
||||
terminate. A timeout value of 0 means to wait forever.
|
||||
|
||||
If a nil error is returned from this function
|
||||
*/
|
||||
func (s *Supervisor) RemoveAndWait(id ServiceToken, timeout time.Duration) error {
|
||||
sID := supervisorID(id.id >> 32)
|
||||
if sID != s.id {
|
||||
return ErrWrongSupervisor
|
||||
}
|
||||
|
||||
var timeoutC <-chan time.Time
|
||||
|
||||
if timeout > 0 {
|
||||
timer := time.NewTimer(timeout)
|
||||
defer timer.Stop()
|
||||
timeoutC = timer.C
|
||||
}
|
||||
|
||||
notificationC := make(chan struct{})
|
||||
|
||||
sentControl := s.sendControl(removeService{serviceID(id.id & 0xffffffff), notificationC})
|
||||
|
||||
if sentControl == false {
|
||||
return ErrTimeout
|
||||
}
|
||||
|
||||
select {
|
||||
case <-notificationC:
|
||||
// normal case; the service is terminated.
|
||||
return nil
|
||||
|
||||
// This occurs if the entire supervisor ends without the service
|
||||
// having terminated, and includes the timeout the supervisor
|
||||
// itself waited before closing the liveness channel.
|
||||
case _, _ = <-s.liveness:
|
||||
return ErrTimeout
|
||||
|
||||
// The local timeout.
|
||||
case <-timeoutC:
|
||||
return ErrTimeout
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
Services returns a []Service containing a snapshot of the services this
|
||||
@@ -550,7 +679,6 @@ func (s *Supervisor) Services() []Service {
|
||||
|
||||
if s.sendControl(ls) {
|
||||
return <-ls.c
|
||||
} else {
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user