sync.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package sync
  7. import (
  8. "fmt"
  9. "path/filepath"
  10. "runtime"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. "github.com/sasha-s/go-deadlock"
  17. )
  18. type Mutex interface {
  19. Lock()
  20. Unlock()
  21. }
  22. type RWMutex interface {
  23. Mutex
  24. RLock()
  25. RUnlock()
  26. }
  27. type WaitGroup interface {
  28. Add(int)
  29. Done()
  30. Wait()
  31. }
  32. func NewMutex() Mutex {
  33. if useDeadlock {
  34. return &deadlock.Mutex{}
  35. }
  36. if debug {
  37. mutex := &loggedMutex{}
  38. mutex.holder.Store(holder{})
  39. return mutex
  40. }
  41. return &sync.Mutex{}
  42. }
  43. func NewRWMutex() RWMutex {
  44. if useDeadlock {
  45. return &deadlock.RWMutex{}
  46. }
  47. if debug {
  48. mutex := &loggedRWMutex{
  49. readHolders: make(map[int][]holder),
  50. unlockers: make(chan holder, 1024),
  51. }
  52. mutex.holder.Store(holder{})
  53. return mutex
  54. }
  55. return &sync.RWMutex{}
  56. }
  57. func NewWaitGroup() WaitGroup {
  58. if debug {
  59. return &loggedWaitGroup{}
  60. }
  61. return &sync.WaitGroup{}
  62. }
  63. type holder struct {
  64. at string
  65. time time.Time
  66. goid int
  67. }
  68. func (h holder) String() string {
  69. if h.at == "" {
  70. return "not held"
  71. }
  72. return fmt.Sprintf("at %s goid: %d for %s", h.at, h.goid, time.Since(h.time))
  73. }
  74. type loggedMutex struct {
  75. sync.Mutex
  76. holder atomic.Value
  77. }
  78. func (m *loggedMutex) Lock() {
  79. m.Mutex.Lock()
  80. m.holder.Store(getHolder())
  81. }
  82. func (m *loggedMutex) Unlock() {
  83. currentHolder := m.holder.Load().(holder)
  84. duration := time.Since(currentHolder.time)
  85. if duration >= threshold {
  86. l.Debugf("Mutex held for %v. Locked at %s unlocked at %s", duration, currentHolder.at, getHolder().at)
  87. }
  88. m.holder.Store(holder{})
  89. m.Mutex.Unlock()
  90. }
  91. func (m *loggedMutex) Holders() string {
  92. return m.holder.Load().(holder).String()
  93. }
  94. type loggedRWMutex struct {
  95. sync.RWMutex
  96. holder atomic.Value
  97. readHolders map[int][]holder
  98. readHoldersMut sync.Mutex
  99. logUnlockers int32
  100. unlockers chan holder
  101. }
  102. func (m *loggedRWMutex) Lock() {
  103. start := time.Now()
  104. atomic.StoreInt32(&m.logUnlockers, 1)
  105. m.RWMutex.Lock()
  106. m.logUnlockers = 0
  107. holder := getHolder()
  108. m.holder.Store(holder)
  109. duration := holder.time.Sub(start)
  110. if duration > threshold {
  111. var unlockerStrings []string
  112. loop:
  113. for {
  114. select {
  115. case holder := <-m.unlockers:
  116. unlockerStrings = append(unlockerStrings, holder.String())
  117. default:
  118. break loop
  119. }
  120. }
  121. l.Debugf("RWMutex took %v to lock. Locked at %s. RUnlockers while locking:\n%s", duration, holder.at, strings.Join(unlockerStrings, "\n"))
  122. }
  123. }
  124. func (m *loggedRWMutex) Unlock() {
  125. currentHolder := m.holder.Load().(holder)
  126. duration := time.Since(currentHolder.time)
  127. if duration >= threshold {
  128. l.Debugf("RWMutex held for %v. Locked at %s unlocked at %s", duration, currentHolder.at, getHolder().at)
  129. }
  130. m.holder.Store(holder{})
  131. m.RWMutex.Unlock()
  132. }
  133. func (m *loggedRWMutex) RLock() {
  134. m.RWMutex.RLock()
  135. holder := getHolder()
  136. m.readHoldersMut.Lock()
  137. m.readHolders[holder.goid] = append(m.readHolders[holder.goid], holder)
  138. m.readHoldersMut.Unlock()
  139. }
  140. func (m *loggedRWMutex) RUnlock() {
  141. id := goid()
  142. m.readHoldersMut.Lock()
  143. current := m.readHolders[id]
  144. if len(current) > 0 {
  145. m.readHolders[id] = current[:len(current)-1]
  146. }
  147. m.readHoldersMut.Unlock()
  148. if atomic.LoadInt32(&m.logUnlockers) == 1 {
  149. holder := getHolder()
  150. select {
  151. case m.unlockers <- holder:
  152. default:
  153. l.Debugf("Dropped holder %s as channel full", holder)
  154. }
  155. }
  156. m.RWMutex.RUnlock()
  157. }
  158. func (m *loggedRWMutex) Holders() string {
  159. output := m.holder.Load().(holder).String() + " (writer)"
  160. m.readHoldersMut.Lock()
  161. for _, holders := range m.readHolders {
  162. for _, holder := range holders {
  163. output += "\n" + holder.String() + " (reader)"
  164. }
  165. }
  166. m.readHoldersMut.Unlock()
  167. return output
  168. }
  169. type loggedWaitGroup struct {
  170. sync.WaitGroup
  171. }
  172. func (wg *loggedWaitGroup) Wait() {
  173. start := time.Now()
  174. wg.WaitGroup.Wait()
  175. duration := time.Since(start)
  176. if duration >= threshold {
  177. l.Debugf("WaitGroup took %v at %s", duration, getHolder())
  178. }
  179. }
  180. func getHolder() holder {
  181. _, file, line, _ := runtime.Caller(2)
  182. file = filepath.Join(filepath.Base(filepath.Dir(file)), filepath.Base(file))
  183. return holder{
  184. at: fmt.Sprintf("%s:%d", file, line),
  185. goid: goid(),
  186. time: time.Now(),
  187. }
  188. }
  189. func goid() int {
  190. var buf [64]byte
  191. n := runtime.Stack(buf[:], false)
  192. idField := strings.Fields(strings.TrimPrefix(string(buf[:n]), "goroutine "))[0]
  193. id, err := strconv.Atoi(idField)
  194. if err != nil {
  195. return -1
  196. }
  197. return id
  198. }
  199. // TimeoutCond is a variant on Cond. It has roughly the same semantics regarding 'L' - it must be held
  200. // both when broadcasting and when calling TimeoutCondWaiter.Wait()
  201. // Call Broadcast() to broadcast to all waiters on the TimeoutCond. Call SetupWait to create a
  202. // TimeoutCondWaiter configured with the given timeout, which can then be used to listen for
  203. // broadcasts.
  204. type TimeoutCond struct {
  205. L sync.Locker
  206. ch chan struct{}
  207. }
  208. // TimeoutCondWaiter is a type allowing a consumer to wait on a TimeoutCond with a timeout. Wait() may be called multiple times,
  209. // and will return true every time that the TimeoutCond is broadcast to. Once the configured timeout
  210. // expires, Wait() will return false.
  211. // Call Stop() to release resources once this TimeoutCondWaiter is no longer needed.
  212. type TimeoutCondWaiter struct {
  213. c *TimeoutCond
  214. timer *time.Timer
  215. }
  216. func NewTimeoutCond(l sync.Locker) *TimeoutCond {
  217. return &TimeoutCond{
  218. L: l,
  219. }
  220. }
  221. func (c *TimeoutCond) Broadcast() {
  222. // ch.L must be locked when calling this function
  223. if c.ch != nil {
  224. close(c.ch)
  225. c.ch = nil
  226. }
  227. }
  228. func (c *TimeoutCond) SetupWait(timeout time.Duration) *TimeoutCondWaiter {
  229. timer := time.NewTimer(timeout)
  230. return &TimeoutCondWaiter{
  231. c: c,
  232. timer: timer,
  233. }
  234. }
  235. func (w *TimeoutCondWaiter) Wait() bool {
  236. // ch.L must be locked when calling this function
  237. // Ensure that the channel exists, since we're going to be waiting on it
  238. if w.c.ch == nil {
  239. w.c.ch = make(chan struct{})
  240. }
  241. ch := w.c.ch
  242. w.c.L.Unlock()
  243. defer w.c.L.Lock()
  244. select {
  245. case <-w.timer.C:
  246. return false
  247. case <-ch:
  248. return true
  249. }
  250. }
  251. func (w *TimeoutCondWaiter) Stop() {
  252. w.timer.Stop()
  253. }