| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411 |
- // Copyright (c) Tailscale Inc & AUTHORS
- // SPDX-License-Identifier: BSD-3-Clause
- // Package syncs contains additional sync types and functionality.
- package syncs
- import (
- "context"
- "iter"
- "sync"
- "sync/atomic"
- "tailscale.com/util/mak"
- )
// ClosedChan returns a receive-only channel that has already been closed,
// so a receive from it never blocks.
// Every call hands back the same package-level channel.
func ClosedChan() <-chan struct{} {
	return closedChan
}

// closedChan is the shared, pre-closed channel returned by ClosedChan.
var closedChan = initClosedChan()

// initClosedChan makes a new channel, closes it, and returns it
// as a receive-only channel.
func initClosedChan() <-chan struct{} {
	closed := make(chan struct{})
	close(closed)
	return closed
}
// AtomicValue is a generic, type-safe counterpart to [atomic.Value].
// See [MutexValue] for guidance on whether to use this type.
type AtomicValue[T any] struct {
	v atomic.Value
}

// wrappedValue boxes a T inside a concrete struct type.
// Without the box, atomic.Value.Store may panic when T is an interface
// and successive stores carry different dynamic types.
// The box is unnecessary for non-interface kinds of T,
// but wrapping unconditionally does no harm.
// See https://cs.opensource.google/go/go/+/refs/tags/go1.22.2:src/sync/atomic/value.go;l=78
type wrappedValue[T any] struct{ v T }

// Load returns the value set by the most recent Store.
// If nothing has been stored yet it returns the zero value of T.
func (v *AtomicValue[T]) Load() T {
	val, _ := v.LoadOk()
	return val
}

// LoadOk is like Load, but additionally reports whether a value
// had been stored at all.
func (v *AtomicValue[T]) LoadOk() (_ T, ok bool) {
	stored := v.v.Load()
	if stored == nil {
		// Nothing stored yet: report the zero value and ok=false.
		var zero T
		return zero, false
	}
	return stored.(wrappedValue[T]).v, true
}

// Store sets the value of the Value to x.
func (v *AtomicValue[T]) Store(x T) {
	v.v.Store(wrappedValue[T]{v: x})
}

// Swap stores x into the Value and returns the previous value.
// If nothing was stored before, it returns the zero value of T.
func (v *AtomicValue[T]) Swap(x T) (old T) {
	prev := v.v.Swap(wrappedValue[T]{v: x})
	if prev == nil {
		return old // still the zero value of T
	}
	return prev.(wrappedValue[T]).v
}

// CompareAndSwap executes the compare-and-swap operation for the Value.
// It panics if T is not comparable.
func (v *AtomicValue[T]) CompareAndSwap(oldV, newV T) (swapped bool) {
	if v.v.CompareAndSwap(wrappedValue[T]{v: oldV}, wrappedValue[T]{v: newV}) {
		return true
	}

	// Edge case: the underlying atomic.Value has never been stored to.
	// A caller comparing against the zero value of T expects that to
	// match, so retry the swap against the nil any value.
	var zero T
	if any(oldV) != any(zero) {
		return false
	}
	return v.v.CompareAndSwap(any(nil), wrappedValue[T]{v: newV})
}
// MutexValue is a value of type T guarded by a mutex.
//
// AtomicValue, [MutexValue], and [atomic.Pointer] overlap in
// what they can be used for. As a rule of thumb:
//
//   - Use [atomic.Pointer] if the value being stored is a pointer and
//     you only ever need load and store operations.
//     An atomic pointer only occupies 1 word of memory.
//
//   - Use [MutexValue] if the value being stored is not a pointer or
//     you need the ability for a mutex to protect a set of operations
//     performed on the value.
//     A mutex-guarded value occupies 1 word of memory plus
//     the memory representation of T.
//
//   - AtomicValue is useful for non-pointer types that happen to
//     have the memory layout of a single pointer.
//     Examples include a map, channel, func, or a single field struct
//     that contains any prior types.
//     An atomic value occupies 2 words of memory.
//     Consequently, Storing of non-pointer types always allocates.
//
// Note that [AtomicValue] can report whether it was ever set,
// whereas [MutexValue] cannot tell "never set" apart from
// "set to the zero value of T". If that distinction matters,
// consider wrapping T in [opt.Value].
type MutexValue[T any] struct {
	mu sync.Mutex
	v  T
}

// WithLock calls f with a pointer to the value while holding the lock.
// The provided pointer must not leak beyond the scope of the call.
func (m *MutexValue[T]) WithLock(f func(p *T)) {
	m.mu.Lock()
	defer m.mu.Unlock()
	f(&m.v)
}

// Load returns a shallow copy of the underlying value,
// taken while the lock is held.
func (m *MutexValue[T]) Load() T {
	var snapshot T
	m.WithLock(func(p *T) { snapshot = *p })
	return snapshot
}

// Store replaces the underlying value with a shallow copy of v,
// written while the lock is held.
func (m *MutexValue[T]) Store(v T) {
	m.WithLock(func(p *T) { *p = v })
}

// Swap stores new into m and returns the previous value.
// Both steps happen under a single acquisition of the lock.
func (m *MutexValue[T]) Swap(new T) (old T) {
	m.WithLock(func(p *T) { old, *p = *p, new })
	return old
}
// WaitGroupChan is like a sync.WaitGroup, but has a chan that closes
// on completion that you can wait on. (Thus, you can only use the
// value once.)
// Also, its zero value is not usable. Use the constructor.
type WaitGroupChan struct {
	n    int64         // atomic
	done chan struct{} // closed on transition to zero
}

// NewWaitGroupChan returns a new single-use WaitGroupChan.
func NewWaitGroupChan() *WaitGroupChan {
	return &WaitGroupChan{done: make(chan struct{})}
}

// DoneChan returns a channel that's closed on completion.
func (wg *WaitGroupChan) DoneChan() <-chan struct{} { return wg.done }

// Add adds delta, which may be negative, to the WaitGroupChan
// counter. If the counter becomes zero, all goroutines blocked on
// Wait or the Done chan are released. If the counter goes negative,
// Add panics.
//
// Because the done channel is closed each time the counter reaches
// zero, a second transition to zero panics as well (closing an
// already-closed channel); the value is strictly single-use.
//
// Note that calls with a positive delta that occur when the counter
// is zero must happen before a Wait. Calls with a negative delta, or
// calls with a positive delta that start when the counter is greater
// than zero, may happen at any time. Typically this means the calls
// to Add should execute before the statement creating the goroutine
// or other event to be waited for.
func (wg *WaitGroupChan) Add(delta int) {
	n := atomic.AddInt64(&wg.n, int64(delta))
	switch {
	case n < 0:
		// Enforce the documented contract; previously a negative
		// counter was silently accepted and waiters could hang.
		panic("syncs: negative WaitGroupChan counter")
	case n == 0:
		close(wg.done)
	}
}

// Decr decrements the WaitGroup counter by one.
//
// (It is like sync.WaitGroup's Done method, but we don't use Done in
// this type, because it's ambiguous between Context.Done and
// WaitGroup.Done. So we use DoneChan and Decr instead.)
func (wg *WaitGroupChan) Decr() {
	wg.Add(-1)
}

// Wait blocks until the WaitGroupChan counter is zero.
func (wg *WaitGroupChan) Wait() { <-wg.done }
// Semaphore is a counting semaphore backed by a buffered channel.
//
// Use NewSemaphore to create one.
type Semaphore struct {
	c chan struct{}
}

// NewSemaphore returns a semaphore with resource count n.
func NewSemaphore(n int) Semaphore {
	slots := make(chan struct{}, n)
	return Semaphore{c: slots}
}

// Len reports the number of in-flight acquisitions.
// It goes up by one on every successful acquire and
// down by one on every release.
func (s Semaphore) Len() int {
	inFlight := len(s.c)
	return inFlight
}

// Acquire blocks until a resource is acquired.
func (s Semaphore) Acquire() {
	var token struct{}
	s.c <- token
}

// AcquireContext reports whether the resource was acquired before the ctx was done.
func (s Semaphore) AcquireContext(ctx context.Context) bool {
	acquired := false
	select {
	case s.c <- struct{}{}:
		acquired = true
	case <-ctx.Done():
		// ctx finished first; leave acquired as false.
	}
	return acquired
}

// TryAcquire reports, without blocking, whether the resource was acquired.
func (s Semaphore) TryAcquire() bool {
	acquired := false
	select {
	case s.c <- struct{}{}:
		acquired = true
	default:
		// No free slot right now.
	}
	return acquired
}

// Release releases a resource.
// It blocks if there is no outstanding acquisition to release.
func (s Semaphore) Release() {
	<-s.c
}
// Map is a Go map guarded by a [sync.RWMutex].
// Prefer it over [sync.Map] when entries change at a
// relatively high frequency.
// A Map must not be shallow copied.
type Map[K comparable, V any] struct {
	mu sync.RWMutex
	m  map[K]V
}

// Load returns the value stored for key and reports whether it was found.
// The lookup is performed while holding the read lock.
func (m *Map[K, V]) Load(key K) (value V, loaded bool) {
	m.LoadFunc(key, func(v V, ok bool) {
		value, loaded = v, ok
	})
	return value, loaded
}

// LoadFunc calls f with the value for the provided key,
// whether or not an entry exists; loaded tells f which case applies.
// The read lock is held for the duration of the call to f.
func (m *Map[K, V]) LoadFunc(key K, f func(value V, loaded bool)) {
	m.mu.RLock()
	defer m.mu.RUnlock()

	v, ok := m.m[key]
	f(v, ok)
}
- // Store stores the value for the provided key.
- func (m *Map[K, V]) Store(key K, value V) {
- m.mu.Lock()
- defer m.mu.Unlock()
- mak.Set(&m.m, key, value)
- }
- // LoadOrStore returns the value for the given key if it exists
- // otherwise it stores value.
- func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
- if actual, loaded = m.Load(key); loaded {
- return actual, loaded
- }
- m.mu.Lock()
- defer m.mu.Unlock()
- actual, loaded = m.m[key]
- if !loaded {
- actual = value
- mak.Set(&m.m, key, value)
- }
- return actual, loaded
- }
- // LoadOrInit returns the value for the given key if it exists
- // otherwise f is called to construct the value to be set.
- // The lock is held for the duration to prevent duplicate initialization.
- func (m *Map[K, V]) LoadOrInit(key K, f func() V) (actual V, loaded bool) {
- if actual, loaded := m.Load(key); loaded {
- return actual, loaded
- }
- m.mu.Lock()
- defer m.mu.Unlock()
- if actual, loaded = m.m[key]; loaded {
- return actual, loaded
- }
- loaded = false
- actual = f()
- mak.Set(&m.m, key, actual)
- return actual, loaded
- }
- // LoadAndDelete returns the value for the given key if it exists.
- // It ensures that the map is cleared of any entry for the key.
- func (m *Map[K, V]) LoadAndDelete(key K) (value V, loaded bool) {
- m.mu.Lock()
- defer m.mu.Unlock()
- value, loaded = m.m[key]
- if loaded {
- delete(m.m, key)
- }
- return value, loaded
- }
- // Delete deletes the entry identified by key.
- func (m *Map[K, V]) Delete(key K) {
- m.mu.Lock()
- defer m.mu.Unlock()
- delete(m.m, key)
- }
- // Keys iterates over all keys in the map in an undefined order.
- // A read lock is held for the entire duration of the iteration.
- // Use the [WithLock] method instead to mutate the map during iteration.
- func (m *Map[K, V]) Keys() iter.Seq[K] {
- return func(yield func(K) bool) {
- m.mu.RLock()
- defer m.mu.RUnlock()
- for k := range m.m {
- if !yield(k) {
- return
- }
- }
- }
- }
- // Values iterates over all values in the map in an undefined order.
- // A read lock is held for the entire duration of the iteration.
- // Use the [WithLock] method instead to mutate the map during iteration.
- func (m *Map[K, V]) Values() iter.Seq[V] {
- return func(yield func(V) bool) {
- m.mu.RLock()
- defer m.mu.RUnlock()
- for _, v := range m.m {
- if !yield(v) {
- return
- }
- }
- }
- }
- // All iterates over all entries in the map in an undefined order.
- // A read lock is held for the entire duration of the iteration.
- // Use the [WithLock] method instead to mutate the map during iteration.
- func (m *Map[K, V]) All() iter.Seq2[K, V] {
- return func(yield func(K, V) bool) {
- m.mu.RLock()
- defer m.mu.RUnlock()
- for k, v := range m.m {
- if !yield(k, v) {
- return
- }
- }
- }
- }
- // WithLock calls f with the underlying map.
- // Use of m2 must not escape the duration of this call.
- // The write-lock is held for the entire duration of this call.
- func (m *Map[K, V]) WithLock(f func(m2 map[K]V)) {
- m.mu.Lock()
- defer m.mu.Unlock()
- if m.m == nil {
- m.m = make(map[K]V)
- }
- f(m.m)
- }
- // Len returns the length of the map.
- func (m *Map[K, V]) Len() int {
- m.mu.RLock()
- defer m.mu.RUnlock()
- return len(m.m)
- }
- // Clear removes all entries from the map.
- func (m *Map[K, V]) Clear() {
- m.mu.Lock()
- defer m.mu.Unlock()
- clear(m.m)
- }
- // Swap stores the value for the provided key, and returns the previous value
- // (if any). If there was no previous value set, a zero value will be returned.
- func (m *Map[K, V]) Swap(key K, value V) (oldValue V) {
- m.mu.Lock()
- defer m.mu.Unlock()
- oldValue = m.m[key]
- mak.Set(&m.m, key, value)
- return oldValue
- }
|