events.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. // Package events provides event subscription and polling functionality.
  7. package events
  8. import (
  9. "errors"
  10. "runtime"
  11. stdsync "sync"
  12. "time"
  13. "github.com/syncthing/syncthing/lib/sync"
  14. )
  // EventType identifies a kind of event. Every defined type occupies its own
  // bit, so several types can be OR-ed together into a subscription mask.
  type EventType int

  // The defined event types, one bit each in declaration order, followed by
  // the AllEvents mask.
  const (
  	Ping EventType = 1 << iota
  	Starting
  	StartupComplete
  	DeviceDiscovered
  	DeviceConnected
  	DeviceDisconnected
  	DeviceRejected
  	DevicePaused
  	DeviceResumed
  	LocalChangeDetected
  	LocalIndexUpdated
  	RemoteIndexUpdated
  	ItemStarted
  	ItemFinished
  	StateChanged
  	FolderRejected
  	ConfigSaved
  	DownloadProgress
  	RemoteDownloadProgress
  	FolderSummary
  	FolderCompletion
  	FolderErrors
  	FolderScanProgress
  	ListenAddressesChanged
  	LoginAttempt

  	// AllEvents has the bit of every event type declared above set. It is
  	// an untyped constant.
  	AllEvents = (1 << iota) - 1
  )
  // runningTests is false by default. When it is true, Subscribe and Poll
  // yield the processor before stopping their timer so that the timer has, in
  // practice, already fired and the drain branch is exercised.
  var runningTests bool
  45. func (t EventType) String() string {
  46. switch t {
  47. case Ping:
  48. return "Ping"
  49. case Starting:
  50. return "Starting"
  51. case StartupComplete:
  52. return "StartupComplete"
  53. case DeviceDiscovered:
  54. return "DeviceDiscovered"
  55. case DeviceConnected:
  56. return "DeviceConnected"
  57. case DeviceDisconnected:
  58. return "DeviceDisconnected"
  59. case DeviceRejected:
  60. return "DeviceRejected"
  61. case LocalChangeDetected:
  62. return "LocalChangeDetected"
  63. case LocalIndexUpdated:
  64. return "LocalIndexUpdated"
  65. case RemoteIndexUpdated:
  66. return "RemoteIndexUpdated"
  67. case ItemStarted:
  68. return "ItemStarted"
  69. case ItemFinished:
  70. return "ItemFinished"
  71. case StateChanged:
  72. return "StateChanged"
  73. case FolderRejected:
  74. return "FolderRejected"
  75. case ConfigSaved:
  76. return "ConfigSaved"
  77. case DownloadProgress:
  78. return "DownloadProgress"
  79. case RemoteDownloadProgress:
  80. return "RemoteDownloadProgress"
  81. case FolderSummary:
  82. return "FolderSummary"
  83. case FolderCompletion:
  84. return "FolderCompletion"
  85. case FolderErrors:
  86. return "FolderErrors"
  87. case DevicePaused:
  88. return "DevicePaused"
  89. case DeviceResumed:
  90. return "DeviceResumed"
  91. case FolderScanProgress:
  92. return "FolderScanProgress"
  93. case ListenAddressesChanged:
  94. return "ListenAddressesChanged"
  95. case LoginAttempt:
  96. return "LoginAttempt"
  97. default:
  98. return "Unknown"
  99. }
  100. }
  101. func (t EventType) MarshalText() ([]byte, error) {
  102. return []byte(t.String()), nil
  103. }
  // BufferSize is the capacity of the event channel that Subscribe allocates
  // for every new subscription.
  const BufferSize = 64
  105. type Logger struct {
  106. subs []*Subscription
  107. nextSubscriptionIDs []int
  108. nextGlobalID int
  109. mutex sync.Mutex
  110. }
  111. type Event struct {
  112. // Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API
  113. SubscriptionID int `json:"id"`
  114. // Global ID of the event across all subscriptions
  115. GlobalID int `json:"globalID"`
  116. Time time.Time `json:"time"`
  117. Type EventType `json:"type"`
  118. Data interface{} `json:"data"`
  119. }
  120. type Subscription struct {
  121. mask EventType
  122. events chan Event
  123. timeout *time.Timer
  124. }
  125. var Default = NewLogger()
  // Errors returned by Subscription.Poll.
  var (
  	// ErrTimeout reports that no event arrived before the timeout expired.
  	ErrTimeout = errors.New("timeout")

  	// ErrClosed reports that the subscription's event channel is closed.
  	ErrClosed = errors.New("closed")
  )
  130. func NewLogger() *Logger {
  131. return &Logger{
  132. mutex: sync.NewMutex(),
  133. }
  134. }
  135. func (l *Logger) Log(t EventType, data interface{}) {
  136. l.mutex.Lock()
  137. dl.Debugln("log", l.nextGlobalID, t, data)
  138. l.nextGlobalID++
  139. e := Event{
  140. GlobalID: l.nextGlobalID,
  141. Time: time.Now(),
  142. Type: t,
  143. Data: data,
  144. }
  145. for i, s := range l.subs {
  146. if s.mask&t != 0 {
  147. e.SubscriptionID = l.nextSubscriptionIDs[i]
  148. l.nextSubscriptionIDs[i]++
  149. select {
  150. case s.events <- e:
  151. default:
  152. // if s.events is not ready, drop the event
  153. }
  154. }
  155. }
  156. l.mutex.Unlock()
  157. }
  158. func (l *Logger) Subscribe(mask EventType) *Subscription {
  159. l.mutex.Lock()
  160. dl.Debugln("subscribe", mask)
  161. s := &Subscription{
  162. mask: mask,
  163. events: make(chan Event, BufferSize),
  164. timeout: time.NewTimer(0),
  165. }
  166. // We need to create the timeout timer in the stopped, non-fired state so
  167. // that Subscription.Poll() can safely reset it and select on the timeout
  168. // channel. This ensures the timer is stopped and the channel drained.
  169. if runningTests {
  170. // Make the behavior stable when running tests to avoid randomly
  171. // varying test coverage. This ensures, in practice if not in
  172. // theory, that the timer fires and we take the true branch of the
  173. // next if.
  174. runtime.Gosched()
  175. }
  176. if !s.timeout.Stop() {
  177. <-s.timeout.C
  178. }
  179. l.subs = append(l.subs, s)
  180. l.nextSubscriptionIDs = append(l.nextSubscriptionIDs, 1)
  181. l.mutex.Unlock()
  182. return s
  183. }
  184. func (l *Logger) Unsubscribe(s *Subscription) {
  185. l.mutex.Lock()
  186. dl.Debugln("unsubscribe")
  187. for i, ss := range l.subs {
  188. if s == ss {
  189. last := len(l.subs) - 1
  190. l.subs[i] = l.subs[last]
  191. l.subs[last] = nil
  192. l.subs = l.subs[:last]
  193. l.nextSubscriptionIDs[i] = l.nextSubscriptionIDs[last]
  194. l.nextSubscriptionIDs[last] = 0
  195. l.nextSubscriptionIDs = l.nextSubscriptionIDs[:last]
  196. break
  197. }
  198. }
  199. close(s.events)
  200. l.mutex.Unlock()
  201. }
  202. // Poll returns an event from the subscription or an error if the poll times
  203. // out of the event channel is closed. Poll should not be called concurrently
  204. // from multiple goroutines for a single subscription.
  205. func (s *Subscription) Poll(timeout time.Duration) (Event, error) {
  206. dl.Debugln("poll", timeout)
  207. s.timeout.Reset(timeout)
  208. select {
  209. case e, ok := <-s.events:
  210. if !ok {
  211. return e, ErrClosed
  212. }
  213. if runningTests {
  214. // Make the behavior stable when running tests to avoid randomly
  215. // varying test coverage. This ensures, in practice if not in
  216. // theory, that the timer fires and we take the true branch of
  217. // the next if.
  218. s.timeout.Reset(0)
  219. runtime.Gosched()
  220. }
  221. if !s.timeout.Stop() {
  222. // The timeout must be stopped and possibly drained to be ready
  223. // for reuse in the next call.
  224. <-s.timeout.C
  225. }
  226. return e, nil
  227. case <-s.timeout.C:
  228. return Event{}, ErrTimeout
  229. }
  230. }
  231. func (s *Subscription) C() <-chan Event {
  232. return s.events
  233. }
  234. type bufferedSubscription struct {
  235. sub *Subscription
  236. buf []Event
  237. next int
  238. cur int // Current SubscriptionID
  239. mut sync.Mutex
  240. cond *stdsync.Cond
  241. }
  242. type BufferedSubscription interface {
  243. Since(id int, into []Event) []Event
  244. }
  245. func NewBufferedSubscription(s *Subscription, size int) BufferedSubscription {
  246. bs := &bufferedSubscription{
  247. sub: s,
  248. buf: make([]Event, size),
  249. mut: sync.NewMutex(),
  250. }
  251. bs.cond = stdsync.NewCond(bs.mut)
  252. go bs.pollingLoop()
  253. return bs
  254. }
  255. func (s *bufferedSubscription) pollingLoop() {
  256. for {
  257. ev, err := s.sub.Poll(60 * time.Second)
  258. if err == ErrTimeout {
  259. continue
  260. }
  261. if err == ErrClosed {
  262. return
  263. }
  264. if err != nil {
  265. panic("unexpected error: " + err.Error())
  266. }
  267. s.mut.Lock()
  268. s.buf[s.next] = ev
  269. s.next = (s.next + 1) % len(s.buf)
  270. s.cur = ev.SubscriptionID
  271. s.cond.Broadcast()
  272. s.mut.Unlock()
  273. }
  274. }
  275. func (s *bufferedSubscription) Since(id int, into []Event) []Event {
  276. s.mut.Lock()
  277. defer s.mut.Unlock()
  278. for id >= s.cur {
  279. s.cond.Wait()
  280. }
  281. for i := s.next; i < len(s.buf); i++ {
  282. if s.buf[i].SubscriptionID > id {
  283. into = append(into, s.buf[i])
  284. }
  285. }
  286. for i := 0; i < s.next; i++ {
  287. if s.buf[i].SubscriptionID > id {
  288. into = append(into, s.buf[i])
  289. }
  290. }
  291. return into
  292. }
  // Error converts err into a value suitable for JSON marshalling: nil when
  // err is nil (preserving "null on success"), otherwise a pointer to the
  // error's message, so that the marshalled result is a string whatever the
  // concrete error type is.
  func Error(err error) *string {
  	if err != nil {
  		msg := err.Error()
  		return &msg
  	}
  	return nil
  }