all, vendor: Switch back to non-forked thejerf/suture (#5171)
This commit is contained in:
committed by
Audrius Butkevicius
parent
8aa2d8d92c
commit
9e00b619ab
19
vendor/github.com/thejerf/suture/LICENSE
generated
vendored
Normal file
19
vendor/github.com/thejerf/suture/LICENSE
generated
vendored
Normal file
@@ -0,0 +1,19 @@
|
||||
Copyright (c) 2014-2015 Barracuda Networks, Inc.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
54
vendor/github.com/thejerf/suture/doc.go
generated
vendored
Normal file
54
vendor/github.com/thejerf/suture/doc.go
generated
vendored
Normal file
@@ -0,0 +1,54 @@
|
||||
/*
|
||||
|
||||
Package suture provides Erlang-like supervisor trees.
|
||||
|
||||
This implements Erlang-esque supervisor trees, as adapted for Go. This is
|
||||
intended to be an industrial-strength implementation, but it has not yet
|
||||
been deployed in a hostile environment. (It's headed there, though.)
|
||||
|
||||
Supervisor Tree -> SuTree -> suture -> holds your code together when it's
|
||||
trying to fall apart.
|
||||
|
||||
Why use Suture?
|
||||
|
||||
* You want to write bullet-resistant services that will remain available
|
||||
despite unforeseen failure.
|
||||
* You need the code to be smart enough not to consume 100% of the CPU
|
||||
restarting things.
|
||||
* You want to easily compose multiple such services in one program.
|
||||
* You want the Erlang programmers to stop lording their supervision
|
||||
trees over you.
|
||||
|
||||
Suture has 100% test coverage, and is golint clean. This doesn't prove it
|
||||
free of bugs, but it shows I care.
|
||||
|
||||
A blog post describing the design decisions is available at
|
||||
http://www.jerf.org/iri/post/2930 .
|
||||
|
||||
Using Suture
|
||||
|
||||
To idiomatically use Suture, create a Supervisor which is your top level
|
||||
"application" supervisor. This will often occur in your program's "main"
|
||||
function.
|
||||
|
||||
Create "Service"s, which implement the Service interface. .Add() them
|
||||
to your Supervisor. Supervisors are also services, so you can create a
|
||||
tree structure here, depending on the exact combination of restarts
|
||||
you want to create.
|
||||
|
||||
As a special case, when adding Supervisors to Supervisors, the "sub"
|
||||
supervisor will have the "super" supervisor's Log function copied.
|
||||
This allows you to set one log function on the "top" supervisor, and
|
||||
have it propagate down to all the sub-supervisors. This also allows
|
||||
libraries or modules to provide Supervisors without having to commit
|
||||
their users to a particular logging method.
|
||||
|
||||
Finally, as what is probably the last line of your main() function, call
|
||||
.Serve() on your top level supervisor. This will start all the services
|
||||
you've defined.
|
||||
|
||||
See the Example for an example, using a simple service that serves out
|
||||
incrementing integers.
|
||||
|
||||
*/
|
||||
package suture
|
||||
83
vendor/github.com/thejerf/suture/messages.go
generated
vendored
Normal file
83
vendor/github.com/thejerf/suture/messages.go
generated
vendored
Normal file
@@ -0,0 +1,83 @@
|
||||
package suture
|
||||
|
||||
// sum type pattern for type-safe message passing; see
|
||||
// http://www.jerf.org/iri/post/2917
|
||||
|
||||
type supervisorMessage interface {
|
||||
isSupervisorMessage()
|
||||
}
|
||||
|
||||
type listServices struct {
|
||||
c chan []Service
|
||||
}
|
||||
|
||||
func (ls listServices) isSupervisorMessage() {}
|
||||
|
||||
type removeService struct {
|
||||
id serviceID
|
||||
notification chan struct{}
|
||||
}
|
||||
|
||||
func (rs removeService) isSupervisorMessage() {}
|
||||
|
||||
func (s *Supervisor) sync() {
|
||||
s.control <- syncSupervisor{}
|
||||
}
|
||||
|
||||
type syncSupervisor struct {
|
||||
}
|
||||
|
||||
func (ss syncSupervisor) isSupervisorMessage() {}
|
||||
|
||||
func (s *Supervisor) fail(id serviceID, err interface{}, stacktrace []byte) {
|
||||
s.control <- serviceFailed{id, err, stacktrace}
|
||||
}
|
||||
|
||||
type serviceFailed struct {
|
||||
id serviceID
|
||||
err interface{}
|
||||
stacktrace []byte
|
||||
}
|
||||
|
||||
func (sf serviceFailed) isSupervisorMessage() {}
|
||||
|
||||
func (s *Supervisor) serviceEnded(id serviceID, complete bool) {
|
||||
s.sendControl(serviceEnded{id, complete})
|
||||
}
|
||||
|
||||
type serviceEnded struct {
|
||||
id serviceID
|
||||
complete bool
|
||||
}
|
||||
|
||||
func (s serviceEnded) isSupervisorMessage() {}
|
||||
|
||||
// added by the Add() method
|
||||
type addService struct {
|
||||
service Service
|
||||
name string
|
||||
response chan serviceID
|
||||
}
|
||||
|
||||
func (as addService) isSupervisorMessage() {}
|
||||
|
||||
type stopSupervisor struct {
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
func (ss stopSupervisor) isSupervisorMessage() {}
|
||||
|
||||
func (s *Supervisor) panic() {
|
||||
s.control <- panicSupervisor{}
|
||||
}
|
||||
|
||||
type serviceTerminated struct {
|
||||
id serviceID
|
||||
}
|
||||
|
||||
func (st serviceTerminated) isSupervisorMessage() {}
|
||||
|
||||
type panicSupervisor struct {
|
||||
}
|
||||
|
||||
func (ps panicSupervisor) isSupervisorMessage() {}
|
||||
87
vendor/github.com/thejerf/suture/service.go
generated
vendored
Normal file
87
vendor/github.com/thejerf/suture/service.go
generated
vendored
Normal file
@@ -0,0 +1,87 @@
|
||||
package suture
|
||||
|
||||
/*
|
||||
Service is the interface that describes a service to a Supervisor.
|
||||
|
||||
Serve Method
|
||||
|
||||
The Serve method is called by a Supervisor to start the service.
|
||||
The service should execute within the goroutine that this is
|
||||
called in. If this function either returns or panics, the Supervisor
|
||||
will call it again.
|
||||
|
||||
A Serve method SHOULD do as much cleanup of the state as possible,
|
||||
to prevent any corruption in the previous state from crashing the
|
||||
service again.
|
||||
|
||||
Stop Method
|
||||
|
||||
This method is used by the supervisor to stop the service. Calling this
|
||||
directly on a Service given to a Supervisor will simply result in the
|
||||
Service being restarted; use the Supervisor's .Remove(ServiceToken) method
|
||||
to stop a service. A supervisor will call .Stop() only once. Thus, it may
|
||||
be as destructive as it likes to get the service to stop.
|
||||
|
||||
Once Stop has been called on a Service, the Service SHOULD NOT be
|
||||
reused in any other supervisor! Because of the impossibility of
|
||||
guaranteeing that the service has actually stopped in Go, you can't
|
||||
prove that you won't be starting two goroutines using the exact
|
||||
same memory to store state, causing completely unpredictable behavior.
|
||||
|
||||
Stop should not return until the service has actually stopped.
|
||||
"Stopped" here is defined as "the service will stop servicing any
|
||||
further requests in the future". For instance, a common implementation
|
||||
is to receive a message on a dedicated "stop" channel and immediately
|
||||
returning. Once the stop command has been processed, the service is
|
||||
stopped.
|
||||
|
||||
Another common Stop implementation is to forcibly close an open socket
|
||||
or other resource, which will cause detectable errors to manifest in the
|
||||
service code. Bear in mind that to perfectly correctly use this
|
||||
approach requires a bit more work to handle the chance of a Stop
|
||||
command coming in before the resource has been created.
|
||||
|
||||
If a service does not Stop within the supervisor's timeout duration, a log
|
||||
entry will be made with a descriptive string to that effect. This does
|
||||
not guarantee that the service is hung; it may still get around to being
|
||||
properly stopped in the future. Until the service is fully stopped,
|
||||
both the service and the spawned goroutine trying to stop it will be
|
||||
"leaked".
|
||||
|
||||
Stringer Interface
|
||||
|
||||
When a Service is added to a Supervisor, the Supervisor will create a
|
||||
string representation of that service used for logging.
|
||||
|
||||
If you implement the fmt.Stringer interface, that will be used.
|
||||
|
||||
If you do not implement the fmt.Stringer interface, a default
|
||||
fmt.Sprintf("%#v") will be used.
|
||||
|
||||
Optional Interface
|
||||
|
||||
Services may optionally implement IsCompletable, which allows a service
|
||||
to indicate to a supervisor that it does not need to be restarted if
|
||||
it has terminated.
|
||||
|
||||
*/
|
||||
type Service interface {
|
||||
Serve()
|
||||
Stop()
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
IsCompletable is an optionally-implementable interface that allows a service
|
||||
to report to a supervisor that it does not need to be restarted because it
|
||||
has terminated normally. When a Service is going to be restarted, the
|
||||
supervisor will check for this method, and if Complete returns true, the
|
||||
service is removed from the supervisor instead of restarted.
|
||||
|
||||
This is only executed when the service is not running because it has
|
||||
terminated, and has not yet been restarted.
|
||||
|
||||
*/
|
||||
type IsCompletable interface {
|
||||
Complete() bool
|
||||
}
|
||||
707
vendor/github.com/thejerf/suture/supervisor.go
generated
vendored
Normal file
707
vendor/github.com/thejerf/suture/supervisor.go
generated
vendored
Normal file
@@ -0,0 +1,707 @@
|
||||
package suture
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"math"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
notRunning = iota
|
||||
normal
|
||||
paused
|
||||
)
|
||||
|
||||
type supervisorID uint32
|
||||
type serviceID uint32
|
||||
|
||||
type (
|
||||
// BadStopLogger is called when a service fails to properly stop
|
||||
BadStopLogger func(*Supervisor, Service, string)
|
||||
|
||||
// FailureLogger is called when a service fails
|
||||
FailureLogger func(
|
||||
supervisor *Supervisor,
|
||||
service Service,
|
||||
serviceName string,
|
||||
currentFailures float64,
|
||||
failureThreshold float64,
|
||||
restarting bool,
|
||||
error interface{},
|
||||
stacktrace []byte,
|
||||
)
|
||||
|
||||
// BackoffLogger is called when the supervisor enters or exits backoff mode
|
||||
BackoffLogger func(s *Supervisor, entering bool)
|
||||
)
|
||||
|
||||
var currentSupervisorIDL sync.Mutex
|
||||
var currentSupervisorID uint32
|
||||
|
||||
// ErrWrongSupervisor is returned by the (*Supervisor).Remove method
|
||||
// if you pass a ServiceToken from the wrong Supervisor.
|
||||
var ErrWrongSupervisor = errors.New("wrong supervisor for this service token, no service removed")
|
||||
|
||||
// ErrTimeout is returned when an attempt to RemoveAndWait for a service to
|
||||
// stop has timed out.
|
||||
var ErrTimeout = errors.New("waiting for service to stop has timed out")
|
||||
|
||||
// ServiceToken is an opaque identifier that can be used to terminate a service that
|
||||
// has been Add()ed to a Supervisor.
|
||||
type ServiceToken struct {
|
||||
id uint64
|
||||
}
|
||||
|
||||
type serviceWithName struct {
|
||||
Service Service
|
||||
name string
|
||||
}
|
||||
|
||||
/*
|
||||
Supervisor is the core type of the module that represents a Supervisor.
|
||||
|
||||
Supervisors should be constructed either by New or NewSimple.
|
||||
|
||||
Once constructed, a Supervisor should be started in one of three ways:
|
||||
|
||||
1. Calling .Serve().
|
||||
2. Calling .ServeBackground().
|
||||
3. Adding it to an existing Supervisor.
|
||||
|
||||
Calling Serve will cause the supervisor to run until it is shut down by
|
||||
an external user calling Stop() on it. If that never happens, it simply
|
||||
runs forever. I suggest creating your services in Supervisors, then making
|
||||
a Serve() call on your top-level Supervisor be the last line of your main
|
||||
func.
|
||||
|
||||
Calling ServeBackground will CORRECTLY start the supervisor running in a
|
||||
new goroutine. You do not want to just:
|
||||
|
||||
go supervisor.Serve()
|
||||
|
||||
because that will briefly create a race condition as it starts up, if you
|
||||
try to .Add() services immediately afterward.
|
||||
|
||||
The various Log function should only be modified while the Supervisor is
|
||||
not running, to prevent race conditions.
|
||||
|
||||
*/
|
||||
type Supervisor struct {
|
||||
Name string
|
||||
id supervisorID
|
||||
|
||||
failureDecay float64
|
||||
failureThreshold float64
|
||||
failureBackoff time.Duration
|
||||
timeout time.Duration
|
||||
log func(string)
|
||||
services map[serviceID]serviceWithName
|
||||
servicesShuttingDown map[serviceID]serviceWithName
|
||||
lastFail time.Time
|
||||
failures float64
|
||||
restartQueue []serviceID
|
||||
serviceCounter serviceID
|
||||
control chan supervisorMessage
|
||||
liveness chan struct{}
|
||||
resumeTimer <-chan time.Time
|
||||
recoverPanics bool
|
||||
|
||||
LogBadStop BadStopLogger
|
||||
LogFailure FailureLogger
|
||||
LogBackoff BackoffLogger
|
||||
|
||||
// avoid a dependency on github.com/thejerf/abtime by just implementing
|
||||
// a minimal chunk.
|
||||
getNow func() time.Time
|
||||
getAfterChan func(time.Duration) <-chan time.Time
|
||||
|
||||
sync.Mutex
|
||||
state uint8
|
||||
}
|
||||
|
||||
// Spec is used to pass arguments to the New function to create a
|
||||
// supervisor. See the New function for full documentation.
|
||||
type Spec struct {
|
||||
Log func(string)
|
||||
FailureDecay float64
|
||||
FailureThreshold float64
|
||||
FailureBackoff time.Duration
|
||||
Timeout time.Duration
|
||||
LogBadStop BadStopLogger
|
||||
LogFailure FailureLogger
|
||||
LogBackoff BackoffLogger
|
||||
PassThroughPanics bool
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
New is the full constructor function for a supervisor.
|
||||
|
||||
The name is a friendly human name for the supervisor, used in logging. Suture
|
||||
does not care if this is unique, but it is good for your sanity if it is.
|
||||
|
||||
If not set, the following values are used:
|
||||
|
||||
* Log: A function is created that uses log.Print.
|
||||
* FailureDecay: 30 seconds
|
||||
* FailureThreshold: 5 failures
|
||||
* FailureBackoff: 15 seconds
|
||||
* Timeout: 10 seconds
|
||||
|
||||
The Log function will be called when errors occur. Suture will log the
|
||||
following:
|
||||
|
||||
* When a service has failed, with a descriptive message about the
|
||||
current backoff status, and whether it was immediately restarted
|
||||
* When the supervisor has gone into its backoff mode, and when it
|
||||
exits it
|
||||
* When a service fails to stop
|
||||
|
||||
The failureRate, failureThreshold, and failureBackoff controls how failures
|
||||
are handled, in order to avoid the supervisor failure case where the
|
||||
program does nothing but restarting failed services. If you do not
|
||||
care how failures behave, the default values should be fine for the
|
||||
vast majority of services, but if you want the details:
|
||||
|
||||
The supervisor tracks the number of failures that have occurred, with an
|
||||
exponential decay on the count. Every FailureDecay seconds, the number of
|
||||
failures that have occurred is cut in half. (This is done smoothly with an
|
||||
exponential function.) When a failure occurs, the number of failures
|
||||
is incremented by one. When the number of failures passes the
|
||||
FailureThreshold, the entire service waits for FailureBackoff seconds
|
||||
before attempting any further restarts, at which point it resets its
|
||||
failure count to zero.
|
||||
|
||||
Timeout is how long Suture will wait for a service to properly terminate.
|
||||
|
||||
The PassThroughPanics options can be set to let panics in services propagate
|
||||
and crash the program, should this be desirable.
|
||||
|
||||
*/
|
||||
func New(name string, spec Spec) (s *Supervisor) {
|
||||
s = new(Supervisor)
|
||||
|
||||
s.Name = name
|
||||
currentSupervisorIDL.Lock()
|
||||
currentSupervisorID++
|
||||
s.id = supervisorID(currentSupervisorID)
|
||||
currentSupervisorIDL.Unlock()
|
||||
|
||||
if spec.Log == nil {
|
||||
s.log = func(msg string) {
|
||||
log.Print(fmt.Sprintf("Supervisor %s: %s", s.Name, msg))
|
||||
}
|
||||
} else {
|
||||
s.log = spec.Log
|
||||
}
|
||||
|
||||
if spec.FailureDecay == 0 {
|
||||
s.failureDecay = 30
|
||||
} else {
|
||||
s.failureDecay = spec.FailureDecay
|
||||
}
|
||||
if spec.FailureThreshold == 0 {
|
||||
s.failureThreshold = 5
|
||||
} else {
|
||||
s.failureThreshold = spec.FailureThreshold
|
||||
}
|
||||
if spec.FailureBackoff == 0 {
|
||||
s.failureBackoff = time.Second * 15
|
||||
} else {
|
||||
s.failureBackoff = spec.FailureBackoff
|
||||
}
|
||||
if spec.Timeout == 0 {
|
||||
s.timeout = time.Second * 10
|
||||
} else {
|
||||
s.timeout = spec.Timeout
|
||||
}
|
||||
s.recoverPanics = !spec.PassThroughPanics
|
||||
|
||||
// overriding these allows for testing the threshold behavior
|
||||
s.getNow = time.Now
|
||||
s.getAfterChan = time.After
|
||||
|
||||
s.control = make(chan supervisorMessage)
|
||||
s.liveness = make(chan struct{})
|
||||
s.services = make(map[serviceID]serviceWithName)
|
||||
s.servicesShuttingDown = make(map[serviceID]serviceWithName)
|
||||
s.restartQueue = make([]serviceID, 0, 1)
|
||||
s.resumeTimer = make(chan time.Time)
|
||||
|
||||
// set up the default logging handlers
|
||||
if spec.LogBadStop == nil {
|
||||
s.LogBadStop = func(sup *Supervisor, _ Service, name string) {
|
||||
s.log(fmt.Sprintf(
|
||||
"%s: Service %s failed to terminate in a timely manner",
|
||||
sup.Name,
|
||||
name,
|
||||
))
|
||||
}
|
||||
} else {
|
||||
s.LogBadStop = spec.LogBadStop
|
||||
}
|
||||
|
||||
if spec.LogFailure == nil {
|
||||
s.LogFailure = func(
|
||||
sup *Supervisor,
|
||||
_ Service,
|
||||
svcName string,
|
||||
f float64,
|
||||
thresh float64,
|
||||
restarting bool,
|
||||
err interface{},
|
||||
st []byte,
|
||||
) {
|
||||
var errString string
|
||||
|
||||
e, canError := err.(error)
|
||||
if canError {
|
||||
errString = e.Error()
|
||||
} else {
|
||||
errString = fmt.Sprintf("%#v", err)
|
||||
}
|
||||
|
||||
s.log(fmt.Sprintf(
|
||||
"%s: Failed service '%s' (%f failures of %f), restarting: %#v, error: %s, stacktrace: %s",
|
||||
sup.Name,
|
||||
svcName,
|
||||
f,
|
||||
thresh,
|
||||
restarting,
|
||||
errString,
|
||||
string(st),
|
||||
))
|
||||
}
|
||||
} else {
|
||||
s.LogFailure = spec.LogFailure
|
||||
}
|
||||
|
||||
if spec.LogBackoff == nil {
|
||||
s.LogBackoff = func(s *Supervisor, entering bool) {
|
||||
if entering {
|
||||
s.log("Entering the backoff state.")
|
||||
} else {
|
||||
s.log("Exiting backoff state.")
|
||||
}
|
||||
}
|
||||
} else {
|
||||
s.LogBackoff = spec.LogBackoff
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func serviceName(service Service) (serviceName string) {
|
||||
stringer, canStringer := service.(fmt.Stringer)
|
||||
if canStringer {
|
||||
serviceName = stringer.String()
|
||||
} else {
|
||||
serviceName = fmt.Sprintf("%#v", service)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// NewSimple is a convenience function to create a service with just a name
|
||||
// and the sensible defaults.
|
||||
func NewSimple(name string) *Supervisor {
|
||||
return New(name, Spec{})
|
||||
}
|
||||
|
||||
/*
|
||||
Add adds a service to this supervisor.
|
||||
|
||||
If the supervisor is currently running, the service will be started
|
||||
immediately. If the supervisor is not currently running, the service
|
||||
will be started when the supervisor is.
|
||||
|
||||
The returned ServiceID may be passed to the Remove method of the Supervisor
|
||||
to terminate the service.
|
||||
|
||||
As a special behavior, if the service added is itself a supervisor, the
|
||||
supervisor being added will copy the Log function from the Supervisor it
|
||||
is being added to. This allows factoring out providing a Supervisor
|
||||
from its logging. This unconditionally overwrites the child Supervisor's
|
||||
logging functions.
|
||||
|
||||
*/
|
||||
func (s *Supervisor) Add(service Service) ServiceToken {
|
||||
if s == nil {
|
||||
panic("can't add service to nil *suture.Supervisor")
|
||||
}
|
||||
|
||||
if supervisor, isSupervisor := service.(*Supervisor); isSupervisor {
|
||||
supervisor.LogBadStop = s.LogBadStop
|
||||
supervisor.LogFailure = s.LogFailure
|
||||
supervisor.LogBackoff = s.LogBackoff
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
if s.state == notRunning {
|
||||
id := s.serviceCounter
|
||||
s.serviceCounter++
|
||||
|
||||
s.services[id] = serviceWithName{service, serviceName(service)}
|
||||
s.restartQueue = append(s.restartQueue, id)
|
||||
|
||||
s.Unlock()
|
||||
return ServiceToken{uint64(s.id)<<32 | uint64(id)}
|
||||
}
|
||||
s.Unlock()
|
||||
|
||||
response := make(chan serviceID)
|
||||
s.control <- addService{service, serviceName(service), response}
|
||||
return ServiceToken{uint64(s.id)<<32 | uint64(<-response)}
|
||||
}
|
||||
|
||||
// ServeBackground starts running a supervisor in its own goroutine. This
|
||||
// method does not return until it is safe to use .Add() on the Supervisor.
|
||||
func (s *Supervisor) ServeBackground() {
|
||||
go s.Serve()
|
||||
s.sync()
|
||||
}
|
||||
|
||||
/*
|
||||
Serve starts the supervisor. You should call this on the top-level supervisor,
|
||||
but nothing else.
|
||||
*/
|
||||
func (s *Supervisor) Serve() {
|
||||
if s == nil {
|
||||
panic("Can't serve with a nil *suture.Supervisor")
|
||||
}
|
||||
if s.id == 0 {
|
||||
panic("Can't call Serve on an incorrectly-constructed *suture.Supervisor")
|
||||
}
|
||||
|
||||
defer func() {
|
||||
s.Lock()
|
||||
s.state = notRunning
|
||||
s.Unlock()
|
||||
}()
|
||||
|
||||
s.Lock()
|
||||
if s.state != notRunning {
|
||||
s.Unlock()
|
||||
panic("Running a supervisor while it is already running?")
|
||||
}
|
||||
|
||||
s.state = normal
|
||||
s.Unlock()
|
||||
|
||||
// for all the services I currently know about, start them
|
||||
for _, id := range s.restartQueue {
|
||||
namedService, present := s.services[id]
|
||||
if present {
|
||||
s.runService(namedService.Service, id)
|
||||
}
|
||||
}
|
||||
s.restartQueue = make([]serviceID, 0, 1)
|
||||
|
||||
for {
|
||||
select {
|
||||
case m := <-s.control:
|
||||
switch msg := m.(type) {
|
||||
case serviceFailed:
|
||||
s.handleFailedService(msg.id, msg.err, msg.stacktrace)
|
||||
case serviceEnded:
|
||||
service, monitored := s.services[msg.id]
|
||||
if monitored {
|
||||
if msg.complete {
|
||||
delete(s.services, msg.id)
|
||||
go func() {
|
||||
service.Service.Stop()
|
||||
}()
|
||||
} else {
|
||||
s.handleFailedService(msg.id, fmt.Sprintf("%s returned unexpectedly", service), []byte("[unknown stack trace]"))
|
||||
}
|
||||
}
|
||||
case addService:
|
||||
id := s.serviceCounter
|
||||
s.serviceCounter++
|
||||
|
||||
s.services[id] = serviceWithName{msg.service, msg.name}
|
||||
s.runService(msg.service, id)
|
||||
|
||||
msg.response <- id
|
||||
case removeService:
|
||||
s.removeService(msg.id, msg.notification, s.control)
|
||||
case serviceTerminated:
|
||||
delete(s.servicesShuttingDown, msg.id)
|
||||
case stopSupervisor:
|
||||
s.stopSupervisor()
|
||||
msg.done <- struct{}{}
|
||||
return
|
||||
case listServices:
|
||||
services := []Service{}
|
||||
for _, service := range s.services {
|
||||
services = append(services, service.Service)
|
||||
}
|
||||
msg.c <- services
|
||||
case syncSupervisor:
|
||||
// this does nothing on purpose; its sole purpose is to
|
||||
// introduce a sync point via the channel receive
|
||||
case panicSupervisor:
|
||||
// used only by tests
|
||||
panic("Panicking as requested!")
|
||||
}
|
||||
case _ = <-s.resumeTimer:
|
||||
// We're resuming normal operation after a pause due to
|
||||
// excessive thrashing
|
||||
// FIXME: Ought to permit some spacing of these functions, rather
|
||||
// than simply hammering through them
|
||||
s.Lock()
|
||||
s.state = normal
|
||||
s.Unlock()
|
||||
s.failures = 0
|
||||
s.LogBackoff(s, false)
|
||||
for _, id := range s.restartQueue {
|
||||
namedService, present := s.services[id]
|
||||
if present {
|
||||
s.runService(namedService.Service, id)
|
||||
}
|
||||
}
|
||||
s.restartQueue = make([]serviceID, 0, 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stop stops the Supervisor.
|
||||
//
|
||||
// This function will not return until either all Services have stopped, or
|
||||
// they timeout after the timeout value given to the Supervisor at creation.
|
||||
func (s *Supervisor) Stop() {
|
||||
done := make(chan struct{})
|
||||
if s.sendControl(stopSupervisor{done}) {
|
||||
<-done
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Supervisor) handleFailedService(id serviceID, err interface{}, stacktrace []byte) {
|
||||
now := s.getNow()
|
||||
|
||||
if s.lastFail.IsZero() {
|
||||
s.lastFail = now
|
||||
s.failures = 1.0
|
||||
} else {
|
||||
sinceLastFail := now.Sub(s.lastFail).Seconds()
|
||||
intervals := sinceLastFail / s.failureDecay
|
||||
s.failures = s.failures*math.Pow(.5, intervals) + 1
|
||||
}
|
||||
|
||||
if s.failures > s.failureThreshold {
|
||||
s.Lock()
|
||||
s.state = paused
|
||||
s.Unlock()
|
||||
s.LogBackoff(s, true)
|
||||
s.resumeTimer = s.getAfterChan(s.failureBackoff)
|
||||
}
|
||||
|
||||
s.lastFail = now
|
||||
|
||||
failedService, monitored := s.services[id]
|
||||
|
||||
// It is possible for a service to be no longer monitored
|
||||
// by the time we get here. In that case, just ignore it.
|
||||
if monitored {
|
||||
// this may look dangerous because the state could change, but this
|
||||
// code is only ever run in the one goroutine that is permitted to
|
||||
// change the state, so nothing else will.
|
||||
s.Lock()
|
||||
curState := s.state
|
||||
s.Unlock()
|
||||
if curState == normal {
|
||||
s.runService(failedService.Service, id)
|
||||
s.LogFailure(s, failedService.Service, failedService.name, s.failures, s.failureThreshold, true, err, stacktrace)
|
||||
} else {
|
||||
// FIXME: When restarting, check that the service still
|
||||
// exists (it may have been stopped in the meantime)
|
||||
s.restartQueue = append(s.restartQueue, id)
|
||||
s.LogFailure(s, failedService.Service, failedService.name, s.failures, s.failureThreshold, false, err, stacktrace)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Supervisor) runService(service Service, id serviceID) {
|
||||
go func() {
|
||||
if s.recoverPanics {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
buf := make([]byte, 65535, 65535)
|
||||
written := runtime.Stack(buf, false)
|
||||
buf = buf[:written]
|
||||
s.fail(id, r, buf)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
service.Serve()
|
||||
|
||||
complete := false
|
||||
if completable, ok := service.(IsCompletable); ok && completable.Complete() {
|
||||
complete = true
|
||||
}
|
||||
|
||||
s.serviceEnded(id, complete)
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *Supervisor) removeService(id serviceID, notificationChan chan struct{}, removedChan chan supervisorMessage) {
|
||||
namedService, present := s.services[id]
|
||||
if present {
|
||||
delete(s.services, id)
|
||||
s.servicesShuttingDown[id] = namedService
|
||||
go func() {
|
||||
successChan := make(chan struct{})
|
||||
go func() {
|
||||
namedService.Service.Stop()
|
||||
close(successChan)
|
||||
if notificationChan != nil {
|
||||
notificationChan <- struct{}{}
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-successChan:
|
||||
// Life is good!
|
||||
case <-s.getAfterChan(s.timeout):
|
||||
s.LogBadStop(s, namedService.Service, namedService.name)
|
||||
}
|
||||
removedChan <- serviceTerminated{id}
|
||||
}()
|
||||
} else {
|
||||
if notificationChan != nil {
|
||||
notificationChan <- struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Supervisor) stopSupervisor() {
|
||||
notifyDone := make(chan serviceID)
|
||||
|
||||
for id := range s.services {
|
||||
namedService, present := s.services[id]
|
||||
if present {
|
||||
delete(s.services, id)
|
||||
s.servicesShuttingDown[id] = namedService
|
||||
go func(sID serviceID) {
|
||||
namedService.Service.Stop()
|
||||
notifyDone <- sID
|
||||
}(id)
|
||||
}
|
||||
}
|
||||
|
||||
timeout := s.getAfterChan(s.timeout)
|
||||
SHUTTING_DOWN_SERVICES:
|
||||
for len(s.servicesShuttingDown) > 0 {
|
||||
select {
|
||||
case id := <-notifyDone:
|
||||
delete(s.servicesShuttingDown, id)
|
||||
case <-timeout:
|
||||
for _, namedService := range s.servicesShuttingDown {
|
||||
s.LogBadStop(s, namedService.Service, namedService.name)
|
||||
}
|
||||
|
||||
// failed remove statements will log the errors.
|
||||
break SHUTTING_DOWN_SERVICES
|
||||
}
|
||||
}
|
||||
|
||||
close(s.liveness)
|
||||
return
|
||||
}
|
||||
|
||||
// String implements the fmt.Stringer interface.
|
||||
func (s *Supervisor) String() string {
|
||||
return s.Name
|
||||
}
|
||||
|
||||
func (s *Supervisor) sendControl(sm supervisorMessage) bool {
|
||||
select {
|
||||
case s.control <- sm:
|
||||
return true
|
||||
case _, _ = <-s.liveness:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
Remove will remove the given service from the Supervisor, and attempt to Stop() it.
|
||||
The ServiceID token comes from the Add() call. This returns without waiting
|
||||
for the service to stop.
|
||||
*/
|
||||
func (s *Supervisor) Remove(id ServiceToken) error {
|
||||
sID := supervisorID(id.id >> 32)
|
||||
if sID != s.id {
|
||||
return ErrWrongSupervisor
|
||||
}
|
||||
// no meaningful error handling if this is false
|
||||
_ = s.sendControl(removeService{serviceID(id.id & 0xffffffff), nil})
|
||||
return nil
|
||||
}
|
||||
|
||||
/*
|
||||
RemoveAndWait will remove the given service from the Supervisor and attempt
|
||||
to Stop() it. It will wait up to the given timeout value for the service to
|
||||
terminate. A timeout value of 0 means to wait forever.
|
||||
|
||||
If a nil error is returned from this function, then the service was
|
||||
terminated normally. If either the supervisor terminates or the timeout
|
||||
passes, ErrTimeout is returned. (If this isn't even the right supervisor
|
||||
ErrWrongSupervisor is returned.)
|
||||
*/
|
||||
func (s *Supervisor) RemoveAndWait(id ServiceToken, timeout time.Duration) error {
|
||||
sID := supervisorID(id.id >> 32)
|
||||
if sID != s.id {
|
||||
return ErrWrongSupervisor
|
||||
}
|
||||
|
||||
var timeoutC <-chan time.Time
|
||||
|
||||
if timeout > 0 {
|
||||
timer := time.NewTimer(timeout)
|
||||
defer timer.Stop()
|
||||
timeoutC = timer.C
|
||||
}
|
||||
|
||||
notificationC := make(chan struct{})
|
||||
|
||||
sentControl := s.sendControl(removeService{serviceID(id.id & 0xffffffff), notificationC})
|
||||
|
||||
if sentControl == false {
|
||||
return ErrTimeout
|
||||
}
|
||||
|
||||
select {
|
||||
case <-notificationC:
|
||||
// normal case; the service is terminated.
|
||||
return nil
|
||||
|
||||
// This occurs if the entire supervisor ends without the service
|
||||
// having terminated, and includes the timeout the supervisor
|
||||
// itself waited before closing the liveness channel.
|
||||
case _, _ = <-s.liveness:
|
||||
return ErrTimeout
|
||||
|
||||
// The local timeout.
|
||||
case <-timeoutC:
|
||||
return ErrTimeout
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
||||
Services returns a []Service containing a snapshot of the services this
|
||||
Supervisor is managing.
|
||||
|
||||
*/
|
||||
func (s *Supervisor) Services() []Service {
|
||||
ls := listServices{make(chan []Service)}
|
||||
|
||||
if s.sendControl(ls) {
|
||||
return <-ls.c
|
||||
}
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user