/*
Package suture provides Erlang-like supervisor trees.

This implements Erlang-esque supervisor trees, as adapted for Go. This is
intended to be an industrial-strength implementation, but it has not yet
been deployed in a hostile environment. (It's headed there, though.)

Supervisor Tree -> SuTree -> suture -> holds your code together when it's
trying to fall apart.

Why use Suture?

 * You want to write bullet-resistant services that will remain available
   despite unforeseen failure.
 * You need the code to be smart enough not to consume 100% of the CPU
   restarting things.
 * You want to easily compose multiple such services in one program.
 * You want the Erlang programmers to stop lording their supervision
   trees over you.

Suture has 100% test coverage, and is golint clean. This doesn't prove it
free of bugs, but it shows I care.

A blog post describing the design decisions is available at
http://www.jerf.org/iri/post/2930 .

Using Suture

To idiomatically use Suture, create a Supervisor which is your top level
"application" supervisor. This will often occur in your program's "main"
function.

Create "Service"s, which implement the Service interface. .Add() them
to your Supervisor. Supervisors are also services, so you can create a
tree structure here, depending on the exact combination of restarts
you want to create.

As a special case, when adding Supervisors to Supervisors, the "sub"
supervisor will have the "super" supervisor's Log function copied.
This allows you to set one log function on the "top" supervisor, and
have it propagate down to all the sub-supervisors. This also allows
libraries or modules to provide Supervisors without having to commit
their users to a particular logging method.

Finally, as what is probably the last line of your main() function, call
.Serve() on your top level supervisor. This will start all the services
you've defined.

See the Example for an example, using a simple service that serves out
incrementing integers.
*/
package suture

import (
	"errors"
	"fmt"
	"log"
	"math"
	"runtime"
	"sync"
	"sync/atomic"
	"time"
)

const (
	notRunning = iota
	normal
	paused
)

type supervisorID uint32
type serviceID uint32

var currentSupervisorID uint32

// ErrWrongSupervisor is returned by the (*Supervisor).Remove method
// if you pass a ServiceToken from the wrong Supervisor.
var ErrWrongSupervisor = errors.New("wrong supervisor for this service token, no service removed")

// ServiceToken is an opaque identifier that can be used to terminate a service that
// has been Add()ed to a Supervisor.
type ServiceToken struct {
	id uint64
}

/*
Supervisor is the core type of the module that represents a Supervisor.

Supervisors should be constructed either by New or NewSimple.

Once constructed, a Supervisor should be started in one of three ways:

 1. Calling .Serve().
 2. Calling .ServeBackground().
 3. Adding it to an existing Supervisor.

Calling Serve will cause the supervisor to run until it is shut down by
an external user calling Stop() on it. If that never happens, it simply
runs forever. I suggest creating your services in Supervisors, then making
a Serve() call on your top-level Supervisor be the last line of your main
func.

Calling ServeBackground will CORRECTLY start the supervisor running in a
new goroutine. You do not want to just:

    go supervisor.Serve()

because that will briefly create a race condition as it starts up, if you
try to .Add() services immediately afterward.
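
The synchronization idea behind ServeBackground can be sketched in
isolation. The example below is a simplified stand-in, not suture's code:
the loop type, its started field, and the string messages are all
hypothetical. It relies on the fact that a send on an unbuffered channel
completes only once the receiving goroutine has taken the value, so when
runBackground returns, the loop is known to have finished its startup.

```go
package main

import "fmt"

// loop is a hypothetical stand-in for a supervisor's event loop.
type loop struct {
	control chan string
	started bool // written by run before it begins receiving
}

// run marks the loop as started and then handles messages.
func (l *loop) run() {
	l.started = true
	for msg := range l.control {
		if msg == "stop" {
			return
		}
		// "sync" is deliberately a no-op: receiving it is the point.
	}
}

// runBackground starts run in a goroutine and returns only after the
// loop has received a message, so startup is known to be finished.
func (l *loop) runBackground() {
	go l.run()
	l.control <- "sync"
}

func main() {
	l := &loop{control: make(chan string)}
	l.runBackground()
	fmt.Println("started:", l.started) // prints "started: true"
	l.control <- "stop"
}
```

A bare "go l.run()" without the sync send would give no such guarantee:
the caller could read or change shared state before the loop had set
itself up.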
*/
type Supervisor struct {
	Name string
	id   supervisorID

	failureDecay     float64
	failureThreshold float64
	failureBackoff   time.Duration
	timeout          time.Duration
	log              func(string)
	services         map[serviceID]Service
	lastFail         time.Time
	failures         float64
	restartQueue     []serviceID
	serviceCounter   serviceID
	control          chan supervisorMessage
	resumeTimer      <-chan time.Time

	// The testing uses the ability to grab these individual logging functions
	// and get inside of suture's handling at a deep level.
	// If you ever come up with some need to get into these, submit a pull
	// request to make them public and some smidge of justification, and
	// I'll happily do it.
	// But since I've now changed the signature on these once, I'm glad I
	// didn't start with them public... :)
	logBadStop func(*Supervisor, Service)
	logFailure func(supervisor *Supervisor, service Service, currentFailures float64, failureThreshold float64, restarting bool, error interface{}, stacktrace []byte)
	logBackoff func(*Supervisor, bool)

	// avoid a dependency on github.com/thejerf/abtime by just implementing
	// a minimal chunk.
	getNow       func() time.Time
	getAfterChan func(time.Duration) <-chan time.Time

	sync.Mutex
	state uint8
}

// Spec is used to pass arguments to the New function to create a
// supervisor. See the New function for full documentation.
type Spec struct {
	Log              func(string)
	FailureDecay     float64
	FailureThreshold float64
	FailureBackoff   time.Duration
	Timeout          time.Duration
}

/*
New is the full constructor function for a supervisor.

The name is a friendly human name for the supervisor, used in logging. Suture
does not care if this is unique, but it is good for your sanity if it is.

If not set, the following values are used:

 * Log:              A function is created that uses log.Print.
 * FailureDecay:     30 seconds
 * FailureThreshold: 5 failures
 * FailureBackoff:   15 seconds
 * Timeout:          10 seconds

The Log function will be called when errors occur. Suture will log the
following:

 * When a service has failed, with a descriptive message about the
   current backoff status, and whether it was immediately restarted
 * When the supervisor has gone into its backoff mode, and when it
   exits it
 * When a service fails to stop

The FailureDecay, FailureThreshold, and FailureBackoff fields control how
failures are handled, in order to avoid the supervisor failure case where
the program does nothing but restart failed services. If you do not
care how failures behave, the default values should be fine for the
vast majority of services, but if you want the details:

The supervisor tracks the number of failures that have occurred, with an
exponential decay on the count. Every FailureDecay seconds, the number of
failures that have occurred is cut in half. (This is done smoothly with an
exponential function.) When a failure occurs, the number of failures
is incremented by one. When the number of failures passes the
FailureThreshold, the entire supervisor waits for the FailureBackoff
duration before attempting any further restarts, at which point it resets
its failure count to zero.

Timeout is how long Suture will wait for a service to properly terminate.
*/
func New(name string, spec Spec) (s *Supervisor) {
	s = new(Supervisor)

	s.Name = name
	s.id = supervisorID(atomic.AddUint32(&currentSupervisorID, 1))

	if spec.Log == nil {
		s.log = func(msg string) {
			log.Printf("Supervisor %s: %s", s.Name, msg)
		}
	} else {
		s.log = spec.Log
	}

	if spec.FailureDecay == 0 {
		s.failureDecay = 30
	} else {
		s.failureDecay = spec.FailureDecay
	}
	if spec.FailureThreshold == 0 {
		s.failureThreshold = 5
	} else {
		s.failureThreshold = spec.FailureThreshold
	}
	if spec.FailureBackoff == 0 {
		s.failureBackoff = time.Second * 15
	} else {
		s.failureBackoff = spec.FailureBackoff
	}
	if spec.Timeout == 0 {
		s.timeout = time.Second * 10
	} else {
		s.timeout = spec.Timeout
	}

	// overriding these allows for testing the threshold behavior
	s.getNow = time.Now
	s.getAfterChan = time.After

	s.control = make(chan supervisorMessage)
	s.services = make(map[serviceID]Service)
	s.restartQueue = make([]serviceID, 0, 1)
	s.resumeTimer = make(chan time.Time)

	// set up the default logging handlers
	s.logBadStop = func(supervisor *Supervisor, service Service) {
		s.log(fmt.Sprintf("%s: Service %s failed to terminate in a timely manner", serviceName(supervisor), serviceName(service)))
	}
	s.logFailure = func(supervisor *Supervisor, service Service, failures float64, threshold float64, restarting bool, err interface{}, st []byte) {
		var errString string

		e, canError := err.(error)
		if canError {
			errString = e.Error()
		} else {
			errString = fmt.Sprintf("%#v", err)
		}

		s.log(fmt.Sprintf("%s: Failed service '%s' (%f failures of %f), restarting: %#v, error: %s, stacktrace: %s", serviceName(supervisor), serviceName(service), failures, threshold, restarting, errString, string(st)))
	}
	s.logBackoff = func(s *Supervisor, entering bool) {
		if entering {
			s.log("Entering the backoff state.")
		} else {
			s.log("Exiting backoff state.")
		}
	}

	return
}

// serviceName returns the service's String() value if it implements
// fmt.Stringer, and its %#v representation otherwise.
func serviceName(service Service) (serviceName string) {
	stringer, canStringer := service.(fmt.Stringer)
	if canStringer {
		serviceName = stringer.String()
	} else {
		serviceName = fmt.Sprintf("%#v", service)
	}
	return
}

// NewSimple is a convenience function to create a supervisor with just a
// name and the sensible defaults.
func NewSimple(name string) *Supervisor {
	return New(name, Spec{})
}

/*
Service is the interface that describes a service to a Supervisor.

Serve Method

The Serve method is called by a Supervisor to start the service.
The service should execute within the goroutine that this is
called in. If this function either returns or panics, the Supervisor
will call it again.

A Serve method SHOULD do as much cleanup of the state as possible,
to prevent any corruption in the previous state from crashing the
service again.

Stop Method

This method is used by the supervisor to stop the service. Calling this
directly on a Service given to a Supervisor will simply result in the
Service being restarted; use the Supervisor's .Remove(ServiceToken) method
to stop a service. A supervisor will call .Stop() only once. Thus, it may
be as destructive as it likes to get the service to stop.

Once Stop has been called on a Service, the Service SHOULD NOT be
reused in any other supervisor! Because of the impossibility of
guaranteeing that the service has actually stopped in Go, you can't
prove that you won't be starting two goroutines using the exact
same memory to store state, causing completely unpredictable behavior.

Stop should not return until the service has actually stopped.
"Stopped" here is defined as "the service will stop servicing any
further requests in the future". For instance, a common implementation
is to receive a message on a dedicated "stop" channel and immediately
return. Once the stop command has been processed, the service is
stopped.

Another common Stop implementation is to forcibly close an open socket
or other resource, which will cause detectable errors to manifest in the
service code. Bear in mind that using this approach perfectly correctly
requires a bit more work, to handle the chance of a Stop command coming
in before the resource has been created.

If a service does not Stop within the supervisor's timeout duration, a log
entry will be made with a descriptive string to that effect. This does
not guarantee that the service is hung; it may still get around to being
properly stopped in the future. Until the service is fully stopped,
both the service and the spawned goroutine trying to stop it will be
"leaked".

Stringer Interface

It is not mandatory to implement the fmt.Stringer interface on your
service, but if your Service does happen to implement that, the log
messages that describe your service will use that when naming the
service. Otherwise, you'll see the GoString of your service object,
obtained via fmt.Sprintf("%#v", service).
*/
type Service interface {
	Serve()
	Stop()
}

/*
Add adds a service to this supervisor.

If the supervisor is currently running, the service will be started
immediately. If the supervisor is not currently running, the service
will be started when the supervisor is.

The returned ServiceToken may be passed to the Remove method of the
Supervisor to terminate the service.

As a special behavior, if the service added is itself a supervisor, the
supervisor being added will copy the Log function from the Supervisor it
is being added to. This allows factoring out providing a Supervisor
from its logging.
*/
func (s *Supervisor) Add(service Service) ServiceToken {
	if s == nil {
		panic("can't add service to nil *suture.Supervisor")
	}

	if supervisor, isSupervisor := service.(*Supervisor); isSupervisor {
		supervisor.logBadStop = s.logBadStop
		supervisor.logFailure = s.logFailure
		supervisor.logBackoff = s.logBackoff
	}

	s.Lock()
	if s.state == notRunning {
		// Not running yet: queue the service so Serve starts it later.
		id := s.serviceCounter
		s.serviceCounter++

		s.services[id] = service
		s.restartQueue = append(s.restartQueue, id)

		s.Unlock()
		return ServiceToken{uint64(s.id)<<32 | uint64(id)}
	}
	s.Unlock()

	// Already running: hand the service to the Serve loop and wait for
	// the ID it assigns.
	response := make(chan serviceID)
	s.control <- addService{service, response}
	return ServiceToken{uint64(s.id)<<32 | uint64(<-response)}
}

// ServeBackground starts running a supervisor in its own goroutine. This
// method does not return until it is safe to use .Add() on the Supervisor.
func (s *Supervisor) ServeBackground() {
	go s.Serve()
	s.sync()
}

/*
Serve starts the supervisor. You should call this on the top-level supervisor,
but nothing else.
*/
func (s *Supervisor) Serve() {
	if s == nil {
		panic("Can't serve with a nil *suture.Supervisor")
	}
	if s.id == 0 {
		panic("Can't call Serve on an incorrectly-constructed *suture.Supervisor")
	}

	defer func() {
		s.Lock()
		s.state = notRunning
		s.Unlock()
	}()

	s.Lock()
	if s.state != notRunning {
		s.Unlock()
		panic("Running a supervisor while it is already running?")
	}

	s.state = normal
	s.Unlock()

	// for all the services I currently know about, start them
	for _, id := range s.restartQueue {
		service, present := s.services[id]
		if present {
			s.runService(service, id)
		}
	}
	s.restartQueue = make([]serviceID, 0, 1)

	for {
		select {
		case m := <-s.control:
			switch msg := m.(type) {
			case serviceFailed:
				s.handleFailedService(msg.id, msg.err, msg.stacktrace)
			case serviceEnded:
				service, monitored := s.services[msg.id]
				if monitored {
					s.handleFailedService(msg.id, fmt.Sprintf("%s returned unexpectedly", service), []byte("[unknown stack trace]"))
				}
			case addService:
				id := s.serviceCounter
				s.serviceCounter++

				s.services[id] = msg.service
				s.runService(msg.service, id)

				msg.response <- id
			case removeService:
				s.removeService(msg.id)
			case stopSupervisor:
				for id := range s.services {
					s.removeService(id)
				}
				return
			case listServices:
				services := []Service{}
				for _, service := range s.services {
					services = append(services, service)
				}
				msg.c <- services
			case syncSupervisor:
				// this does nothing on purpose; its sole purpose is to
				// introduce a sync point via the channel receive
			case panicSupervisor:
				// used only by tests
				panic("Panicking as requested!")
			}
		case <-s.resumeTimer:
			// We're resuming normal operation after a pause due to
			// excessive thrashing
			// FIXME: Ought to permit some spacing of these functions, rather
			// than simply hammering through them
			s.Lock()
			s.state = normal
			s.Unlock()
			s.failures = 0
			s.logBackoff(s, false)
			for _, id := range s.restartQueue {
				service, present := s.services[id]
				if present {
					s.runService(service, id)
				}
			}
			s.restartQueue = make([]serviceID, 0, 1)
		}
	}
}

func (s *Supervisor) handleFailedService(id serviceID, err interface{}, stacktrace []byte) {
	now := s.getNow()

	if s.lastFail.IsZero() {
		s.lastFail = now
		s.failures = 1.0
	} else {
		sinceLastFail := now.Sub(s.lastFail).Seconds()
		intervals := sinceLastFail / s.failureDecay
		s.failures = s.failures*math.Pow(.5, intervals) + 1
	}

	if s.failures > s.failureThreshold {
		s.Lock()
		s.state = paused
		s.Unlock()
		s.logBackoff(s, true)
		s.resumeTimer = s.getAfterChan(s.failureBackoff)
	}

	s.lastFail = now

	failedService, monitored := s.services[id]

	// It is possible for a service to be no longer monitored
	// by the time we get here. In that case, just ignore it.
	if monitored {
		// this may look dangerous because the state could change, but this
		// code is only ever run in the one goroutine that is permitted to
		// change the state, so nothing else will.
		s.Lock()
		curState := s.state
		s.Unlock()
		if curState == normal {
			s.runService(failedService, id)
			s.logFailure(s, failedService, s.failures, s.failureThreshold, true, err, stacktrace)
		} else {
			// FIXME: When restarting, check that the service still
			// exists (it may have been stopped in the meantime)
			s.restartQueue = append(s.restartQueue, id)
			s.logFailure(s, failedService, s.failures, s.failureThreshold, false, err, stacktrace)
		}
	}
}

func (s *Supervisor) runService(service Service, id serviceID) {
	go func() {
		defer func() {
			if r := recover(); r != nil {
				buf := make([]byte, 65535)
				written := runtime.Stack(buf, false)
				buf = buf[:written]
				s.fail(id, r, buf)
			}
		}()

		service.Serve()

		s.serviceEnded(id)
	}()
}

func (s *Supervisor) removeService(id serviceID) {
	service, present := s.services[id]
	if present {
		delete(s.services, id)
		go func() {
			successChan := make(chan bool)
			go func() {
				service.Stop()
				successChan <- true
			}()

			failChan := s.getAfterChan(s.timeout)

			select {
			case <-successChan:
				// Life is good!
			case <-failChan:
				s.logBadStop(s, service)
			}
		}()
	}
}

// String implements the fmt.Stringer interface.
func (s *Supervisor) String() string {
	return s.Name
}

// sum type pattern for type-safe message passing; see
// http://www.jerf.org/iri/post/2917

type supervisorMessage interface {
	isSupervisorMessage()
}

/*
Remove will remove the given service from the Supervisor, and attempt to Stop() it.
The ServiceToken comes from the Add() call.
*/
func (s *Supervisor) Remove(id ServiceToken) error {
	sID := supervisorID(id.id >> 32)
	if sID != s.id {
		return ErrWrongSupervisor
	}
	s.control <- removeService{serviceID(id.id & 0xffffffff)}
	return nil
}

/*
Services returns a []Service containing a snapshot of the services this
Supervisor is managing.
*/
func (s *Supervisor) Services() []Service {
	ls := listServices{make(chan []Service)}
	s.control <- ls
	return <-ls.c
}

type listServices struct {
	c chan []Service
}

func (ls listServices) isSupervisorMessage() {}

type removeService struct {
	id serviceID
}

func (rs removeService) isSupervisorMessage() {}

func (s *Supervisor) sync() {
	s.control <- syncSupervisor{}
}

type syncSupervisor struct {
}

func (ss syncSupervisor) isSupervisorMessage() {}

func (s *Supervisor) fail(id serviceID, err interface{}, stacktrace []byte) {
	s.control <- serviceFailed{id, err, stacktrace}
}

type serviceFailed struct {
	id         serviceID
	err        interface{}
	stacktrace []byte
}

func (sf serviceFailed) isSupervisorMessage() {}

func (s *Supervisor) serviceEnded(id serviceID) {
	s.control <- serviceEnded{id}
}

type serviceEnded struct {
	id serviceID
}

func (s serviceEnded) isSupervisorMessage() {}

// added by the Add() method
type addService struct {
	service  Service
	response chan serviceID
}

func (as addService) isSupervisorMessage() {}

// Stop stops the Supervisor.
func (s *Supervisor) Stop() {
	s.control <- stopSupervisor{}
}

type stopSupervisor struct {
}

func (ss stopSupervisor) isSupervisorMessage() {}

func (s *Supervisor) panic() {
	s.control <- panicSupervisor{}
}

type panicSupervisor struct {
}

func (ps panicSupervisor) isSupervisorMessage() {}