events.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. // Package events provides event subscription and polling functionality.
  7. package events
  8. import (
  9. "encoding/json"
  10. "errors"
  11. "runtime"
  12. "time"
  13. "github.com/thejerf/suture"
  14. "github.com/syncthing/syncthing/lib/sync"
  15. "github.com/syncthing/syncthing/lib/util"
  16. )
  17. type EventType int
  18. const (
  19. Starting EventType = 1 << iota
  20. StartupComplete
  21. DeviceDiscovered
  22. DeviceConnected
  23. DeviceDisconnected
  24. DeviceRejected
  25. DevicePaused
  26. DeviceResumed
  27. LocalChangeDetected
  28. RemoteChangeDetected
  29. LocalIndexUpdated
  30. RemoteIndexUpdated
  31. ItemStarted
  32. ItemFinished
  33. StateChanged
  34. FolderRejected
  35. ConfigSaved
  36. DownloadProgress
  37. RemoteDownloadProgress
  38. FolderSummary
  39. FolderCompletion
  40. FolderErrors
  41. FolderScanProgress
  42. FolderPaused
  43. FolderResumed
  44. FolderWatchStateChanged
  45. ListenAddressesChanged
  46. LoginAttempt
  47. AllEvents = (1 << iota) - 1
  48. )
  49. var (
  50. runningTests = false
  51. errNoop = errors.New("method of a noop object called")
  52. )
  53. const eventLogTimeout = 15 * time.Millisecond
  54. func (t EventType) String() string {
  55. switch t {
  56. case Starting:
  57. return "Starting"
  58. case StartupComplete:
  59. return "StartupComplete"
  60. case DeviceDiscovered:
  61. return "DeviceDiscovered"
  62. case DeviceConnected:
  63. return "DeviceConnected"
  64. case DeviceDisconnected:
  65. return "DeviceDisconnected"
  66. case DeviceRejected:
  67. return "DeviceRejected"
  68. case LocalChangeDetected:
  69. return "LocalChangeDetected"
  70. case RemoteChangeDetected:
  71. return "RemoteChangeDetected"
  72. case LocalIndexUpdated:
  73. return "LocalIndexUpdated"
  74. case RemoteIndexUpdated:
  75. return "RemoteIndexUpdated"
  76. case ItemStarted:
  77. return "ItemStarted"
  78. case ItemFinished:
  79. return "ItemFinished"
  80. case StateChanged:
  81. return "StateChanged"
  82. case FolderRejected:
  83. return "FolderRejected"
  84. case ConfigSaved:
  85. return "ConfigSaved"
  86. case DownloadProgress:
  87. return "DownloadProgress"
  88. case RemoteDownloadProgress:
  89. return "RemoteDownloadProgress"
  90. case FolderSummary:
  91. return "FolderSummary"
  92. case FolderCompletion:
  93. return "FolderCompletion"
  94. case FolderErrors:
  95. return "FolderErrors"
  96. case DevicePaused:
  97. return "DevicePaused"
  98. case DeviceResumed:
  99. return "DeviceResumed"
  100. case FolderScanProgress:
  101. return "FolderScanProgress"
  102. case FolderPaused:
  103. return "FolderPaused"
  104. case FolderResumed:
  105. return "FolderResumed"
  106. case ListenAddressesChanged:
  107. return "ListenAddressesChanged"
  108. case LoginAttempt:
  109. return "LoginAttempt"
  110. case FolderWatchStateChanged:
  111. return "FolderWatchStateChanged"
  112. default:
  113. return "Unknown"
  114. }
  115. }
  116. func (t EventType) MarshalText() ([]byte, error) {
  117. return []byte(t.String()), nil
  118. }
  119. func (t *EventType) UnmarshalJSON(b []byte) error {
  120. var s string
  121. if err := json.Unmarshal(b, &s); err != nil {
  122. return err
  123. }
  124. *t = UnmarshalEventType(s)
  125. return nil
  126. }
  127. func UnmarshalEventType(s string) EventType {
  128. switch s {
  129. case "Starting":
  130. return Starting
  131. case "StartupComplete":
  132. return StartupComplete
  133. case "DeviceDiscovered":
  134. return DeviceDiscovered
  135. case "DeviceConnected":
  136. return DeviceConnected
  137. case "DeviceDisconnected":
  138. return DeviceDisconnected
  139. case "DeviceRejected":
  140. return DeviceRejected
  141. case "LocalChangeDetected":
  142. return LocalChangeDetected
  143. case "RemoteChangeDetected":
  144. return RemoteChangeDetected
  145. case "LocalIndexUpdated":
  146. return LocalIndexUpdated
  147. case "RemoteIndexUpdated":
  148. return RemoteIndexUpdated
  149. case "ItemStarted":
  150. return ItemStarted
  151. case "ItemFinished":
  152. return ItemFinished
  153. case "StateChanged":
  154. return StateChanged
  155. case "FolderRejected":
  156. return FolderRejected
  157. case "ConfigSaved":
  158. return ConfigSaved
  159. case "DownloadProgress":
  160. return DownloadProgress
  161. case "RemoteDownloadProgress":
  162. return RemoteDownloadProgress
  163. case "FolderSummary":
  164. return FolderSummary
  165. case "FolderCompletion":
  166. return FolderCompletion
  167. case "FolderErrors":
  168. return FolderErrors
  169. case "DevicePaused":
  170. return DevicePaused
  171. case "DeviceResumed":
  172. return DeviceResumed
  173. case "FolderScanProgress":
  174. return FolderScanProgress
  175. case "FolderPaused":
  176. return FolderPaused
  177. case "FolderResumed":
  178. return FolderResumed
  179. case "ListenAddressesChanged":
  180. return ListenAddressesChanged
  181. case "LoginAttempt":
  182. return LoginAttempt
  183. case "FolderWatchStateChanged":
  184. return FolderWatchStateChanged
  185. default:
  186. return 0
  187. }
  188. }
  189. const BufferSize = 64
  190. type Logger interface {
  191. suture.Service
  192. Log(t EventType, data interface{})
  193. Subscribe(mask EventType) Subscription
  194. }
  195. type logger struct {
  196. suture.Service
  197. subs []*subscription
  198. nextSubscriptionIDs []int
  199. nextGlobalID int
  200. timeout *time.Timer
  201. events chan Event
  202. funcs chan func()
  203. toUnsubscribe chan *subscription
  204. stop chan struct{}
  205. }
  206. type Event struct {
  207. // Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API
  208. SubscriptionID int `json:"id"`
  209. // Global ID of the event across all subscriptions
  210. GlobalID int `json:"globalID"`
  211. Time time.Time `json:"time"`
  212. Type EventType `json:"type"`
  213. Data interface{} `json:"data"`
  214. }
  215. type Subscription interface {
  216. C() <-chan Event
  217. Poll(timeout time.Duration) (Event, error)
  218. Unsubscribe()
  219. }
  220. type subscription struct {
  221. mask EventType
  222. events chan Event
  223. toUnsubscribe chan *subscription
  224. timeout *time.Timer
  225. }
  226. var (
  227. ErrTimeout = errors.New("timeout")
  228. ErrClosed = errors.New("closed")
  229. )
  230. func NewLogger() Logger {
  231. l := &logger{
  232. timeout: time.NewTimer(time.Second),
  233. events: make(chan Event, BufferSize),
  234. funcs: make(chan func()),
  235. toUnsubscribe: make(chan *subscription),
  236. }
  237. l.Service = util.AsService(l.serve)
  238. // Make sure the timer is in the stopped state and hasn't fired anything
  239. // into the channel.
  240. if !l.timeout.Stop() {
  241. <-l.timeout.C
  242. }
  243. return l
  244. }
  245. func (l *logger) serve(stop chan struct{}) {
  246. loop:
  247. for {
  248. select {
  249. case e := <-l.events:
  250. // Incoming events get sent
  251. l.sendEvent(e)
  252. case fn := <-l.funcs:
  253. // Subscriptions are handled here.
  254. fn()
  255. case s := <-l.toUnsubscribe:
  256. l.unsubscribe(s)
  257. case <-stop:
  258. break loop
  259. }
  260. }
  261. // Closing the event channels corresponds to what happens when a
  262. // subscription is unsubscribed; this stops any BufferedSubscription,
  263. // makes Poll() return ErrClosed, etc.
  264. for _, s := range l.subs {
  265. close(s.events)
  266. }
  267. }
  268. func (l *logger) Log(t EventType, data interface{}) {
  269. l.events <- Event{
  270. Time: time.Now(),
  271. Type: t,
  272. Data: data,
  273. // SubscriptionID and GlobalID are set in sendEvent
  274. }
  275. }
  276. func (l *logger) sendEvent(e Event) {
  277. l.nextGlobalID++
  278. dl.Debugln("log", l.nextGlobalID, e.Type, e.Data)
  279. e.GlobalID = l.nextGlobalID
  280. for i, s := range l.subs {
  281. if s.mask&e.Type != 0 {
  282. e.SubscriptionID = l.nextSubscriptionIDs[i]
  283. l.nextSubscriptionIDs[i]++
  284. l.timeout.Reset(eventLogTimeout)
  285. timedOut := false
  286. select {
  287. case s.events <- e:
  288. case <-l.timeout.C:
  289. // if s.events is not ready, drop the event
  290. timedOut = true
  291. }
  292. // If stop returns false it already sent something to the
  293. // channel. If we didn't already read it above we must do so now
  294. // or we get a spurious timeout on the next loop.
  295. if !l.timeout.Stop() && !timedOut {
  296. <-l.timeout.C
  297. }
  298. }
  299. }
  300. }
  301. func (l *logger) Subscribe(mask EventType) Subscription {
  302. res := make(chan Subscription)
  303. l.funcs <- func() {
  304. dl.Debugln("subscribe", mask)
  305. s := &subscription{
  306. mask: mask,
  307. events: make(chan Event, BufferSize),
  308. toUnsubscribe: l.toUnsubscribe,
  309. timeout: time.NewTimer(0),
  310. }
  311. // We need to create the timeout timer in the stopped, non-fired state so
  312. // that Subscription.Poll() can safely reset it and select on the timeout
  313. // channel. This ensures the timer is stopped and the channel drained.
  314. if runningTests {
  315. // Make the behavior stable when running tests to avoid randomly
  316. // varying test coverage. This ensures, in practice if not in
  317. // theory, that the timer fires and we take the true branch of the
  318. // next if.
  319. runtime.Gosched()
  320. }
  321. if !s.timeout.Stop() {
  322. <-s.timeout.C
  323. }
  324. l.subs = append(l.subs, s)
  325. l.nextSubscriptionIDs = append(l.nextSubscriptionIDs, 1)
  326. res <- s
  327. }
  328. return <-res
  329. }
  330. func (l *logger) unsubscribe(s *subscription) {
  331. dl.Debugln("unsubscribe", s.mask)
  332. for i, ss := range l.subs {
  333. if s == ss {
  334. last := len(l.subs) - 1
  335. l.subs[i] = l.subs[last]
  336. l.subs[last] = nil
  337. l.subs = l.subs[:last]
  338. l.nextSubscriptionIDs[i] = l.nextSubscriptionIDs[last]
  339. l.nextSubscriptionIDs[last] = 0
  340. l.nextSubscriptionIDs = l.nextSubscriptionIDs[:last]
  341. break
  342. }
  343. }
  344. close(s.events)
  345. }
  346. // Poll returns an event from the subscription or an error if the poll times
  347. // out of the event channel is closed. Poll should not be called concurrently
  348. // from multiple goroutines for a single subscription.
  349. func (s *subscription) Poll(timeout time.Duration) (Event, error) {
  350. dl.Debugln("poll", timeout)
  351. s.timeout.Reset(timeout)
  352. select {
  353. case e, ok := <-s.events:
  354. if !ok {
  355. return e, ErrClosed
  356. }
  357. if runningTests {
  358. // Make the behavior stable when running tests to avoid randomly
  359. // varying test coverage. This ensures, in practice if not in
  360. // theory, that the timer fires and we take the true branch of
  361. // the next if.
  362. s.timeout.Reset(0)
  363. runtime.Gosched()
  364. }
  365. if !s.timeout.Stop() {
  366. // The timeout must be stopped and possibly drained to be ready
  367. // for reuse in the next call.
  368. <-s.timeout.C
  369. }
  370. return e, nil
  371. case <-s.timeout.C:
  372. return Event{}, ErrTimeout
  373. }
  374. }
  375. func (s *subscription) C() <-chan Event {
  376. return s.events
  377. }
  378. func (s *subscription) Unsubscribe() {
  379. s.toUnsubscribe <- s
  380. }
  381. type bufferedSubscription struct {
  382. sub Subscription
  383. buf []Event
  384. next int
  385. cur int // Current SubscriptionID
  386. mut sync.Mutex
  387. cond *sync.TimeoutCond
  388. }
  389. type BufferedSubscription interface {
  390. Since(id int, into []Event, timeout time.Duration) []Event
  391. }
  392. func NewBufferedSubscription(s Subscription, size int) BufferedSubscription {
  393. bs := &bufferedSubscription{
  394. sub: s,
  395. buf: make([]Event, size),
  396. mut: sync.NewMutex(),
  397. }
  398. bs.cond = sync.NewTimeoutCond(bs.mut)
  399. go bs.pollingLoop()
  400. return bs
  401. }
  402. func (s *bufferedSubscription) pollingLoop() {
  403. for ev := range s.sub.C() {
  404. s.mut.Lock()
  405. s.buf[s.next] = ev
  406. s.next = (s.next + 1) % len(s.buf)
  407. s.cur = ev.SubscriptionID
  408. s.cond.Broadcast()
  409. s.mut.Unlock()
  410. }
  411. }
  412. func (s *bufferedSubscription) Since(id int, into []Event, timeout time.Duration) []Event {
  413. s.mut.Lock()
  414. defer s.mut.Unlock()
  415. // Check once first before generating the TimeoutCondWaiter
  416. if id >= s.cur {
  417. waiter := s.cond.SetupWait(timeout)
  418. defer waiter.Stop()
  419. for id >= s.cur {
  420. if eventsAvailable := waiter.Wait(); !eventsAvailable {
  421. // Timed out
  422. return into
  423. }
  424. }
  425. }
  426. for i := s.next; i < len(s.buf); i++ {
  427. if s.buf[i].SubscriptionID > id {
  428. into = append(into, s.buf[i])
  429. }
  430. }
  431. for i := 0; i < s.next; i++ {
  432. if s.buf[i].SubscriptionID > id {
  433. into = append(into, s.buf[i])
  434. }
  435. }
  436. return into
  437. }
  438. // Error returns a string pointer suitable for JSON marshalling errors. It
  439. // retains the "null on success" semantics, but ensures the error result is a
  440. // string regardless of the underlying concrete error type.
  441. func Error(err error) *string {
  442. if err == nil {
  443. return nil
  444. }
  445. str := err.Error()
  446. return &str
  447. }
  448. type noopLogger struct{}
  449. var NoopLogger Logger = &noopLogger{}
  450. func (*noopLogger) Serve() {}
  451. func (*noopLogger) Stop() {}
  452. func (*noopLogger) Log(t EventType, data interface{}) {}
  453. func (*noopLogger) Subscribe(mask EventType) Subscription {
  454. return &noopSubscription{}
  455. }
  456. type noopSubscription struct{}
  457. func (*noopSubscription) C() <-chan Event {
  458. return nil
  459. }
  460. func (*noopSubscription) Poll(timeout time.Duration) (Event, error) {
  461. return Event{}, errNoop
  462. }
  463. func (*noopSubscription) Unsubscribe() {}