execqueue.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. // Package execqueue implements an ordered asynchronous queue for executing functions.
  4. package execqueue
  5. import (
  6. "context"
  7. "errors"
  8. "sync"
  9. )
  // ExecQueue is an ordered queue of functions. Functions passed to Add are
  // executed one at a time, in the order they were added, on a background
  // goroutine. The methods in this file need no explicit initialization, so
  // the zero value is usable.
  type ExecQueue struct {
  	// mu guards every field below.
  	mu sync.Mutex

  	// closed is set by Shutdown. Once set, Add drops new functions and
  	// run stops draining the queue.
  	closed bool

  	// inFlight reports whether a goroutine is running q.run.
  	inFlight bool

  	// doneWaiter is non-nil while a caller is waiting in Wait. run closes
  	// it and resets it to nil when it finishes.
  	doneWaiter chan struct{}

  	// queue holds the functions that have been added but not yet
  	// started, oldest first.
  	queue []func()
  }
  17. func (q *ExecQueue) Add(f func()) {
  18. q.mu.Lock()
  19. defer q.mu.Unlock()
  20. if q.closed {
  21. return
  22. }
  23. if q.inFlight {
  24. q.queue = append(q.queue, f)
  25. } else {
  26. q.inFlight = true
  27. go q.run(f)
  28. }
  29. }
  30. // RunSync waits for the queue to be drained and then synchronously runs f.
  31. // It returns an error if the queue is closed before f is run or ctx expires.
  32. func (q *ExecQueue) RunSync(ctx context.Context, f func()) error {
  33. for {
  34. if err := q.Wait(ctx); err != nil {
  35. return err
  36. }
  37. q.mu.Lock()
  38. if q.inFlight {
  39. q.mu.Unlock()
  40. continue
  41. }
  42. defer q.mu.Unlock()
  43. if q.closed {
  44. return errors.New("closed")
  45. }
  46. f()
  47. return nil
  48. }
  49. }
  50. func (q *ExecQueue) run(f func()) {
  51. f()
  52. q.mu.Lock()
  53. for len(q.queue) > 0 && !q.closed {
  54. f := q.queue[0]
  55. q.queue[0] = nil
  56. q.queue = q.queue[1:]
  57. q.mu.Unlock()
  58. f()
  59. q.mu.Lock()
  60. }
  61. q.inFlight = false
  62. q.queue = nil
  63. if q.doneWaiter != nil {
  64. close(q.doneWaiter)
  65. q.doneWaiter = nil
  66. }
  67. q.mu.Unlock()
  68. }
  69. // Shutdown asynchronously signals the queue to stop.
  70. func (q *ExecQueue) Shutdown() {
  71. q.mu.Lock()
  72. defer q.mu.Unlock()
  73. q.closed = true
  74. }
  75. // Wait waits for the queue to be empty.
  76. func (q *ExecQueue) Wait(ctx context.Context) error {
  77. q.mu.Lock()
  78. waitCh := q.doneWaiter
  79. if q.inFlight && waitCh == nil {
  80. waitCh = make(chan struct{})
  81. q.doneWaiter = waitCh
  82. }
  83. q.mu.Unlock()
  84. if waitCh == nil {
  85. return nil
  86. }
  87. select {
  88. case <-waitCh:
  89. return nil
  90. case <-ctx.Done():
  91. return ctx.Err()
  92. }
  93. }