|
@@ -59,6 +59,7 @@ import (
|
|
|
"log"
|
|
"log"
|
|
|
"math"
|
|
"math"
|
|
|
"runtime"
|
|
"runtime"
|
|
|
|
|
+ "sync"
|
|
|
"sync/atomic"
|
|
"sync/atomic"
|
|
|
"time"
|
|
"time"
|
|
|
)
|
|
)
|
|
@@ -123,7 +124,6 @@ type Supervisor struct {
|
|
|
lastFail time.Time
|
|
lastFail time.Time
|
|
|
failures float64
|
|
failures float64
|
|
|
restartQueue []serviceID
|
|
restartQueue []serviceID
|
|
|
- state uint8
|
|
|
|
|
serviceCounter serviceID
|
|
serviceCounter serviceID
|
|
|
control chan supervisorMessage
|
|
control chan supervisorMessage
|
|
|
resumeTimer <-chan time.Time
|
|
resumeTimer <-chan time.Time
|
|
@@ -143,6 +143,9 @@ type Supervisor struct {
|
|
|
// a minimal chunk.
|
|
// a minimal chunk.
|
|
|
getNow func() time.Time
|
|
getNow func() time.Time
|
|
|
getResume func(time.Duration) <-chan time.Time
|
|
getResume func(time.Duration) <-chan time.Time
|
|
|
|
|
+
|
|
|
|
|
+ sync.Mutex
|
|
|
|
|
+ state uint8
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// Spec is used to pass arguments to the New function to create a
|
|
// Spec is used to pass arguments to the New function to create a
|
|
@@ -373,6 +376,7 @@ func (s *Supervisor) Add(service Service) ServiceToken {
|
|
|
supervisor.logBackoff = s.logBackoff
|
|
supervisor.logBackoff = s.logBackoff
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ s.Lock()
|
|
|
if s.state == notRunning {
|
|
if s.state == notRunning {
|
|
|
id := s.serviceCounter
|
|
id := s.serviceCounter
|
|
|
s.serviceCounter++
|
|
s.serviceCounter++
|
|
@@ -380,8 +384,10 @@ func (s *Supervisor) Add(service Service) ServiceToken {
|
|
|
s.services[id] = service
|
|
s.services[id] = service
|
|
|
s.restartQueue = append(s.restartQueue, id)
|
|
s.restartQueue = append(s.restartQueue, id)
|
|
|
|
|
|
|
|
|
|
+ s.Unlock()
|
|
|
return ServiceToken{uint64(s.id)<<32 | uint64(id)}
|
|
return ServiceToken{uint64(s.id)<<32 | uint64(id)}
|
|
|
}
|
|
}
|
|
|
|
|
+ s.Unlock()
|
|
|
|
|
|
|
|
response := make(chan serviceID)
|
|
response := make(chan serviceID)
|
|
|
s.control <- addService{service, response}
|
|
s.control <- addService{service, response}
|
|
@@ -408,16 +414,19 @@ func (s *Supervisor) Serve() {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
defer func() {
|
|
defer func() {
|
|
|
|
|
+ s.Lock()
|
|
|
s.state = notRunning
|
|
s.state = notRunning
|
|
|
|
|
+ s.Unlock()
|
|
|
}()
|
|
}()
|
|
|
|
|
|
|
|
|
|
+ s.Lock()
|
|
|
if s.state != notRunning {
|
|
if s.state != notRunning {
|
|
|
- // FIXME: Don't explain why I don't need a semaphore, just use one
|
|
|
|
|
- // This doesn't use a semaphore because it's just a sanity check.
|
|
|
|
|
|
|
+ s.Unlock()
|
|
|
panic("Running a supervisor while it is already running?")
|
|
panic("Running a supervisor while it is already running?")
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
s.state = normal
|
|
s.state = normal
|
|
|
|
|
+ s.Unlock()
|
|
|
|
|
|
|
|
// for all the services I currently know about, start them
|
|
// for all the services I currently know about, start them
|
|
|
for _, id := range s.restartQueue {
|
|
for _, id := range s.restartQueue {
|
|
@@ -472,7 +481,9 @@ func (s *Supervisor) Serve() {
|
|
|
// excessive thrashing
|
|
// excessive thrashing
|
|
|
// FIXME: Ought to permit some spacing of these functions, rather
|
|
// FIXME: Ought to permit some spacing of these functions, rather
|
|
|
// than simply hammering through them
|
|
// than simply hammering through them
|
|
|
|
|
+ s.Lock()
|
|
|
s.state = normal
|
|
s.state = normal
|
|
|
|
|
+ s.Unlock()
|
|
|
s.failures = 0
|
|
s.failures = 0
|
|
|
s.logBackoff(s, false)
|
|
s.logBackoff(s, false)
|
|
|
for _, id := range s.restartQueue {
|
|
for _, id := range s.restartQueue {
|
|
@@ -499,7 +510,9 @@ func (s *Supervisor) handleFailedService(id serviceID, err interface{}, stacktra
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if s.failures > s.failureThreshold {
|
|
if s.failures > s.failureThreshold {
|
|
|
|
|
+ s.Lock()
|
|
|
s.state = paused
|
|
s.state = paused
|
|
|
|
|
+ s.Unlock()
|
|
|
s.logBackoff(s, true)
|
|
s.logBackoff(s, true)
|
|
|
s.resumeTimer = s.getResume(s.failureBackoff)
|
|
s.resumeTimer = s.getResume(s.failureBackoff)
|
|
|
}
|
|
}
|
|
@@ -511,7 +524,13 @@ func (s *Supervisor) handleFailedService(id serviceID, err interface{}, stacktra
|
|
|
// It is possible for a service to be no longer monitored
|
|
// It is possible for a service to be no longer monitored
|
|
|
// by the time we get here. In that case, just ignore it.
|
|
// by the time we get here. In that case, just ignore it.
|
|
|
if monitored {
|
|
if monitored {
|
|
|
- if s.state == normal {
|
|
|
|
|
|
|
+ // this may look dangerous because the state could change, but this
|
|
|
|
|
+ // code is only ever run in the one goroutine that is permitted to
|
|
|
|
|
+ // change the state, so nothing else will.
|
|
|
|
|
+ s.Lock()
|
|
|
|
|
+ curState := s.state
|
|
|
|
|
+ s.Unlock()
|
|
|
|
|
+ if curState == normal {
|
|
|
s.runService(failedService, id)
|
|
s.runService(failedService, id)
|
|
|
s.logFailure(s, failedService, s.failures, s.failureThreshold, true, err, stacktrace)
|
|
s.logFailure(s, failedService, s.failures, s.failureThreshold, true, err, stacktrace)
|
|
|
} else {
|
|
} else {
|