syncs.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. // Package syncs contains additional sync types and functionality.
  4. package syncs
  5. import (
  6. "context"
  7. "iter"
  8. "sync"
  9. "sync/atomic"
  10. "tailscale.com/util/mak"
  11. )
  12. // ClosedChan returns a channel that's already closed.
  13. func ClosedChan() <-chan struct{} { return closedChan }
  14. var closedChan = initClosedChan()
  15. func initClosedChan() <-chan struct{} {
  16. ch := make(chan struct{})
  17. close(ch)
  18. return ch
  19. }
  20. // AtomicValue is the generic version of [atomic.Value].
  21. // See [MutexValue] for guidance on whether to use this type.
  22. type AtomicValue[T any] struct {
  23. v atomic.Value
  24. }
  25. // wrappedValue is used to wrap a value T in a concrete type,
  26. // otherwise atomic.Value.Store may panic due to mismatching types in interfaces.
  27. // This wrapping is not necessary for non-interface kinds of T,
  28. // but there is no harm in wrapping anyways.
  29. // See https://cs.opensource.google/go/go/+/refs/tags/go1.22.2:src/sync/atomic/value.go;l=78
  30. type wrappedValue[T any] struct{ v T }
  31. // Load returns the value set by the most recent Store.
  32. // It returns the zero value for T if the value is empty.
  33. func (v *AtomicValue[T]) Load() T {
  34. x, _ := v.LoadOk()
  35. return x
  36. }
  37. // LoadOk is like Load but returns a boolean indicating whether the value was
  38. // loaded.
  39. func (v *AtomicValue[T]) LoadOk() (_ T, ok bool) {
  40. x := v.v.Load()
  41. if x != nil {
  42. return x.(wrappedValue[T]).v, true
  43. }
  44. var zero T
  45. return zero, false
  46. }
  47. // Store sets the value of the Value to x.
  48. func (v *AtomicValue[T]) Store(x T) {
  49. v.v.Store(wrappedValue[T]{x})
  50. }
  51. // Swap stores new into Value and returns the previous value.
  52. // It returns the zero value for T if the value is empty.
  53. func (v *AtomicValue[T]) Swap(x T) (old T) {
  54. oldV := v.v.Swap(wrappedValue[T]{x})
  55. if oldV != nil {
  56. return oldV.(wrappedValue[T]).v
  57. }
  58. return old // zero value of T
  59. }
  60. // CompareAndSwap executes the compare-and-swap operation for the Value.
  61. // It panics if T is not comparable.
  62. func (v *AtomicValue[T]) CompareAndSwap(oldV, newV T) (swapped bool) {
  63. var zero T
  64. return v.v.CompareAndSwap(wrappedValue[T]{oldV}, wrappedValue[T]{newV}) ||
  65. // In the edge-case where [atomic.Value.Store] is uninitialized
  66. // and trying to compare with the zero value of T,
  67. // then compare-and-swap with the nil any value.
  68. (any(oldV) == any(zero) && v.v.CompareAndSwap(any(nil), wrappedValue[T]{newV}))
  69. }
  70. // MutexValue is a value protected by a mutex.
  71. //
  72. // AtomicValue, [MutexValue], [atomic.Pointer] are similar and
  73. // overlap in their use cases.
  74. //
  75. // - Use [atomic.Pointer] if the value being stored is a pointer and
  76. // you only ever need load and store operations.
  77. // An atomic pointer only occupies 1 word of memory.
  78. //
  79. // - Use [MutexValue] if the value being stored is not a pointer or
  80. // you need the ability for a mutex to protect a set of operations
  81. // performed on the value.
  82. // A mutex-guarded value occupies 1 word of memory plus
  83. // the memory representation of T.
  84. //
  85. // - AtomicValue is useful for non-pointer types that happen to
  86. // have the memory layout of a single pointer.
  87. // Examples include a map, channel, func, or a single field struct
  88. // that contains any prior types.
  89. // An atomic value occupies 2 words of memory.
  90. // Consequently, Storing of non-pointer types always allocates.
  91. //
  92. // Note that [AtomicValue] has the ability to report whether it was set
  93. // while [MutexValue] lacks the ability to detect if the value was set
  94. // and it happens to be the zero value of T. If such a use case is
  95. // necessary, then you could consider wrapping T in [opt.Value].
  96. type MutexValue[T any] struct {
  97. mu sync.Mutex
  98. v T
  99. }
  100. // WithLock calls f with a pointer to the value while holding the lock.
  101. // The provided pointer must not leak beyond the scope of the call.
  102. func (m *MutexValue[T]) WithLock(f func(p *T)) {
  103. m.mu.Lock()
  104. defer m.mu.Unlock()
  105. f(&m.v)
  106. }
  107. // Load returns a shallow copy of the underlying value.
  108. func (m *MutexValue[T]) Load() T {
  109. m.mu.Lock()
  110. defer m.mu.Unlock()
  111. return m.v
  112. }
  113. // Store stores a shallow copy of the provided value.
  114. func (m *MutexValue[T]) Store(v T) {
  115. m.mu.Lock()
  116. defer m.mu.Unlock()
  117. m.v = v
  118. }
  119. // Swap stores new into m and returns the previous value.
  120. func (m *MutexValue[T]) Swap(new T) (old T) {
  121. m.mu.Lock()
  122. defer m.mu.Unlock()
  123. old, m.v = m.v, new
  124. return old
  125. }
  126. // WaitGroupChan is like a sync.WaitGroup, but has a chan that closes
  127. // on completion that you can wait on. (This, you can only use the
  128. // value once)
  129. // Also, its zero value is not usable. Use the constructor.
  130. type WaitGroupChan struct {
  131. n int64 // atomic
  132. done chan struct{} // closed on transition to zero
  133. }
  134. // NewWaitGroupChan returns a new single-use WaitGroupChan.
  135. func NewWaitGroupChan() *WaitGroupChan {
  136. return &WaitGroupChan{done: make(chan struct{})}
  137. }
  138. // DoneChan returns a channel that's closed on completion.
  139. func (wg *WaitGroupChan) DoneChan() <-chan struct{} { return wg.done }
  140. // Add adds delta, which may be negative, to the WaitGroupChan
  141. // counter. If the counter becomes zero, all goroutines blocked on
  142. // Wait or the Done chan are released. If the counter goes negative,
  143. // Add panics.
  144. //
  145. // Note that calls with a positive delta that occur when the counter
  146. // is zero must happen before a Wait. Calls with a negative delta, or
  147. // calls with a positive delta that start when the counter is greater
  148. // than zero, may happen at any time. Typically this means the calls
  149. // to Add should execute before the statement creating the goroutine
  150. // or other event to be waited for.
  151. func (wg *WaitGroupChan) Add(delta int) {
  152. n := atomic.AddInt64(&wg.n, int64(delta))
  153. if n == 0 {
  154. close(wg.done)
  155. }
  156. }
  157. // Decr decrements the WaitGroup counter by one.
  158. //
  159. // (It is like sync.WaitGroup's Done method, but we don't use Done in
  160. // this type, because it's ambiguous between Context.Done and
  161. // WaitGroup.Done. So we use DoneChan and Decr instead.)
  162. func (wg *WaitGroupChan) Decr() {
  163. wg.Add(-1)
  164. }
  165. // Wait blocks until the WaitGroupChan counter is zero.
  166. func (wg *WaitGroupChan) Wait() { <-wg.done }
  167. // Semaphore is a counting semaphore.
  168. //
  169. // Use NewSemaphore to create one.
  170. type Semaphore struct {
  171. c chan struct{}
  172. }
  173. // NewSemaphore returns a semaphore with resource count n.
  174. func NewSemaphore(n int) Semaphore {
  175. return Semaphore{c: make(chan struct{}, n)}
  176. }
  177. // Len reports the number of in-flight acquisitions.
  178. // It is incremented whenever the semaphore is acquired.
  179. // It is decremented whenever the semaphore is released.
  180. func (s Semaphore) Len() int {
  181. return len(s.c)
  182. }
  183. // Acquire blocks until a resource is acquired.
  184. func (s Semaphore) Acquire() {
  185. s.c <- struct{}{}
  186. }
  187. // AcquireContext reports whether the resource was acquired before the ctx was done.
  188. func (s Semaphore) AcquireContext(ctx context.Context) bool {
  189. select {
  190. case s.c <- struct{}{}:
  191. return true
  192. case <-ctx.Done():
  193. return false
  194. }
  195. }
  196. // TryAcquire reports, without blocking, whether the resource was acquired.
  197. func (s Semaphore) TryAcquire() bool {
  198. select {
  199. case s.c <- struct{}{}:
  200. return true
  201. default:
  202. return false
  203. }
  204. }
  205. // Release releases a resource.
  206. func (s Semaphore) Release() {
  207. <-s.c
  208. }
  209. // Map is a Go map protected by a [sync.RWMutex].
  210. // It is preferred over [sync.Map] for maps with entries that change
  211. // at a relatively high frequency.
  212. // This must not be shallow copied.
  213. type Map[K comparable, V any] struct {
  214. mu sync.RWMutex
  215. m map[K]V
  216. }
  217. // Load loads the value for the provided key and whether it was found.
  218. func (m *Map[K, V]) Load(key K) (value V, loaded bool) {
  219. m.mu.RLock()
  220. defer m.mu.RUnlock()
  221. value, loaded = m.m[key]
  222. return value, loaded
  223. }
  224. // LoadFunc calls f with the value for the provided key
  225. // regardless of whether the entry exists or not.
  226. // The lock is held for the duration of the call to f.
  227. func (m *Map[K, V]) LoadFunc(key K, f func(value V, loaded bool)) {
  228. m.mu.RLock()
  229. defer m.mu.RUnlock()
  230. value, loaded := m.m[key]
  231. f(value, loaded)
  232. }
  233. // Store stores the value for the provided key.
  234. func (m *Map[K, V]) Store(key K, value V) {
  235. m.mu.Lock()
  236. defer m.mu.Unlock()
  237. mak.Set(&m.m, key, value)
  238. }
  239. // LoadOrStore returns the value for the given key if it exists
  240. // otherwise it stores value.
  241. func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
  242. if actual, loaded = m.Load(key); loaded {
  243. return actual, loaded
  244. }
  245. m.mu.Lock()
  246. defer m.mu.Unlock()
  247. actual, loaded = m.m[key]
  248. if !loaded {
  249. actual = value
  250. mak.Set(&m.m, key, value)
  251. }
  252. return actual, loaded
  253. }
  254. // LoadOrInit returns the value for the given key if it exists
  255. // otherwise f is called to construct the value to be set.
  256. // The lock is held for the duration to prevent duplicate initialization.
  257. func (m *Map[K, V]) LoadOrInit(key K, f func() V) (actual V, loaded bool) {
  258. if actual, loaded := m.Load(key); loaded {
  259. return actual, loaded
  260. }
  261. m.mu.Lock()
  262. defer m.mu.Unlock()
  263. if actual, loaded = m.m[key]; loaded {
  264. return actual, loaded
  265. }
  266. loaded = false
  267. actual = f()
  268. mak.Set(&m.m, key, actual)
  269. return actual, loaded
  270. }
  271. // LoadAndDelete returns the value for the given key if it exists.
  272. // It ensures that the map is cleared of any entry for the key.
  273. func (m *Map[K, V]) LoadAndDelete(key K) (value V, loaded bool) {
  274. m.mu.Lock()
  275. defer m.mu.Unlock()
  276. value, loaded = m.m[key]
  277. if loaded {
  278. delete(m.m, key)
  279. }
  280. return value, loaded
  281. }
  282. // Delete deletes the entry identified by key.
  283. func (m *Map[K, V]) Delete(key K) {
  284. m.mu.Lock()
  285. defer m.mu.Unlock()
  286. delete(m.m, key)
  287. }
  288. // Keys iterates over all keys in the map in an undefined order.
  289. // A read lock is held for the entire duration of the iteration.
  290. // Use the [WithLock] method instead to mutate the map during iteration.
  291. func (m *Map[K, V]) Keys() iter.Seq[K] {
  292. return func(yield func(K) bool) {
  293. m.mu.RLock()
  294. defer m.mu.RUnlock()
  295. for k := range m.m {
  296. if !yield(k) {
  297. return
  298. }
  299. }
  300. }
  301. }
  302. // Values iterates over all values in the map in an undefined order.
  303. // A read lock is held for the entire duration of the iteration.
  304. // Use the [WithLock] method instead to mutate the map during iteration.
  305. func (m *Map[K, V]) Values() iter.Seq[V] {
  306. return func(yield func(V) bool) {
  307. m.mu.RLock()
  308. defer m.mu.RUnlock()
  309. for _, v := range m.m {
  310. if !yield(v) {
  311. return
  312. }
  313. }
  314. }
  315. }
  316. // All iterates over all entries in the map in an undefined order.
  317. // A read lock is held for the entire duration of the iteration.
  318. // Use the [WithLock] method instead to mutate the map during iteration.
  319. func (m *Map[K, V]) All() iter.Seq2[K, V] {
  320. return func(yield func(K, V) bool) {
  321. m.mu.RLock()
  322. defer m.mu.RUnlock()
  323. for k, v := range m.m {
  324. if !yield(k, v) {
  325. return
  326. }
  327. }
  328. }
  329. }
  330. // WithLock calls f with the underlying map.
  331. // Use of m2 must not escape the duration of this call.
  332. // The write-lock is held for the entire duration of this call.
  333. func (m *Map[K, V]) WithLock(f func(m2 map[K]V)) {
  334. m.mu.Lock()
  335. defer m.mu.Unlock()
  336. if m.m == nil {
  337. m.m = make(map[K]V)
  338. }
  339. f(m.m)
  340. }
  341. // Len returns the length of the map.
  342. func (m *Map[K, V]) Len() int {
  343. m.mu.RLock()
  344. defer m.mu.RUnlock()
  345. return len(m.m)
  346. }
  347. // Clear removes all entries from the map.
  348. func (m *Map[K, V]) Clear() {
  349. m.mu.Lock()
  350. defer m.mu.Unlock()
  351. clear(m.m)
  352. }
  353. // Swap stores the value for the provided key, and returns the previous value
  354. // (if any). If there was no previous value set, a zero value will be returned.
  355. func (m *Map[K, V]) Swap(key K, value V) (oldValue V) {
  356. m.mu.Lock()
  357. defer m.mu.Unlock()
  358. oldValue = m.m[key]
  359. mak.Set(&m.m, key, value)
  360. return oldValue
  361. }