sync.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package sync
  7. import (
  8. "fmt"
  9. "path/filepath"
  10. "runtime"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. )
// timeNow is the clock used for every timestamp and duration taken in this
// file. It is a package-level variable rather than a direct time.Now call,
// presumably so tests can substitute a fake clock (TODO confirm against tests).
var timeNow = time.Now
// Mutex is the mutual-exclusion lock interface returned by NewMutex. It is
// satisfied by *sync.Mutex as well as by the logging wrapper in this file.
type Mutex interface {
	// Lock acquires the lock.
	Lock()
	// Unlock releases the lock.
	Unlock()
}
// RWMutex is the reader/writer lock interface returned by NewRWMutex. It
// extends Mutex (the write lock) with read-lock operations and is satisfied by
// *sync.RWMutex as well as by the logging wrapper in this file.
type RWMutex interface {
	Mutex
	// RLock acquires the lock for reading.
	RLock()
	// RUnlock releases a read lock.
	RUnlock()
}
// WaitGroup is the wait-group interface returned by NewWaitGroup. It is
// satisfied by *sync.WaitGroup as well as by the logging wrapper in this file.
type WaitGroup interface {
	// Add adjusts the counter by the given delta.
	Add(int)
	// Done decrements the counter by one.
	Done()
	// Wait blocks until the counter reaches zero.
	Wait()
}
  32. func NewMutex() Mutex {
  33. if debug {
  34. mutex := &loggedMutex{}
  35. mutex.holder.Store(holder{})
  36. return mutex
  37. }
  38. return &sync.Mutex{}
  39. }
  40. func NewRWMutex() RWMutex {
  41. if debug {
  42. mutex := &loggedRWMutex{
  43. readHolders: make(map[int][]holder),
  44. unlockers: make(chan holder, 1024),
  45. }
  46. mutex.holder.Store(holder{})
  47. return mutex
  48. }
  49. return &sync.RWMutex{}
  50. }
  51. func NewWaitGroup() WaitGroup {
  52. if debug {
  53. return &loggedWaitGroup{}
  54. }
  55. return &sync.WaitGroup{}
  56. }
// holder describes one acquisition (or release) of a lock: where in the code
// it happened, when, and on which goroutine. The zero value means "not held".
type holder struct {
	// at is the call site formatted as "<dir>/<file>:<line>"; empty when not held.
	at string
	// time is when the holder was recorded.
	time time.Time
	// goid is the id of the goroutine that recorded the holder (-1 if unknown).
	goid int
}
  62. func (h holder) String() string {
  63. if h.at == "" {
  64. return "not held"
  65. }
  66. return fmt.Sprintf("at %s goid: %d for %s", h.at, h.goid, timeNow().Sub(h.time))
  67. }
// loggedMutex wraps sync.Mutex and remembers who currently holds it so that
// long hold times can be logged on Unlock and reported via Holders.
type loggedMutex struct {
	sync.Mutex
	// holder always contains a holder value: the current owner while locked,
	// the zero holder otherwise.
	holder atomic.Value
}
  72. func (m *loggedMutex) Lock() {
  73. m.Mutex.Lock()
  74. m.holder.Store(getHolder())
  75. }
  76. func (m *loggedMutex) Unlock() {
  77. currentHolder := m.holder.Load().(holder)
  78. duration := timeNow().Sub(currentHolder.time)
  79. if duration >= threshold {
  80. l.Debugf("Mutex held for %v. Locked at %s unlocked at %s", duration, currentHolder.at, getHolder().at)
  81. }
  82. m.holder.Store(holder{})
  83. m.Mutex.Unlock()
  84. }
  85. func (m *loggedMutex) Holders() string {
  86. return m.holder.Load().(holder).String()
  87. }
// loggedRWMutex wraps sync.RWMutex and tracks the current writer and readers
// so that slow write-lock acquisition and long hold times can be logged, and
// the current holders reported via Holders.
type loggedRWMutex struct {
	sync.RWMutex
	// holder always contains a holder value: the current writer while
	// write-locked, the zero holder otherwise.
	holder atomic.Value
	// readHolders maps a goroutine id to the stack of read locks that
	// goroutine has recorded. Guarded by readHoldersMut.
	readHolders map[int][]holder
	// readHoldersMut protects readHolders.
	readHoldersMut sync.Mutex
	// logUnlockers is true while a writer is blocked in Lock; RUnlock then
	// records each read-unlock into unlockers.
	logUnlockers atomic.Bool
	// unlockers buffers the read-unlocks observed while a writer was waiting;
	// Lock drains it when reporting a slow acquisition.
	unlockers chan holder
}
  96. func (m *loggedRWMutex) Lock() {
  97. start := timeNow()
  98. m.logUnlockers.Store(true)
  99. m.RWMutex.Lock()
  100. m.logUnlockers.Store(false)
  101. holder := getHolder()
  102. m.holder.Store(holder)
  103. duration := holder.time.Sub(start)
  104. if duration > threshold {
  105. var unlockerStrings []string
  106. loop:
  107. for {
  108. select {
  109. case holder := <-m.unlockers:
  110. unlockerStrings = append(unlockerStrings, holder.String())
  111. default:
  112. break loop
  113. }
  114. }
  115. l.Debugf("RWMutex took %v to lock. Locked at %s. RUnlockers while locking:\n%s", duration, holder.at, strings.Join(unlockerStrings, "\n"))
  116. }
  117. }
  118. func (m *loggedRWMutex) Unlock() {
  119. currentHolder := m.holder.Load().(holder)
  120. duration := timeNow().Sub(currentHolder.time)
  121. if duration >= threshold {
  122. l.Debugf("RWMutex held for %v. Locked at %s unlocked at %s", duration, currentHolder.at, getHolder().at)
  123. }
  124. m.holder.Store(holder{})
  125. m.RWMutex.Unlock()
  126. }
  127. func (m *loggedRWMutex) RLock() {
  128. m.RWMutex.RLock()
  129. holder := getHolder()
  130. m.readHoldersMut.Lock()
  131. m.readHolders[holder.goid] = append(m.readHolders[holder.goid], holder)
  132. m.readHoldersMut.Unlock()
  133. }
  134. func (m *loggedRWMutex) RUnlock() {
  135. id := goid()
  136. m.readHoldersMut.Lock()
  137. current := m.readHolders[id]
  138. if len(current) > 0 {
  139. m.readHolders[id] = current[:len(current)-1]
  140. }
  141. m.readHoldersMut.Unlock()
  142. if m.logUnlockers.Load() {
  143. holder := getHolder()
  144. select {
  145. case m.unlockers <- holder:
  146. default:
  147. l.Debugf("Dropped holder %s as channel full", holder)
  148. }
  149. }
  150. m.RWMutex.RUnlock()
  151. }
  152. func (m *loggedRWMutex) Holders() string {
  153. output := m.holder.Load().(holder).String() + " (writer)"
  154. m.readHoldersMut.Lock()
  155. for _, holders := range m.readHolders {
  156. for _, holder := range holders {
  157. output += "\n" + holder.String() + " (reader)"
  158. }
  159. }
  160. m.readHoldersMut.Unlock()
  161. return output
  162. }
// loggedWaitGroup wraps sync.WaitGroup and logs when Wait blocks for a long
// time. Add and Done are the embedded sync.WaitGroup methods.
type loggedWaitGroup struct {
	sync.WaitGroup
}
  166. func (wg *loggedWaitGroup) Wait() {
  167. start := timeNow()
  168. wg.WaitGroup.Wait()
  169. duration := timeNow().Sub(start)
  170. if duration >= threshold {
  171. l.Debugf("WaitGroup took %v at %s", duration, getHolder())
  172. }
  173. }
  174. func getHolder() holder {
  175. _, file, line, _ := runtime.Caller(2)
  176. file = filepath.Join(filepath.Base(filepath.Dir(file)), filepath.Base(file))
  177. return holder{
  178. at: fmt.Sprintf("%s:%d", file, line),
  179. goid: goid(),
  180. time: timeNow(),
  181. }
  182. }
  183. func goid() int {
  184. var buf [64]byte
  185. n := runtime.Stack(buf[:], false)
  186. idField := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine "))[0]
  187. id, err := strconv.Atoi(idField)
  188. if err != nil {
  189. return -1
  190. }
  191. return id
  192. }
// TimeoutCond is a variant on sync.Cond whose waiters can give up after a
// timeout. It has roughly the same semantics regarding L: L must be held both
// when calling Broadcast and when calling TimeoutCondWaiter.Wait.
//
// Call Broadcast to wake all current waiters. Call SetupWait to create a
// TimeoutCondWaiter configured with a given timeout, which can then be used to
// wait for broadcasts.
type TimeoutCond struct {
	// L is the lock that must be held around Broadcast and Wait. Wait releases
	// it while blocked and re-acquires it before returning.
	L sync.Locker
	// ch is created on demand by Wait and closed, then reset to nil, by
	// Broadcast. It is only accessed with L held.
	ch chan struct{}
}
// TimeoutCondWaiter allows a consumer to wait on a TimeoutCond with a timeout.
// Wait may be called multiple times and returns true each time the
// TimeoutCond is broadcast to. Once the configured timeout expires, Wait
// returns false. The timeout is started once, by SetupWait, and covers all
// Wait calls made on this waiter.
// Call Stop to release resources once the TimeoutCondWaiter is no longer
// needed.
type TimeoutCondWaiter struct {
	// c is the TimeoutCond this waiter listens on.
	c *TimeoutCond
	// timer fires when the timeout given to SetupWait has elapsed.
	timer *time.Timer
}
  210. func NewTimeoutCond(l sync.Locker) *TimeoutCond {
  211. return &TimeoutCond{
  212. L: l,
  213. }
  214. }
  215. func (c *TimeoutCond) Broadcast() {
  216. // ch.L must be locked when calling this function
  217. if c.ch != nil {
  218. close(c.ch)
  219. c.ch = nil
  220. }
  221. }
  222. func (c *TimeoutCond) SetupWait(timeout time.Duration) *TimeoutCondWaiter {
  223. timer := time.NewTimer(timeout)
  224. return &TimeoutCondWaiter{
  225. c: c,
  226. timer: timer,
  227. }
  228. }
  229. func (w *TimeoutCondWaiter) Wait() bool {
  230. // ch.L must be locked when calling this function
  231. // Ensure that the channel exists, since we're going to be waiting on it
  232. if w.c.ch == nil {
  233. w.c.ch = make(chan struct{})
  234. }
  235. ch := w.c.ch
  236. w.c.L.Unlock()
  237. defer w.c.L.Lock()
  238. select {
  239. case <-w.timer.C:
  240. return false
  241. case <-ch:
  242. return true
  243. }
  244. }
  245. func (w *TimeoutCondWaiter) Stop() {
  246. w.timer.Stop()
  247. }