syncs.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. // Package syncs contains additional sync types and functionality.
  4. package syncs
  5. import (
  6. "context"
  7. "sync"
  8. "sync/atomic"
  9. "tailscale.com/util/mak"
  10. )
  11. // ClosedChan returns a channel that's already closed.
  12. func ClosedChan() <-chan struct{} { return closedChan }
  13. var closedChan = initClosedChan()
  14. func initClosedChan() <-chan struct{} {
  15. ch := make(chan struct{})
  16. close(ch)
  17. return ch
  18. }
  19. // AtomicValue is the generic version of [atomic.Value].
  20. type AtomicValue[T any] struct {
  21. v atomic.Value
  22. }
  23. // wrappedValue is used to wrap a value T in a concrete type,
  24. // otherwise atomic.Value.Store may panic due to mismatching types in interfaces.
  25. // This wrapping is not necessary for non-interface kinds of T,
  26. // but there is no harm in wrapping anyways.
  27. // See https://cs.opensource.google/go/go/+/refs/tags/go1.22.2:src/sync/atomic/value.go;l=78
  28. type wrappedValue[T any] struct{ v T }
  29. // Load returns the value set by the most recent Store.
  30. // It returns the zero value for T if the value is empty.
  31. func (v *AtomicValue[T]) Load() T {
  32. x, _ := v.LoadOk()
  33. return x
  34. }
  35. // LoadOk is like Load but returns a boolean indicating whether the value was
  36. // loaded.
  37. func (v *AtomicValue[T]) LoadOk() (_ T, ok bool) {
  38. x := v.v.Load()
  39. if x != nil {
  40. return x.(wrappedValue[T]).v, true
  41. }
  42. var zero T
  43. return zero, false
  44. }
  45. // Store sets the value of the Value to x.
  46. func (v *AtomicValue[T]) Store(x T) {
  47. v.v.Store(wrappedValue[T]{x})
  48. }
  49. // Swap stores new into Value and returns the previous value.
  50. // It returns the zero value for T if the value is empty.
  51. func (v *AtomicValue[T]) Swap(x T) (old T) {
  52. oldV := v.v.Swap(wrappedValue[T]{x})
  53. if oldV != nil {
  54. return oldV.(wrappedValue[T]).v
  55. }
  56. return old
  57. }
  58. // CompareAndSwap executes the compare-and-swap operation for the Value.
  59. func (v *AtomicValue[T]) CompareAndSwap(oldV, newV T) (swapped bool) {
  60. return v.v.CompareAndSwap(wrappedValue[T]{oldV}, wrappedValue[T]{newV})
  61. }
  62. // WaitGroupChan is like a sync.WaitGroup, but has a chan that closes
  63. // on completion that you can wait on. (This, you can only use the
  64. // value once)
  65. // Also, its zero value is not usable. Use the constructor.
  66. type WaitGroupChan struct {
  67. n int64 // atomic
  68. done chan struct{} // closed on transition to zero
  69. }
  70. // NewWaitGroupChan returns a new single-use WaitGroupChan.
  71. func NewWaitGroupChan() *WaitGroupChan {
  72. return &WaitGroupChan{done: make(chan struct{})}
  73. }
  74. // DoneChan returns a channel that's closed on completion.
  75. func (wg *WaitGroupChan) DoneChan() <-chan struct{} { return wg.done }
  76. // Add adds delta, which may be negative, to the WaitGroupChan
  77. // counter. If the counter becomes zero, all goroutines blocked on
  78. // Wait or the Done chan are released. If the counter goes negative,
  79. // Add panics.
  80. //
  81. // Note that calls with a positive delta that occur when the counter
  82. // is zero must happen before a Wait. Calls with a negative delta, or
  83. // calls with a positive delta that start when the counter is greater
  84. // than zero, may happen at any time. Typically this means the calls
  85. // to Add should execute before the statement creating the goroutine
  86. // or other event to be waited for.
  87. func (wg *WaitGroupChan) Add(delta int) {
  88. n := atomic.AddInt64(&wg.n, int64(delta))
  89. if n == 0 {
  90. close(wg.done)
  91. }
  92. }
  93. // Decr decrements the WaitGroup counter by one.
  94. //
  95. // (It is like sync.WaitGroup's Done method, but we don't use Done in
  96. // this type, because it's ambiguous between Context.Done and
  97. // WaitGroup.Done. So we use DoneChan and Decr instead.)
  98. func (wg *WaitGroupChan) Decr() {
  99. wg.Add(-1)
  100. }
  101. // Wait blocks until the WaitGroupChan counter is zero.
  102. func (wg *WaitGroupChan) Wait() { <-wg.done }
  103. // Semaphore is a counting semaphore.
  104. //
  105. // Use NewSemaphore to create one.
  106. type Semaphore struct {
  107. c chan struct{}
  108. }
  109. // NewSemaphore returns a semaphore with resource count n.
  110. func NewSemaphore(n int) Semaphore {
  111. return Semaphore{c: make(chan struct{}, n)}
  112. }
  113. // Acquire blocks until a resource is acquired.
  114. func (s Semaphore) Acquire() {
  115. s.c <- struct{}{}
  116. }
  117. // AcquireContext reports whether the resource was acquired before the ctx was done.
  118. func (s Semaphore) AcquireContext(ctx context.Context) bool {
  119. select {
  120. case s.c <- struct{}{}:
  121. return true
  122. case <-ctx.Done():
  123. return false
  124. }
  125. }
  126. // TryAcquire reports, without blocking, whether the resource was acquired.
  127. func (s Semaphore) TryAcquire() bool {
  128. select {
  129. case s.c <- struct{}{}:
  130. return true
  131. default:
  132. return false
  133. }
  134. }
  135. // Release releases a resource.
  136. func (s Semaphore) Release() {
  137. <-s.c
  138. }
  139. // Map is a Go map protected by a [sync.RWMutex].
  140. // It is preferred over [sync.Map] for maps with entries that change
  141. // at a relatively high frequency.
  142. // This must not be shallow copied.
  143. type Map[K comparable, V any] struct {
  144. mu sync.RWMutex
  145. m map[K]V
  146. }
  147. // Load loads the value for the provided key and whether it was found.
  148. func (m *Map[K, V]) Load(key K) (value V, loaded bool) {
  149. m.mu.RLock()
  150. defer m.mu.RUnlock()
  151. value, loaded = m.m[key]
  152. return value, loaded
  153. }
  154. // LoadFunc calls f with the value for the provided key
  155. // regardless of whether the entry exists or not.
  156. // The lock is held for the duration of the call to f.
  157. func (m *Map[K, V]) LoadFunc(key K, f func(value V, loaded bool)) {
  158. m.mu.RLock()
  159. defer m.mu.RUnlock()
  160. value, loaded := m.m[key]
  161. f(value, loaded)
  162. }
  163. // Store stores the value for the provided key.
  164. func (m *Map[K, V]) Store(key K, value V) {
  165. m.mu.Lock()
  166. defer m.mu.Unlock()
  167. mak.Set(&m.m, key, value)
  168. }
  169. // LoadOrStore returns the value for the given key if it exists
  170. // otherwise it stores value.
  171. func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
  172. if actual, loaded = m.Load(key); loaded {
  173. return actual, loaded
  174. }
  175. m.mu.Lock()
  176. defer m.mu.Unlock()
  177. actual, loaded = m.m[key]
  178. if !loaded {
  179. actual = value
  180. mak.Set(&m.m, key, value)
  181. }
  182. return actual, loaded
  183. }
  184. // LoadOrInit returns the value for the given key if it exists
  185. // otherwise f is called to construct the value to be set.
  186. // The lock is held for the duration to prevent duplicate initialization.
  187. func (m *Map[K, V]) LoadOrInit(key K, f func() V) (actual V, loaded bool) {
  188. if actual, loaded := m.Load(key); loaded {
  189. return actual, loaded
  190. }
  191. m.mu.Lock()
  192. defer m.mu.Unlock()
  193. if actual, loaded = m.m[key]; loaded {
  194. return actual, loaded
  195. }
  196. loaded = false
  197. actual = f()
  198. mak.Set(&m.m, key, actual)
  199. return actual, loaded
  200. }
  201. // LoadAndDelete returns the value for the given key if it exists.
  202. // It ensures that the map is cleared of any entry for the key.
  203. func (m *Map[K, V]) LoadAndDelete(key K) (value V, loaded bool) {
  204. m.mu.Lock()
  205. defer m.mu.Unlock()
  206. value, loaded = m.m[key]
  207. if loaded {
  208. delete(m.m, key)
  209. }
  210. return value, loaded
  211. }
  212. // Delete deletes the entry identified by key.
  213. func (m *Map[K, V]) Delete(key K) {
  214. m.mu.Lock()
  215. defer m.mu.Unlock()
  216. delete(m.m, key)
  217. }
  218. // Range iterates over the map in undefined order calling f for each entry.
  219. // Iteration stops if f returns false. Map changes are blocked during iteration.
  220. func (m *Map[K, V]) Range(f func(key K, value V) bool) {
  221. m.mu.RLock()
  222. defer m.mu.RUnlock()
  223. for k, v := range m.m {
  224. if !f(k, v) {
  225. return
  226. }
  227. }
  228. }
  229. // Len returns the length of the map.
  230. func (m *Map[K, V]) Len() int {
  231. m.mu.RLock()
  232. defer m.mu.RUnlock()
  233. return len(m.m)
  234. }
  235. // Clear removes all entries from the map.
  236. func (m *Map[K, V]) Clear() {
  237. m.mu.Lock()
  238. defer m.mu.Unlock()
  239. clear(m.m)
  240. }
  241. // Swap stores the value for the provided key, and returns the previous value
  242. // (if any). If there was no previous value set, a zero value will be returned.
  243. func (m *Map[K, V]) Swap(key K, value V) (oldValue V) {
  244. m.mu.Lock()
  245. defer m.mu.Unlock()
  246. oldValue = m.m[key]
  247. mak.Set(&m.m, key, value)
  248. return oldValue
  249. }
  250. // WaitGroup is identical to [sync.WaitGroup],
  251. // but provides a Go method to start a goroutine.
  252. type WaitGroup struct{ sync.WaitGroup }
  253. // Go calls the given function in a new goroutine.
  254. // It automatically increments the counter before execution and
  255. // automatically decrements the counter after execution.
  256. // It must not be called concurrently with Wait.
  257. func (wg *WaitGroup) Go(f func()) {
  258. wg.Add(1)
  259. go func() {
  260. defer wg.Done()
  261. f()
  262. }()
  263. }