syncs.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. // Copyright (c) 2020 Tailscale Inc & AUTHORS All rights reserved.
  2. // Use of this source code is governed by a BSD-style
  3. // license that can be found in the LICENSE file.
  4. // Package syncs contains additional sync types and functionality.
  5. package syncs
  6. import (
  7. "context"
  8. "sync/atomic"
  9. )
  10. // ClosedChan returns a channel that's already closed.
  11. func ClosedChan() <-chan struct{} { return closedChan }
  12. var closedChan = initClosedChan()
  13. func initClosedChan() <-chan struct{} {
  14. ch := make(chan struct{})
  15. close(ch)
  16. return ch
  17. }
  18. // WaitGroupChan is like a sync.WaitGroup, but has a chan that closes
  19. // on completion that you can wait on. (This, you can only use the
  20. // value once)
  21. // Also, its zero value is not usable. Use the constructor.
  22. type WaitGroupChan struct {
  23. n int64 // atomic
  24. done chan struct{} // closed on transition to zero
  25. }
  26. // NewWaitGroupChan returns a new single-use WaitGroupChan.
  27. func NewWaitGroupChan() *WaitGroupChan {
  28. return &WaitGroupChan{done: make(chan struct{})}
  29. }
  30. // DoneChan returns a channel that's closed on completion.
  31. func (wg *WaitGroupChan) DoneChan() <-chan struct{} { return wg.done }
  32. // Add adds delta, which may be negative, to the WaitGroupChan
  33. // counter. If the counter becomes zero, all goroutines blocked on
  34. // Wait or the Done chan are released. If the counter goes negative,
  35. // Add panics.
  36. //
  37. // Note that calls with a positive delta that occur when the counter
  38. // is zero must happen before a Wait. Calls with a negative delta, or
  39. // calls with a positive delta that start when the counter is greater
  40. // than zero, may happen at any time. Typically this means the calls
  41. // to Add should execute before the statement creating the goroutine
  42. // or other event to be waited for.
  43. func (wg *WaitGroupChan) Add(delta int) {
  44. n := atomic.AddInt64(&wg.n, int64(delta))
  45. if n == 0 {
  46. close(wg.done)
  47. }
  48. }
  49. // Decr decrements the WaitGroup counter by one.
  50. //
  51. // (It is like sync.WaitGroup's Done method, but we don't use Done in
  52. // this type, because it's ambiguous between Context.Done and
  53. // WaitGroup.Done. So we use DoneChan and Decr instead.)
  54. func (wg *WaitGroupChan) Decr() {
  55. wg.Add(-1)
  56. }
  57. // Wait blocks until the WaitGroupChan counter is zero.
  58. func (wg *WaitGroupChan) Wait() { <-wg.done }
  59. // AtomicBool is an atomic boolean.
  60. type AtomicBool int32
  61. func (b *AtomicBool) Set(v bool) {
  62. var n int32
  63. if v {
  64. n = 1
  65. }
  66. atomic.StoreInt32((*int32)(b), n)
  67. }
  68. // Swap sets b to v and reports whether it changed.
  69. func (b *AtomicBool) Swap(v bool) (changed bool) {
  70. var n int32
  71. if v {
  72. n = 1
  73. }
  74. old := atomic.SwapInt32((*int32)(b), n)
  75. return old != n
  76. }
  77. func (b *AtomicBool) Get() bool {
  78. return atomic.LoadInt32((*int32)(b)) != 0
  79. }
  80. // AtomicUint32 is an atomic uint32.
  81. type AtomicUint32 uint32
  82. func (b *AtomicUint32) Set(v uint32) {
  83. atomic.StoreUint32((*uint32)(b), v)
  84. }
  85. func (b *AtomicUint32) Get() uint32 {
  86. return atomic.LoadUint32((*uint32)(b))
  87. }
  88. // Semaphore is a counting semaphore.
  89. //
  90. // Use NewSemaphore to create one.
  91. type Semaphore struct {
  92. c chan struct{}
  93. }
  94. // NewSemaphore returns a semaphore with resource count n.
  95. func NewSemaphore(n int) Semaphore {
  96. return Semaphore{c: make(chan struct{}, n)}
  97. }
  98. // Acquire blocks until a resource is acquired.
  99. func (s Semaphore) Acquire() {
  100. s.c <- struct{}{}
  101. }
  102. // AcquireContext reports whether the resource was acquired before the ctx was done.
  103. func (s Semaphore) AcquireContext(ctx context.Context) bool {
  104. select {
  105. case s.c <- struct{}{}:
  106. return true
  107. case <-ctx.Done():
  108. return false
  109. }
  110. }
  111. // TryAcquire reports, without blocking, whether the resource was acquired.
  112. func (s Semaphore) TryAcquire() bool {
  113. select {
  114. case s.c <- struct{}{}:
  115. return true
  116. default:
  117. return false
  118. }
  119. }
  120. // Release releases a resource.
  121. func (s Semaphore) Release() {
  122. <-s.c
  123. }