syncs.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. // Package syncs contains additional sync types and functionality.
  4. package syncs
  5. import (
  6. "context"
  7. "sync"
  8. "sync/atomic"
  9. "tailscale.com/util/mak"
  10. )
  11. // ClosedChan returns a channel that's already closed.
  12. func ClosedChan() <-chan struct{} { return closedChan }
  13. var closedChan = initClosedChan()
  14. func initClosedChan() <-chan struct{} {
  15. ch := make(chan struct{})
  16. close(ch)
  17. return ch
  18. }
  19. // AtomicValue is the generic version of atomic.Value.
  20. type AtomicValue[T any] struct {
  21. v atomic.Value
  22. }
  23. // Load returns the value set by the most recent Store.
  24. // It returns the zero value for T if the value is empty.
  25. func (v *AtomicValue[T]) Load() T {
  26. x, _ := v.LoadOk()
  27. return x
  28. }
  29. // LoadOk is like Load but returns a boolean indicating whether the value was
  30. // loaded.
  31. func (v *AtomicValue[T]) LoadOk() (_ T, ok bool) {
  32. x := v.v.Load()
  33. if x != nil {
  34. return x.(T), true
  35. }
  36. var zero T
  37. return zero, false
  38. }
  39. // Store sets the value of the Value to x.
  40. func (v *AtomicValue[T]) Store(x T) {
  41. v.v.Store(x)
  42. }
  43. // Swap stores new into Value and returns the previous value.
  44. // It returns the zero value for T if the value is empty.
  45. func (v *AtomicValue[T]) Swap(x T) (old T) {
  46. oldV := v.v.Swap(x)
  47. if oldV != nil {
  48. return oldV.(T)
  49. }
  50. return old
  51. }
  52. // CompareAndSwap executes the compare-and-swap operation for the Value.
  53. func (v *AtomicValue[T]) CompareAndSwap(oldV, newV T) (swapped bool) {
  54. return v.v.CompareAndSwap(oldV, newV)
  55. }
  56. // WaitGroupChan is like a sync.WaitGroup, but has a chan that closes
  57. // on completion that you can wait on. (This, you can only use the
  58. // value once)
  59. // Also, its zero value is not usable. Use the constructor.
  60. type WaitGroupChan struct {
  61. n int64 // atomic
  62. done chan struct{} // closed on transition to zero
  63. }
  64. // NewWaitGroupChan returns a new single-use WaitGroupChan.
  65. func NewWaitGroupChan() *WaitGroupChan {
  66. return &WaitGroupChan{done: make(chan struct{})}
  67. }
  68. // DoneChan returns a channel that's closed on completion.
  69. func (wg *WaitGroupChan) DoneChan() <-chan struct{} { return wg.done }
  70. // Add adds delta, which may be negative, to the WaitGroupChan
  71. // counter. If the counter becomes zero, all goroutines blocked on
  72. // Wait or the Done chan are released. If the counter goes negative,
  73. // Add panics.
  74. //
  75. // Note that calls with a positive delta that occur when the counter
  76. // is zero must happen before a Wait. Calls with a negative delta, or
  77. // calls with a positive delta that start when the counter is greater
  78. // than zero, may happen at any time. Typically this means the calls
  79. // to Add should execute before the statement creating the goroutine
  80. // or other event to be waited for.
  81. func (wg *WaitGroupChan) Add(delta int) {
  82. n := atomic.AddInt64(&wg.n, int64(delta))
  83. if n == 0 {
  84. close(wg.done)
  85. }
  86. }
  87. // Decr decrements the WaitGroup counter by one.
  88. //
  89. // (It is like sync.WaitGroup's Done method, but we don't use Done in
  90. // this type, because it's ambiguous between Context.Done and
  91. // WaitGroup.Done. So we use DoneChan and Decr instead.)
  92. func (wg *WaitGroupChan) Decr() {
  93. wg.Add(-1)
  94. }
  95. // Wait blocks until the WaitGroupChan counter is zero.
  96. func (wg *WaitGroupChan) Wait() { <-wg.done }
  97. // Semaphore is a counting semaphore.
  98. //
  99. // Use NewSemaphore to create one.
  100. type Semaphore struct {
  101. c chan struct{}
  102. }
  103. // NewSemaphore returns a semaphore with resource count n.
  104. func NewSemaphore(n int) Semaphore {
  105. return Semaphore{c: make(chan struct{}, n)}
  106. }
  107. // Acquire blocks until a resource is acquired.
  108. func (s Semaphore) Acquire() {
  109. s.c <- struct{}{}
  110. }
  111. // AcquireContext reports whether the resource was acquired before the ctx was done.
  112. func (s Semaphore) AcquireContext(ctx context.Context) bool {
  113. select {
  114. case s.c <- struct{}{}:
  115. return true
  116. case <-ctx.Done():
  117. return false
  118. }
  119. }
  120. // TryAcquire reports, without blocking, whether the resource was acquired.
  121. func (s Semaphore) TryAcquire() bool {
  122. select {
  123. case s.c <- struct{}{}:
  124. return true
  125. default:
  126. return false
  127. }
  128. }
  129. // Release releases a resource.
  130. func (s Semaphore) Release() {
  131. <-s.c
  132. }
// Map is a Go map protected by a [sync.RWMutex].
// It is preferred over [sync.Map] for maps with entries that change
// at a relatively high frequency.
// This must not be shallow copied.
//
// No constructor is visible in this file, so m starts out nil; the write
// paths go through mak.Set (presumably allocating the map on first use;
// confirm against tailscale.com/util/mak).
type Map[K comparable, V any] struct {
	mu sync.RWMutex // guards m
	m  map[K]V      // the protected map; nil until something is stored
}
  141. // Load loads the value for the provided key and whether it was found.
  142. func (m *Map[K, V]) Load(key K) (value V, loaded bool) {
  143. m.mu.RLock()
  144. defer m.mu.RUnlock()
  145. value, loaded = m.m[key]
  146. return value, loaded
  147. }
  148. // LoadFunc calls f with the value for the provided key
  149. // regardless of whether the entry exists or not.
  150. // The lock is held for the duration of the call to f.
  151. func (m *Map[K, V]) LoadFunc(key K, f func(value V, loaded bool)) {
  152. m.mu.RLock()
  153. defer m.mu.RUnlock()
  154. value, loaded := m.m[key]
  155. f(value, loaded)
  156. }
  157. // Store stores the value for the provided key.
  158. func (m *Map[K, V]) Store(key K, value V) {
  159. m.mu.Lock()
  160. defer m.mu.Unlock()
  161. mak.Set(&m.m, key, value)
  162. }
  163. // LoadOrStore returns the value for the given key if it exists
  164. // otherwise it stores value.
  165. func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
  166. if actual, loaded = m.Load(key); loaded {
  167. return actual, loaded
  168. }
  169. m.mu.Lock()
  170. defer m.mu.Unlock()
  171. actual, loaded = m.m[key]
  172. if !loaded {
  173. actual = value
  174. mak.Set(&m.m, key, value)
  175. }
  176. return actual, loaded
  177. }
  178. // LoadOrInit returns the value for the given key if it exists
  179. // otherwise f is called to construct the value to be set.
  180. // The lock is held for the duration to prevent duplicate initialization.
  181. func (m *Map[K, V]) LoadOrInit(key K, f func() V) (actual V, loaded bool) {
  182. if actual, loaded := m.Load(key); loaded {
  183. return actual, loaded
  184. }
  185. m.mu.Lock()
  186. defer m.mu.Unlock()
  187. if actual, loaded = m.m[key]; loaded {
  188. return actual, loaded
  189. }
  190. loaded = false
  191. actual = f()
  192. mak.Set(&m.m, key, actual)
  193. return actual, loaded
  194. }
  195. // LoadAndDelete returns the value for the given key if it exists.
  196. // It ensures that the map is cleared of any entry for the key.
  197. func (m *Map[K, V]) LoadAndDelete(key K) (value V, loaded bool) {
  198. m.mu.Lock()
  199. defer m.mu.Unlock()
  200. value, loaded = m.m[key]
  201. if loaded {
  202. delete(m.m, key)
  203. }
  204. return value, loaded
  205. }
  206. // Delete deletes the entry identified by key.
  207. func (m *Map[K, V]) Delete(key K) {
  208. m.mu.Lock()
  209. defer m.mu.Unlock()
  210. delete(m.m, key)
  211. }
  212. // Range iterates over the map in undefined order calling f for each entry.
  213. // Iteration stops if f returns false. Map changes are blocked during iteration.
  214. func (m *Map[K, V]) Range(f func(key K, value V) bool) {
  215. m.mu.RLock()
  216. defer m.mu.RUnlock()
  217. for k, v := range m.m {
  218. if !f(k, v) {
  219. return
  220. }
  221. }
  222. }
  223. // Len returns the length of the map.
  224. func (m *Map[K, V]) Len() int {
  225. m.mu.RLock()
  226. defer m.mu.RUnlock()
  227. return len(m.m)
  228. }
  229. // Clear removes all entries from the map.
  230. func (m *Map[K, V]) Clear() {
  231. m.mu.Lock()
  232. defer m.mu.Unlock()
  233. clear(m.m)
  234. }
  235. // WaitGroup is identical to [sync.WaitGroup],
  236. // but provides a Go method to start a goroutine.
  237. type WaitGroup struct{ sync.WaitGroup }
  238. // Go calls the given function in a new goroutine.
  239. // It automatically increments the counter before execution and
  240. // automatically decrements the counter after execution.
  241. // It must not be called concurrently with Wait.
  242. func (wg *WaitGroup) Go(f func()) {
  243. wg.Add(1)
  244. go func() {
  245. defer wg.Done()
  246. f()
  247. }()
  248. }