execqueue.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. // Package execqueue implements an ordered asynchronous queue for executing functions.
  4. package execqueue
  5. import (
  6. "context"
  7. "errors"
  8. "tailscale.com/syncs"
  9. )
  10. type ExecQueue struct {
  11. mu syncs.Mutex
  12. ctx context.Context // context.Background + closed on Shutdown
  13. cancel context.CancelFunc // closes ctx
  14. closed bool
  15. inFlight bool // whether a goroutine is running q.run
  16. doneWaiter chan struct{} // non-nil if waiter is waiting, then closed
  17. queue []func()
  18. }
  19. func (q *ExecQueue) Add(f func()) {
  20. q.mu.Lock()
  21. defer q.mu.Unlock()
  22. if q.closed {
  23. return
  24. }
  25. q.initCtxLocked()
  26. if q.inFlight {
  27. q.queue = append(q.queue, f)
  28. } else {
  29. q.inFlight = true
  30. go q.run(f)
  31. }
  32. }
  33. // RunSync waits for the queue to be drained and then synchronously runs f.
  34. // It returns an error if the queue is closed before f is run or ctx expires.
  35. func (q *ExecQueue) RunSync(ctx context.Context, f func()) error {
  36. q.mu.Lock()
  37. q.initCtxLocked()
  38. shutdownCtx := q.ctx
  39. q.mu.Unlock()
  40. ch := make(chan struct{})
  41. q.Add(f)
  42. q.Add(func() { close(ch) })
  43. select {
  44. case <-ch:
  45. return nil
  46. case <-ctx.Done():
  47. return ctx.Err()
  48. case <-shutdownCtx.Done():
  49. return errExecQueueShutdown
  50. }
  51. }
  52. func (q *ExecQueue) run(f func()) {
  53. f()
  54. q.mu.Lock()
  55. for len(q.queue) > 0 && !q.closed {
  56. f := q.queue[0]
  57. q.queue[0] = nil
  58. q.queue = q.queue[1:]
  59. q.mu.Unlock()
  60. f()
  61. q.mu.Lock()
  62. }
  63. q.inFlight = false
  64. q.queue = nil
  65. if q.doneWaiter != nil {
  66. close(q.doneWaiter)
  67. q.doneWaiter = nil
  68. }
  69. q.mu.Unlock()
  70. }
  71. // Shutdown asynchronously signals the queue to stop.
  72. func (q *ExecQueue) Shutdown() {
  73. q.mu.Lock()
  74. defer q.mu.Unlock()
  75. q.closed = true
  76. if q.cancel != nil {
  77. q.cancel()
  78. }
  79. }
  80. func (q *ExecQueue) initCtxLocked() {
  81. if q.ctx == nil {
  82. q.ctx, q.cancel = context.WithCancel(context.Background())
  83. }
  84. }
  85. var errExecQueueShutdown = errors.New("execqueue shut down")
  86. // Wait waits for the queue to be empty or shut down.
  87. func (q *ExecQueue) Wait(ctx context.Context) error {
  88. q.mu.Lock()
  89. q.initCtxLocked()
  90. waitCh := q.doneWaiter
  91. if q.inFlight && waitCh == nil {
  92. waitCh = make(chan struct{})
  93. q.doneWaiter = waitCh
  94. }
  95. closed := q.closed
  96. shutdownCtx := q.ctx
  97. q.mu.Unlock()
  98. if closed {
  99. return errExecQueueShutdown
  100. }
  101. if waitCh == nil {
  102. return nil
  103. }
  104. select {
  105. case <-waitCh:
  106. return nil
  107. case <-shutdownCtx.Done():
  108. return errExecQueueShutdown
  109. case <-ctx.Done():
  110. return ctx.Err()
  111. }
  112. }