|
|
@@ -1,56 +1,3 @@
|
|
|
-/*
|
|
|
-
|
|
|
-Package suture provides Erlang-like supervisor trees.
|
|
|
-
|
|
|
-This implements Erlang-esque supervisor trees, as adapted for Go. This is
|
|
|
-intended to be an industrial-strength implementation, but it has not yet
|
|
|
-been deployed in a hostile environment. (It's headed there, though.)
|
|
|
-
|
|
|
-Supervisor Tree -> SuTree -> suture -> holds your code together when it's
|
|
|
-trying to fall apart.
|
|
|
-
|
|
|
-Why use Suture?
|
|
|
-
|
|
|
- * You want to write bullet-resistant services that will remain available
|
|
|
- despite unforeseen failure.
|
|
|
- * You need the code to be smart enough not to consume 100% of the CPU
|
|
|
- restarting things.
|
|
|
- * You want to easily compose multiple such services in one program.
|
|
|
- * You want the Erlang programmers to stop lording their supervision
|
|
|
- trees over you.
|
|
|
-
|
|
|
-Suture has 100% test coverage, and is golint clean. This doesn't prove it
|
|
|
-free of bugs, but it shows I care.
|
|
|
-
|
|
|
-A blog post describing the design decisions is available at
|
|
|
-http://www.jerf.org/iri/post/2930 .
|
|
|
-
|
|
|
-Using Suture
|
|
|
-
|
|
|
-To idiomatically use Suture, create a Supervisor which is your top level
|
|
|
-"application" supervisor. This will often occur in your program's "main"
|
|
|
-function.
|
|
|
-
|
|
|
-Create "Service"s, which implement the Service interface. .Add() them
|
|
|
-to your Supervisor. Supervisors are also services, so you can create a
|
|
|
-tree structure here, depending on the exact combination of restarts
|
|
|
-you want to create.
|
|
|
-
|
|
|
-As a special case, when adding Supervisors to Supervisors, the "sub"
|
|
|
-supervisor will have the "super" supervisor's Log function copied.
|
|
|
-This allows you to set one log function on the "top" supervisor, and
|
|
|
-have it propagate down to all the sub-supervisors. This also allows
|
|
|
-libraries or modules to provide Supervisors without having to commit
|
|
|
-their users to a particular logging method.
|
|
|
-
|
|
|
-Finally, as what is probably the last line of your main() function, call
|
|
|
-.Serve() on your top level supervisor. This will start all the services
|
|
|
-you've defined.
|
|
|
-
|
|
|
-See the Example for an example, using a simple service that serves out
|
|
|
-incrementing integers.
|
|
|
-
|
|
|
-*/
|
|
|
package suture
|
|
|
|
|
|
import (
|
|
|
@@ -60,7 +7,6 @@ import (
|
|
|
"math"
|
|
|
"runtime"
|
|
|
"sync"
|
|
|
- "sync/atomic"
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
@@ -73,6 +19,7 @@ const (
|
|
|
type supervisorID uint32
|
|
|
type serviceID uint32
|
|
|
|
|
|
+var currentSupervisorIDL sync.Mutex
|
|
|
var currentSupervisorID uint32
|
|
|
|
|
|
// ErrWrongSupervisor is returned by the (*Supervisor).Remove method
|
|
|
@@ -85,6 +32,11 @@ type ServiceToken struct {
|
|
|
id uint64
|
|
|
}
|
|
|
|
|
|
+type serviceWithName struct {
|
|
|
+ Service Service
|
|
|
+ name string
|
|
|
+}
|
|
|
+
|
|
|
/*
|
|
|
Supervisor is the core type of the module that represents a Supervisor.
|
|
|
|
|
|
@@ -115,18 +67,20 @@ type Supervisor struct {
|
|
|
Name string
|
|
|
id supervisorID
|
|
|
|
|
|
- failureDecay float64
|
|
|
- failureThreshold float64
|
|
|
- failureBackoff time.Duration
|
|
|
- timeout time.Duration
|
|
|
- log func(string)
|
|
|
- services map[serviceID]Service
|
|
|
- lastFail time.Time
|
|
|
- failures float64
|
|
|
- restartQueue []serviceID
|
|
|
- serviceCounter serviceID
|
|
|
- control chan supervisorMessage
|
|
|
- resumeTimer <-chan time.Time
|
|
|
+ failureDecay float64
|
|
|
+ failureThreshold float64
|
|
|
+ failureBackoff time.Duration
|
|
|
+ timeout time.Duration
|
|
|
+ log func(string)
|
|
|
+ services map[serviceID]serviceWithName
|
|
|
+ servicesShuttingDown map[serviceID]serviceWithName
|
|
|
+ lastFail time.Time
|
|
|
+ failures float64
|
|
|
+ restartQueue []serviceID
|
|
|
+ serviceCounter serviceID
|
|
|
+ control chan supervisorMessage
|
|
|
+ liveness chan struct{}
|
|
|
+ resumeTimer <-chan time.Time
|
|
|
|
|
|
// The testing uses the ability to grab these individual logging functions
|
|
|
// and get inside of suture's handling at a deep level.
|
|
|
@@ -135,8 +89,8 @@ type Supervisor struct {
|
|
|
// I'll happily do it.
|
|
|
// But since I've now changed the signature on these once, I'm glad I
|
|
|
// didn't start with them public... :)
|
|
|
- logBadStop func(*Supervisor, Service)
|
|
|
- logFailure func(supervisor *Supervisor, service Service, currentFailures float64, failureThreshold float64, restarting bool, error interface{}, stacktrace []byte)
|
|
|
+ logBadStop func(*Supervisor, Service, string)
|
|
|
+ logFailure func(supervisor *Supervisor, service Service, serviceName string, currentFailures float64, failureThreshold float64, restarting bool, error interface{}, stacktrace []byte)
|
|
|
logBackoff func(*Supervisor, bool)
|
|
|
|
|
|
// avoid a dependency on github.com/thejerf/abtime by just implementing
|
|
|
@@ -204,7 +158,10 @@ func New(name string, spec Spec) (s *Supervisor) {
|
|
|
s = new(Supervisor)
|
|
|
|
|
|
s.Name = name
|
|
|
- s.id = supervisorID(atomic.AddUint32(&currentSupervisorID, 1))
|
|
|
+ currentSupervisorIDL.Lock()
|
|
|
+ currentSupervisorID++
|
|
|
+ s.id = supervisorID(currentSupervisorID)
|
|
|
+ currentSupervisorIDL.Unlock()
|
|
|
|
|
|
if spec.Log == nil {
|
|
|
s.log = func(msg string) {
|
|
|
@@ -240,15 +197,17 @@ func New(name string, spec Spec) (s *Supervisor) {
|
|
|
s.getAfterChan = time.After
|
|
|
|
|
|
s.control = make(chan supervisorMessage)
|
|
|
- s.services = make(map[serviceID]Service)
|
|
|
+ s.liveness = make(chan struct{})
|
|
|
+ s.services = make(map[serviceID]serviceWithName)
|
|
|
+ s.servicesShuttingDown = make(map[serviceID]serviceWithName)
|
|
|
s.restartQueue = make([]serviceID, 0, 1)
|
|
|
s.resumeTimer = make(chan time.Time)
|
|
|
|
|
|
// set up the default logging handlers
|
|
|
- s.logBadStop = func(supervisor *Supervisor, service Service) {
|
|
|
- s.log(fmt.Sprintf("%s: Service %s failed to terminate in a timely manner", serviceName(supervisor), serviceName(service)))
|
|
|
+ s.logBadStop = func(supervisor *Supervisor, service Service, name string) {
|
|
|
+ s.log(fmt.Sprintf("%s: Service %s failed to terminate in a timely manner", supervisor.Name, name))
|
|
|
}
|
|
|
- s.logFailure = func(supervisor *Supervisor, service Service, failures float64, threshold float64, restarting bool, err interface{}, st []byte) {
|
|
|
+ s.logFailure = func(supervisor *Supervisor, service Service, serviceName string, failures float64, threshold float64, restarting bool, err interface{}, st []byte) {
|
|
|
var errString string
|
|
|
|
|
|
e, canError := err.(error)
|
|
|
@@ -258,7 +217,7 @@ func New(name string, spec Spec) (s *Supervisor) {
|
|
|
errString = fmt.Sprintf("%#v", err)
|
|
|
}
|
|
|
|
|
|
- s.log(fmt.Sprintf("%s: Failed service '%s' (%f failures of %f), restarting: %#v, error: %s, stacktrace: %s", serviceName(supervisor), serviceName(service), failures, threshold, restarting, errString, string(st)))
|
|
|
+ s.log(fmt.Sprintf("%s: Failed service '%s' (%f failures of %f), restarting: %#v, error: %s, stacktrace: %s", supervisor.Name, serviceName, failures, threshold, restarting, errString, string(st)))
|
|
|
}
|
|
|
s.logBackoff = func(s *Supervisor, entering bool) {
|
|
|
if entering {
|
|
|
@@ -287,68 +246,6 @@ func NewSimple(name string) *Supervisor {
|
|
|
return New(name, Spec{})
|
|
|
}
|
|
|
|
|
|
-/*
|
|
|
-Service is the interface that describes a service to a Supervisor.
|
|
|
-
|
|
|
-Serve Method
|
|
|
-
|
|
|
-The Serve method is called by a Supervisor to start the service.
|
|
|
-The service should execute within the goroutine that this is
|
|
|
-called in. If this function either returns or panics, the Supervisor
|
|
|
-will call it again.
|
|
|
-
|
|
|
-A Serve method SHOULD do as much cleanup of the state as possible,
|
|
|
-to prevent any corruption in the previous state from crashing the
|
|
|
-service again.
|
|
|
-
|
|
|
-Stop Method
|
|
|
-
|
|
|
-This method is used by the supervisor to stop the service. Calling this
|
|
|
-directly on a Service given to a Supervisor will simply result in the
|
|
|
-Service being restarted; use the Supervisor's .Remove(ServiceToken) method
|
|
|
-to stop a service. A supervisor will call .Stop() only once. Thus, it may
|
|
|
-be as destructive as it likes to get the service to stop.
|
|
|
-
|
|
|
-Once Stop has been called on a Service, the Service SHOULD NOT be
|
|
|
-reused in any other supervisor! Because of the impossibility of
|
|
|
-guaranteeing that the service has actually stopped in Go, you can't
|
|
|
-prove that you won't be starting two goroutines using the exact
|
|
|
-same memory to store state, causing completely unpredictable behavior.
|
|
|
-
|
|
|
-Stop should not return until the service has actually stopped.
|
|
|
-"Stopped" here is defined as "the service will stop servicing any
|
|
|
-further requests in the future". For instance, a common implementation
|
|
|
-is to receive a message on a dedicated "stop" channel and immediately
|
|
|
-returning. Once the stop command has been processed, the service is
|
|
|
-stopped.
|
|
|
-
|
|
|
-Another common Stop implementation is to forcibly close an open socket
|
|
|
-or other resource, which will cause detectable errors to manifest in the
|
|
|
-service code. Bear in mind that to perfectly correctly use this
|
|
|
-approach requires a bit more work to handle the chance of a Stop
|
|
|
-command coming in before the resource has been created.
|
|
|
-
|
|
|
-If a service does not Stop within the supervisor's timeout duration, a log
|
|
|
-entry will be made with a descriptive string to that effect. This does
|
|
|
-not guarantee that the service is hung; it may still get around to being
|
|
|
-properly stopped in the future. Until the service is fully stopped,
|
|
|
-both the service and the spawned goroutine trying to stop it will be
|
|
|
-"leaked".
|
|
|
-
|
|
|
-Stringer Interface
|
|
|
-
|
|
|
-It is not mandatory to implement the fmt.Stringer interface on your
|
|
|
-service, but if your Service does happen to implement that, the log
|
|
|
-messages that describe your service will use that when naming the
|
|
|
-service. Otherwise, you'll see the GoString of your service object,
|
|
|
-obtained via fmt.Sprintf("%#v", service).
|
|
|
-
|
|
|
-*/
|
|
|
-type Service interface {
|
|
|
- Serve()
|
|
|
- Stop()
|
|
|
-}
|
|
|
-
|
|
|
/*
|
|
|
Add adds a service to this supervisor.
|
|
|
|
|
|
@@ -381,7 +278,7 @@ func (s *Supervisor) Add(service Service) ServiceToken {
|
|
|
id := s.serviceCounter
|
|
|
s.serviceCounter++
|
|
|
|
|
|
- s.services[id] = service
|
|
|
+ s.services[id] = serviceWithName{service, serviceName(service)}
|
|
|
s.restartQueue = append(s.restartQueue, id)
|
|
|
|
|
|
s.Unlock()
|
|
|
@@ -390,7 +287,7 @@ func (s *Supervisor) Add(service Service) ServiceToken {
|
|
|
s.Unlock()
|
|
|
|
|
|
response := make(chan serviceID)
|
|
|
- s.control <- addService{service, response}
|
|
|
+ s.control <- addService{service, serviceName(service), response}
|
|
|
return ServiceToken{uint64(s.id)<<32 | uint64(<-response)}
|
|
|
}
|
|
|
|
|
|
@@ -430,9 +327,9 @@ func (s *Supervisor) Serve() {
|
|
|
|
|
|
// for all the services I currently know about, start them
|
|
|
for _, id := range s.restartQueue {
|
|
|
- service, present := s.services[id]
|
|
|
+ namedService, present := s.services[id]
|
|
|
if present {
|
|
|
- s.runService(service, id)
|
|
|
+ s.runService(namedService.Service, id)
|
|
|
}
|
|
|
}
|
|
|
s.restartQueue = make([]serviceID, 0, 1)
|
|
|
@@ -452,21 +349,22 @@ func (s *Supervisor) Serve() {
|
|
|
id := s.serviceCounter
|
|
|
s.serviceCounter++
|
|
|
|
|
|
- s.services[id] = msg.service
|
|
|
+ s.services[id] = serviceWithName{msg.service, msg.name}
|
|
|
s.runService(msg.service, id)
|
|
|
|
|
|
msg.response <- id
|
|
|
case removeService:
|
|
|
- s.removeService(msg.id)
|
|
|
+ s.removeService(msg.id, s.control)
|
|
|
+ case serviceTerminated:
|
|
|
+ delete(s.servicesShuttingDown, msg.id)
|
|
|
case stopSupervisor:
|
|
|
- for id := range s.services {
|
|
|
- s.removeService(id)
|
|
|
- }
|
|
|
+ s.stopSupervisor()
|
|
|
+ msg.done <- struct{}{}
|
|
|
return
|
|
|
case listServices:
|
|
|
services := []Service{}
|
|
|
for _, service := range s.services {
|
|
|
- services = append(services, service)
|
|
|
+ services = append(services, service.Service)
|
|
|
}
|
|
|
msg.c <- services
|
|
|
case syncSupervisor:
|
|
|
@@ -487,9 +385,9 @@ func (s *Supervisor) Serve() {
|
|
|
s.failures = 0
|
|
|
s.logBackoff(s, false)
|
|
|
for _, id := range s.restartQueue {
|
|
|
- service, present := s.services[id]
|
|
|
+ namedService, present := s.services[id]
|
|
|
if present {
|
|
|
- s.runService(service, id)
|
|
|
+ s.runService(namedService.Service, id)
|
|
|
}
|
|
|
}
|
|
|
s.restartQueue = make([]serviceID, 0, 1)
|
|
|
@@ -531,13 +429,13 @@ func (s *Supervisor) handleFailedService(id serviceID, err interface{}, stacktra
|
|
|
curState := s.state
|
|
|
s.Unlock()
|
|
|
if curState == normal {
|
|
|
- s.runService(failedService, id)
|
|
|
- s.logFailure(s, failedService, s.failures, s.failureThreshold, true, err, stacktrace)
|
|
|
+ s.runService(failedService.Service, id)
|
|
|
+ s.logFailure(s, failedService.Service, failedService.name, s.failures, s.failureThreshold, true, err, stacktrace)
|
|
|
} else {
|
|
|
// FIXME: When restarting, check that the service still
|
|
|
// exists (it may have been stopped in the meantime)
|
|
|
s.restartQueue = append(s.restartQueue, id)
|
|
|
- s.logFailure(s, failedService, s.failures, s.failureThreshold, false, err, stacktrace)
|
|
|
+ s.logFailure(s, failedService.Service, failedService.name, s.failures, s.failureThreshold, false, err, stacktrace)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -559,39 +457,73 @@ func (s *Supervisor) runService(service Service, id serviceID) {
|
|
|
}()
|
|
|
}
|
|
|
|
|
|
-func (s *Supervisor) removeService(id serviceID) {
|
|
|
- service, present := s.services[id]
|
|
|
+func (s *Supervisor) removeService(id serviceID, removedChan chan supervisorMessage) {
|
|
|
+ namedService, present := s.services[id]
|
|
|
if present {
|
|
|
delete(s.services, id)
|
|
|
+ s.servicesShuttingDown[id] = namedService
|
|
|
go func() {
|
|
|
successChan := make(chan bool)
|
|
|
go func() {
|
|
|
- service.Stop()
|
|
|
+ namedService.Service.Stop()
|
|
|
successChan <- true
|
|
|
}()
|
|
|
|
|
|
- failChan := s.getAfterChan(s.timeout)
|
|
|
-
|
|
|
select {
|
|
|
case <-successChan:
|
|
|
// Life is good!
|
|
|
- case <-failChan:
|
|
|
- s.logBadStop(s, service)
|
|
|
+ case <-s.getAfterChan(s.timeout):
|
|
|
+ s.logBadStop(s, namedService.Service, namedService.name)
|
|
|
}
|
|
|
+ removedChan <- serviceTerminated{id}
|
|
|
}()
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// stopSupervisor stops every service this supervisor currently manages and
+// waits, up to s.timeout, for the pending shutdowns to complete.
+//
+// Each entry in s.services is moved to s.servicesShuttingDown and has Stop
+// called on its own goroutine. Services still pending when the timeout
+// fires are reported through s.logBadStop.
+//
+// s.liveness is closed on every return path, so sendControl callers are
+// released even when some service failed to stop in time.
+func (s *Supervisor) stopSupervisor() {
+	// Serve returns right after this call, so nothing reads s.control any
+	// more; liveness must therefore be closed on the timeout path as well,
+	// otherwise later sendControl callers would block forever.
+	defer close(s.liveness)
+
+	// Buffered with one slot per service so that a Stop which returns after
+	// the timeout can still deliver its notification and let its goroutine
+	// exit instead of blocking on a channel nobody receives from.
+	notifyDone := make(chan serviceID, len(s.services))
+
+	for id, namedService := range s.services {
+		delete(s.services, id)
+		s.servicesShuttingDown[id] = namedService
+		go func(sID serviceID, svc Service) {
+			svc.Stop()
+			notifyDone <- sID
+		}(id, namedService.Service)
+	}
+
+	// NOTE(review): entries placed in servicesShuttingDown by an earlier
+	// removeService report completion on s.control, not notifyDone, so they
+	// can only leave this loop via the timeout branch - confirm whether
+	// that is intended.
+	timeout := s.getAfterChan(s.timeout)
+	for len(s.servicesShuttingDown) > 0 {
+		select {
+		case id := <-notifyDone:
+			delete(s.servicesShuttingDown, id)
+		case <-timeout:
+			for _, namedService := range s.servicesShuttingDown {
+				s.logBadStop(s, namedService.Service, namedService.name)
+			}
+			return
+		}
+	}
+}
|
|
|
+
|
|
|
// String implements the fmt.Stringer interface.
|
|
|
func (s *Supervisor) String() string {
|
|
|
return s.Name
|
|
|
}
|
|
|
|
|
|
-// sum type pattern for type-safe message passing; see
|
|
|
-// http://www.jerf.org/iri/post/2917
|
|
|
-
|
|
|
-type supervisorMessage interface {
|
|
|
- isSupervisorMessage()
|
|
|
+// sendControl hands sm to the supervisor's control channel.
+//
+// It reports true once the message has been accepted on s.control, and
+// false if a receive on s.liveness completes first (stopSupervisor closes
+// that channel), in which case sm is not delivered.
+func (s *Supervisor) sendControl(sm supervisorMessage) bool {
+	select {
+	case <-s.liveness:
+		return false
+	case s.control <- sm:
+		return true
+	}
}
|
|
|
|
|
|
/*
|
|
|
@@ -603,7 +535,7 @@ func (s *Supervisor) Remove(id ServiceToken) error {
|
|
|
if sID != s.id {
|
|
|
return ErrWrongSupervisor
|
|
|
}
|
|
|
- s.control <- removeService{serviceID(id.id & 0xffffffff)}
|
|
|
+ s.sendControl(removeService{serviceID(id.id & 0xffffffff)})
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
@@ -615,76 +547,10 @@ Supervisor is managing.
|
|
|
*/
|
|
|
func (s *Supervisor) Services() []Service {
|
|
|
ls := listServices{make(chan []Service)}
|
|
|
- s.control <- ls
|
|
|
- return <-ls.c
|
|
|
-}
|
|
|
-
|
|
|
-type listServices struct {
|
|
|
- c chan []Service
|
|
|
-}
|
|
|
-
|
|
|
-func (ls listServices) isSupervisorMessage() {}
|
|
|
-
|
|
|
-type removeService struct {
|
|
|
- id serviceID
|
|
|
-}
|
|
|
-
|
|
|
-func (rs removeService) isSupervisorMessage() {}
|
|
|
-
|
|
|
-func (s *Supervisor) sync() {
|
|
|
- s.control <- syncSupervisor{}
|
|
|
-}
|
|
|
-
|
|
|
-type syncSupervisor struct {
|
|
|
-}
|
|
|
-
|
|
|
-func (ss syncSupervisor) isSupervisorMessage() {}
|
|
|
-
|
|
|
-func (s *Supervisor) fail(id serviceID, err interface{}, stacktrace []byte) {
|
|
|
- s.control <- serviceFailed{id, err, stacktrace}
|
|
|
-}
|
|
|
-
|
|
|
-type serviceFailed struct {
|
|
|
- id serviceID
|
|
|
- err interface{}
|
|
|
- stacktrace []byte
|
|
|
-}
|
|
|
-
|
|
|
-func (sf serviceFailed) isSupervisorMessage() {}
|
|
|
|
|
|
-func (s *Supervisor) serviceEnded(id serviceID) {
|
|
|
- s.control <- serviceEnded{id}
|
|
|
-}
|
|
|
-
|
|
|
-type serviceEnded struct {
|
|
|
- id serviceID
|
|
|
-}
|
|
|
-
|
|
|
-func (s serviceEnded) isSupervisorMessage() {}
|
|
|
-
|
|
|
-// added by the Add() method
|
|
|
-type addService struct {
|
|
|
- service Service
|
|
|
- response chan serviceID
|
|
|
-}
|
|
|
-
|
|
|
-func (as addService) isSupervisorMessage() {}
|
|
|
-
|
|
|
-// Stop stops the Supervisor.
|
|
|
-func (s *Supervisor) Stop() {
|
|
|
- s.control <- stopSupervisor{}
|
|
|
-}
|
|
|
-
|
|
|
-type stopSupervisor struct {
|
|
|
-}
|
|
|
-
|
|
|
-func (ss stopSupervisor) isSupervisorMessage() {}
|
|
|
-
|
|
|
-func (s *Supervisor) panic() {
|
|
|
- s.control <- panicSupervisor{}
|
|
|
-}
|
|
|
-
|
|
|
-type panicSupervisor struct {
|
|
|
+ if s.sendControl(ls) {
|
|
|
+ return <-ls.c
|
|
|
+ } else {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
-func (ps panicSupervisor) isSupervisorMessage() {}
|