events.go 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. // Package events provides event subscription and polling functionality.
  7. package events
  8. import (
  9. "errors"
  10. "runtime"
  11. "time"
  12. "github.com/syncthing/syncthing/lib/sync"
  13. )
// EventType identifies the kind of an event. Every event type occupies its
// own bit, so several types can be OR-ed together to form the mask that is
// passed to Logger.Subscribe.
type EventType int

// The known event types. Each constant takes the next bit via iota, so the
// order of this list determines the numeric values: inserting or reordering
// entries changes the value of every constant that follows.
const (
	Starting EventType = 1 << iota
	StartupComplete
	DeviceDiscovered
	DeviceConnected
	DeviceDisconnected
	DeviceRejected
	DevicePaused
	DeviceResumed
	LocalChangeDetected
	RemoteChangeDetected
	LocalIndexUpdated
	RemoteIndexUpdated
	ItemStarted
	ItemFinished
	StateChanged
	FolderRejected
	ConfigSaved
	DownloadProgress
	RemoteDownloadProgress
	FolderSummary
	FolderCompletion
	FolderErrors
	FolderScanProgress
	FolderPaused
	FolderResumed
	ListenAddressesChanged
	LoginAttempt

	// AllEvents is a mask with the bit of every event type above set. It must
	// stay last in the block so that iota covers all preceding entries.
	AllEvents = (1 << iota) - 1
)
// runningTests, when true, makes Subscribe and Poll yield to the scheduler
// around their timer handling so that the same branches are taken on every
// run. Nothing in this file sets it to true; presumably the package's tests
// do.
var runningTests = false
  46. func (t EventType) String() string {
  47. switch t {
  48. case Starting:
  49. return "Starting"
  50. case StartupComplete:
  51. return "StartupComplete"
  52. case DeviceDiscovered:
  53. return "DeviceDiscovered"
  54. case DeviceConnected:
  55. return "DeviceConnected"
  56. case DeviceDisconnected:
  57. return "DeviceDisconnected"
  58. case DeviceRejected:
  59. return "DeviceRejected"
  60. case LocalChangeDetected:
  61. return "LocalChangeDetected"
  62. case RemoteChangeDetected:
  63. return "RemoteChangeDetected"
  64. case LocalIndexUpdated:
  65. return "LocalIndexUpdated"
  66. case RemoteIndexUpdated:
  67. return "RemoteIndexUpdated"
  68. case ItemStarted:
  69. return "ItemStarted"
  70. case ItemFinished:
  71. return "ItemFinished"
  72. case StateChanged:
  73. return "StateChanged"
  74. case FolderRejected:
  75. return "FolderRejected"
  76. case ConfigSaved:
  77. return "ConfigSaved"
  78. case DownloadProgress:
  79. return "DownloadProgress"
  80. case RemoteDownloadProgress:
  81. return "RemoteDownloadProgress"
  82. case FolderSummary:
  83. return "FolderSummary"
  84. case FolderCompletion:
  85. return "FolderCompletion"
  86. case FolderErrors:
  87. return "FolderErrors"
  88. case DevicePaused:
  89. return "DevicePaused"
  90. case DeviceResumed:
  91. return "DeviceResumed"
  92. case FolderScanProgress:
  93. return "FolderScanProgress"
  94. case FolderPaused:
  95. return "FolderPaused"
  96. case FolderResumed:
  97. return "FolderResumed"
  98. case ListenAddressesChanged:
  99. return "ListenAddressesChanged"
  100. case LoginAttempt:
  101. return "LoginAttempt"
  102. default:
  103. return "Unknown"
  104. }
  105. }
  106. func (t EventType) MarshalText() ([]byte, error) {
  107. return []byte(t.String()), nil
  108. }
// BufferSize is the capacity of the event channel created for each
// subscription. When a subscription's channel is full, Logger.Log drops the
// event for that subscription instead of blocking.
const BufferSize = 64
// Logger hands logged events out to the subscriptions registered with it.
// Log, Subscribe and Unsubscribe all take mutex before touching the other
// fields.
type Logger struct {
	// subs holds the currently registered subscriptions.
	subs []*Subscription
	// nextSubscriptionIDs is parallel to subs: element i is the
	// SubscriptionID that the next event delivered to subs[i] will carry.
	nextSubscriptionIDs []int
	// nextGlobalID is incremented before use, so between calls to Log it
	// holds the GlobalID of the most recently logged event.
	nextGlobalID int
	mutex        sync.Mutex
}
// Event is a single logged occurrence as delivered to a subscription.
type Event struct {
	// Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API
	SubscriptionID int `json:"id"`
	// Global ID of the event across all subscriptions
	GlobalID int `json:"globalID"`
	// Time is the moment Logger.Log created the event.
	Time time.Time `json:"time"`
	// Type is the single event type passed to Logger.Log.
	Type EventType `json:"type"`
	// Data is the payload passed to Logger.Log, stored as given.
	Data interface{} `json:"data"`
}
// Subscription receives the events whose type matches its mask. It is
// created by Logger.Subscribe and released with Logger.Unsubscribe.
type Subscription struct {
	// mask selects which event types are delivered.
	mask EventType
	// events is the buffered channel Logger.Log sends matching events on.
	events chan Event
	// timeout is a reusable timer for Poll. Between Poll calls it is expected
	// to be stopped with an empty channel.
	timeout *time.Timer
}
// Default is a package-level Logger created at initialization.
var Default = NewLogger()
// Errors returned by Subscription.Poll.
var (
	// ErrTimeout means no event arrived before the poll timeout expired.
	ErrTimeout = errors.New("timeout")
	// ErrClosed means the subscription's event channel has been closed.
	ErrClosed = errors.New("closed")
)
  135. func NewLogger() *Logger {
  136. return &Logger{
  137. mutex: sync.NewMutex(),
  138. }
  139. }
  140. func (l *Logger) Log(t EventType, data interface{}) {
  141. l.mutex.Lock()
  142. dl.Debugln("log", l.nextGlobalID, t, data)
  143. l.nextGlobalID++
  144. e := Event{
  145. GlobalID: l.nextGlobalID,
  146. Time: time.Now(),
  147. Type: t,
  148. Data: data,
  149. }
  150. for i, s := range l.subs {
  151. if s.mask&t != 0 {
  152. e.SubscriptionID = l.nextSubscriptionIDs[i]
  153. l.nextSubscriptionIDs[i]++
  154. select {
  155. case s.events <- e:
  156. default:
  157. // if s.events is not ready, drop the event
  158. }
  159. }
  160. }
  161. l.mutex.Unlock()
  162. }
  163. func (l *Logger) Subscribe(mask EventType) *Subscription {
  164. l.mutex.Lock()
  165. dl.Debugln("subscribe", mask)
  166. s := &Subscription{
  167. mask: mask,
  168. events: make(chan Event, BufferSize),
  169. timeout: time.NewTimer(0),
  170. }
  171. // We need to create the timeout timer in the stopped, non-fired state so
  172. // that Subscription.Poll() can safely reset it and select on the timeout
  173. // channel. This ensures the timer is stopped and the channel drained.
  174. if runningTests {
  175. // Make the behavior stable when running tests to avoid randomly
  176. // varying test coverage. This ensures, in practice if not in
  177. // theory, that the timer fires and we take the true branch of the
  178. // next if.
  179. runtime.Gosched()
  180. }
  181. if !s.timeout.Stop() {
  182. <-s.timeout.C
  183. }
  184. l.subs = append(l.subs, s)
  185. l.nextSubscriptionIDs = append(l.nextSubscriptionIDs, 1)
  186. l.mutex.Unlock()
  187. return s
  188. }
  189. func (l *Logger) Unsubscribe(s *Subscription) {
  190. l.mutex.Lock()
  191. dl.Debugln("unsubscribe")
  192. for i, ss := range l.subs {
  193. if s == ss {
  194. last := len(l.subs) - 1
  195. l.subs[i] = l.subs[last]
  196. l.subs[last] = nil
  197. l.subs = l.subs[:last]
  198. l.nextSubscriptionIDs[i] = l.nextSubscriptionIDs[last]
  199. l.nextSubscriptionIDs[last] = 0
  200. l.nextSubscriptionIDs = l.nextSubscriptionIDs[:last]
  201. break
  202. }
  203. }
  204. close(s.events)
  205. l.mutex.Unlock()
  206. }
// Poll returns an event from the subscription or an error if the poll times
// out or the event channel is closed. Poll should not be called concurrently
// from multiple goroutines for a single subscription.
//
// The returned error is ErrTimeout when no event arrived within timeout and
// ErrClosed when the event channel has been closed; in both cases the
// returned Event is the zero value.
func (s *Subscription) Poll(timeout time.Duration) (Event, error) {
	dl.Debugln("poll", timeout)

	// The timer is expected to be stopped and drained at this point, which
	// is what makes a plain Reset safe.
	s.timeout.Reset(timeout)

	select {
	case e, ok := <-s.events:
		if !ok {
			// NOTE(review): this path returns without stopping the timer, so
			// the timer is left running after the channel is closed.
			return e, ErrClosed
		}
		if runningTests {
			// Make the behavior stable when running tests to avoid randomly
			// varying test coverage. This ensures, in practice if not in
			// theory, that the timer fires and we take the true branch of
			// the next if.
			s.timeout.Reset(0)
			runtime.Gosched()
		}
		if !s.timeout.Stop() {
			// The timeout must be stopped and possibly drained to be ready
			// for reuse in the next call.
			<-s.timeout.C
		}
		return e, nil
	case <-s.timeout.C:
		// Receiving from the channel here already drained it, so the timer
		// is ready for the next Reset.
		return Event{}, ErrTimeout
	}
}
// C returns the subscription's event channel as a receive-only channel, so
// events can be received directly instead of through Poll. The channel is
// closed by Logger.Unsubscribe.
func (s *Subscription) C() <-chan Event {
	return s.events
}
// bufferedSubscription keeps the most recent events of a Subscription in a
// fixed-size ring buffer so that they can be queried by ID with Since.
type bufferedSubscription struct {
	sub  *Subscription // subscription that pollingLoop reads events from
	buf  []Event       // ring buffer of the most recently received events
	next int           // index in buf where the next event will be stored
	cur  int           // Current SubscriptionID
	mut  sync.Mutex    // held while buf, next and cur are read or written
	cond *sync.TimeoutCond // broadcast by pollingLoop after storing an event
}
// BufferedSubscription gives access to recently logged events by ID.
type BufferedSubscription interface {
	// Since appends the buffered events whose SubscriptionID is greater than
	// id to into and returns the result. If no such event has been buffered
	// yet, it waits for one for up to timeout and returns into unchanged if
	// the wait times out.
	Since(id int, into []Event, timeout time.Duration) []Event
}
  250. func NewBufferedSubscription(s *Subscription, size int) BufferedSubscription {
  251. bs := &bufferedSubscription{
  252. sub: s,
  253. buf: make([]Event, size),
  254. mut: sync.NewMutex(),
  255. }
  256. bs.cond = sync.NewTimeoutCond(bs.mut)
  257. go bs.pollingLoop()
  258. return bs
  259. }
  260. func (s *bufferedSubscription) pollingLoop() {
  261. for {
  262. ev, err := s.sub.Poll(60 * time.Second)
  263. if err == ErrTimeout {
  264. continue
  265. }
  266. if err == ErrClosed {
  267. return
  268. }
  269. if err != nil {
  270. panic("unexpected error: " + err.Error())
  271. }
  272. s.mut.Lock()
  273. s.buf[s.next] = ev
  274. s.next = (s.next + 1) % len(s.buf)
  275. s.cur = ev.SubscriptionID
  276. s.cond.Broadcast()
  277. s.mut.Unlock()
  278. }
  279. }
  280. func (s *bufferedSubscription) Since(id int, into []Event, timeout time.Duration) []Event {
  281. s.mut.Lock()
  282. defer s.mut.Unlock()
  283. // Check once first before generating the TimeoutCondWaiter
  284. if id >= s.cur {
  285. waiter := s.cond.SetupWait(timeout)
  286. defer waiter.Stop()
  287. for id >= s.cur {
  288. if eventsAvailable := waiter.Wait(); !eventsAvailable {
  289. // Timed out
  290. return into
  291. }
  292. }
  293. }
  294. for i := s.next; i < len(s.buf); i++ {
  295. if s.buf[i].SubscriptionID > id {
  296. into = append(into, s.buf[i])
  297. }
  298. }
  299. for i := 0; i < s.next; i++ {
  300. if s.buf[i].SubscriptionID > id {
  301. into = append(into, s.buf[i])
  302. }
  303. }
  304. return into
  305. }
  306. // Error returns a string pointer suitable for JSON marshalling errors. It
  307. // retains the "null on success" semantics, but ensures the error result is a
  308. // string regardless of the underlying concrete error type.
  309. func Error(err error) *string {
  310. if err == nil {
  311. return nil
  312. }
  313. str := err.Error()
  314. return &str
  315. }