syncs.go 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. // Package syncs contains additional sync types and functionality.
  4. package syncs
  5. import (
  6. "context"
  7. "sync"
  8. "sync/atomic"
  9. "tailscale.com/util/mak"
  10. )
  11. // ClosedChan returns a channel that's already closed.
  12. func ClosedChan() <-chan struct{} { return closedChan }
  13. var closedChan = initClosedChan()
  14. func initClosedChan() <-chan struct{} {
  15. ch := make(chan struct{})
  16. close(ch)
  17. return ch
  18. }
  19. // AtomicValue is the generic version of atomic.Value.
  20. type AtomicValue[T any] struct {
  21. v atomic.Value
  22. }
  23. // Load returns the value set by the most recent Store.
  24. // It returns the zero value for T if the value is empty.
  25. func (v *AtomicValue[T]) Load() T {
  26. x, _ := v.LoadOk()
  27. return x
  28. }
  29. // LoadOk is like Load but returns a boolean indicating whether the value was
  30. // loaded.
  31. func (v *AtomicValue[T]) LoadOk() (_ T, ok bool) {
  32. x := v.v.Load()
  33. if x != nil {
  34. return x.(T), true
  35. }
  36. var zero T
  37. return zero, false
  38. }
  39. // Store sets the value of the Value to x.
  40. func (v *AtomicValue[T]) Store(x T) {
  41. v.v.Store(x)
  42. }
  43. // Swap stores new into Value and returns the previous value.
  44. // It returns the zero value for T if the value is empty.
  45. func (v *AtomicValue[T]) Swap(x T) (old T) {
  46. oldV := v.v.Swap(x)
  47. if oldV != nil {
  48. return oldV.(T)
  49. }
  50. return old
  51. }
  52. // CompareAndSwap executes the compare-and-swap operation for the Value.
  53. func (v *AtomicValue[T]) CompareAndSwap(oldV, newV T) (swapped bool) {
  54. return v.v.CompareAndSwap(oldV, newV)
  55. }
  56. // WaitGroupChan is like a sync.WaitGroup, but has a chan that closes
  57. // on completion that you can wait on. (This, you can only use the
  58. // value once)
  59. // Also, its zero value is not usable. Use the constructor.
  60. type WaitGroupChan struct {
  61. n int64 // atomic
  62. done chan struct{} // closed on transition to zero
  63. }
  64. // NewWaitGroupChan returns a new single-use WaitGroupChan.
  65. func NewWaitGroupChan() *WaitGroupChan {
  66. return &WaitGroupChan{done: make(chan struct{})}
  67. }
  68. // DoneChan returns a channel that's closed on completion.
  69. func (wg *WaitGroupChan) DoneChan() <-chan struct{} { return wg.done }
  70. // Add adds delta, which may be negative, to the WaitGroupChan
  71. // counter. If the counter becomes zero, all goroutines blocked on
  72. // Wait or the Done chan are released. If the counter goes negative,
  73. // Add panics.
  74. //
  75. // Note that calls with a positive delta that occur when the counter
  76. // is zero must happen before a Wait. Calls with a negative delta, or
  77. // calls with a positive delta that start when the counter is greater
  78. // than zero, may happen at any time. Typically this means the calls
  79. // to Add should execute before the statement creating the goroutine
  80. // or other event to be waited for.
  81. func (wg *WaitGroupChan) Add(delta int) {
  82. n := atomic.AddInt64(&wg.n, int64(delta))
  83. if n == 0 {
  84. close(wg.done)
  85. }
  86. }
  87. // Decr decrements the WaitGroup counter by one.
  88. //
  89. // (It is like sync.WaitGroup's Done method, but we don't use Done in
  90. // this type, because it's ambiguous between Context.Done and
  91. // WaitGroup.Done. So we use DoneChan and Decr instead.)
  92. func (wg *WaitGroupChan) Decr() {
  93. wg.Add(-1)
  94. }
  95. // Wait blocks until the WaitGroupChan counter is zero.
  96. func (wg *WaitGroupChan) Wait() { <-wg.done }
  97. // Semaphore is a counting semaphore.
  98. //
  99. // Use NewSemaphore to create one.
  100. type Semaphore struct {
  101. c chan struct{}
  102. }
  103. // NewSemaphore returns a semaphore with resource count n.
  104. func NewSemaphore(n int) Semaphore {
  105. return Semaphore{c: make(chan struct{}, n)}
  106. }
  107. // Acquire blocks until a resource is acquired.
  108. func (s Semaphore) Acquire() {
  109. s.c <- struct{}{}
  110. }
  111. // AcquireContext reports whether the resource was acquired before the ctx was done.
  112. func (s Semaphore) AcquireContext(ctx context.Context) bool {
  113. select {
  114. case s.c <- struct{}{}:
  115. return true
  116. case <-ctx.Done():
  117. return false
  118. }
  119. }
  120. // TryAcquire reports, without blocking, whether the resource was acquired.
  121. func (s Semaphore) TryAcquire() bool {
  122. select {
  123. case s.c <- struct{}{}:
  124. return true
  125. default:
  126. return false
  127. }
  128. }
  129. // Release releases a resource.
  130. func (s Semaphore) Release() {
  131. <-s.c
  132. }
  133. // Map is a Go map protected by a [sync.RWMutex].
  134. // It is preferred over [sync.Map] for maps with entries that change
  135. // at a relatively high frequency.
  136. // This must not be shallow copied.
  137. type Map[K comparable, V any] struct {
  138. mu sync.RWMutex
  139. m map[K]V
  140. }
  141. func (m *Map[K, V]) Load(key K) (value V, ok bool) {
  142. m.mu.RLock()
  143. defer m.mu.RUnlock()
  144. value, ok = m.m[key]
  145. return value, ok
  146. }
  147. func (m *Map[K, V]) Store(key K, value V) {
  148. m.mu.Lock()
  149. defer m.mu.Unlock()
  150. mak.Set(&m.m, key, value)
  151. }
  152. func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
  153. if actual, loaded = m.Load(key); loaded {
  154. return actual, loaded
  155. }
  156. m.mu.Lock()
  157. defer m.mu.Unlock()
  158. actual, loaded = m.m[key]
  159. if !loaded {
  160. actual = value
  161. mak.Set(&m.m, key, value)
  162. }
  163. return actual, loaded
  164. }
  165. func (m *Map[K, V]) LoadAndDelete(key K) (value V, loaded bool) {
  166. m.mu.Lock()
  167. defer m.mu.Unlock()
  168. value, loaded = m.m[key]
  169. if loaded {
  170. delete(m.m, key)
  171. }
  172. return value, loaded
  173. }
  174. func (m *Map[K, V]) Delete(key K) {
  175. m.mu.Lock()
  176. defer m.mu.Unlock()
  177. delete(m.m, key)
  178. }
  179. func (m *Map[K, V]) Range(f func(key K, value V) bool) {
  180. m.mu.RLock()
  181. defer m.mu.RUnlock()
  182. for k, v := range m.m {
  183. if !f(k, v) {
  184. return
  185. }
  186. }
  187. }
  188. // WaitGroup is identical to [sync.WaitGroup],
  189. // but provides a Go method to start a goroutine.
  190. type WaitGroup struct{ sync.WaitGroup }
  191. // Go calls the given function in a new goroutine.
  192. // It automatically increments the counter before execution and
  193. // automatically decrements the counter after execution.
  194. // It must not be called concurrently with Wait.
  195. func (wg *WaitGroup) Go(f func()) {
  196. wg.Add(1)
  197. go func() {
  198. defer wg.Done()
  199. f()
  200. }()
  201. }