events.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. // Package events provides event subscription and polling functionality.
  7. package events
  8. import (
  9. "context"
  10. "encoding/json"
  11. "errors"
  12. "fmt"
  13. "runtime"
  14. "time"
  15. "github.com/thejerf/suture"
  16. "github.com/syncthing/syncthing/lib/sync"
  17. "github.com/syncthing/syncthing/lib/util"
  18. )
  19. type EventType int64
  20. const (
  21. Starting EventType = 1 << iota
  22. StartupComplete
  23. DeviceDiscovered
  24. DeviceConnected
  25. DeviceDisconnected
  26. DeviceRejected
  27. DevicePaused
  28. DeviceResumed
  29. LocalChangeDetected
  30. RemoteChangeDetected
  31. LocalIndexUpdated
  32. RemoteIndexUpdated
  33. ItemStarted
  34. ItemFinished
  35. StateChanged
  36. FolderRejected
  37. ConfigSaved
  38. DownloadProgress
  39. RemoteDownloadProgress
  40. FolderSummary
  41. FolderCompletion
  42. FolderErrors
  43. FolderScanProgress
  44. FolderPaused
  45. FolderResumed
  46. FolderWatchStateChanged
  47. ListenAddressesChanged
  48. LoginAttempt
  49. Failure
  50. AllEvents = (1 << iota) - 1
  51. )
  52. var (
  53. runningTests = false
  54. errNoop = errors.New("method of a noop object called")
  55. )
  56. const eventLogTimeout = 15 * time.Millisecond
  57. func (t EventType) String() string {
  58. switch t {
  59. case Starting:
  60. return "Starting"
  61. case StartupComplete:
  62. return "StartupComplete"
  63. case DeviceDiscovered:
  64. return "DeviceDiscovered"
  65. case DeviceConnected:
  66. return "DeviceConnected"
  67. case DeviceDisconnected:
  68. return "DeviceDisconnected"
  69. case DeviceRejected:
  70. return "DeviceRejected"
  71. case LocalChangeDetected:
  72. return "LocalChangeDetected"
  73. case RemoteChangeDetected:
  74. return "RemoteChangeDetected"
  75. case LocalIndexUpdated:
  76. return "LocalIndexUpdated"
  77. case RemoteIndexUpdated:
  78. return "RemoteIndexUpdated"
  79. case ItemStarted:
  80. return "ItemStarted"
  81. case ItemFinished:
  82. return "ItemFinished"
  83. case StateChanged:
  84. return "StateChanged"
  85. case FolderRejected:
  86. return "FolderRejected"
  87. case ConfigSaved:
  88. return "ConfigSaved"
  89. case DownloadProgress:
  90. return "DownloadProgress"
  91. case RemoteDownloadProgress:
  92. return "RemoteDownloadProgress"
  93. case FolderSummary:
  94. return "FolderSummary"
  95. case FolderCompletion:
  96. return "FolderCompletion"
  97. case FolderErrors:
  98. return "FolderErrors"
  99. case DevicePaused:
  100. return "DevicePaused"
  101. case DeviceResumed:
  102. return "DeviceResumed"
  103. case FolderScanProgress:
  104. return "FolderScanProgress"
  105. case FolderPaused:
  106. return "FolderPaused"
  107. case FolderResumed:
  108. return "FolderResumed"
  109. case ListenAddressesChanged:
  110. return "ListenAddressesChanged"
  111. case LoginAttempt:
  112. return "LoginAttempt"
  113. case FolderWatchStateChanged:
  114. return "FolderWatchStateChanged"
  115. case Failure:
  116. return "Failure"
  117. default:
  118. return "Unknown"
  119. }
  120. }
  121. func (t EventType) MarshalText() ([]byte, error) {
  122. return []byte(t.String()), nil
  123. }
  124. func (t *EventType) UnmarshalJSON(b []byte) error {
  125. var s string
  126. if err := json.Unmarshal(b, &s); err != nil {
  127. return err
  128. }
  129. *t = UnmarshalEventType(s)
  130. return nil
  131. }
  132. func UnmarshalEventType(s string) EventType {
  133. switch s {
  134. case "Starting":
  135. return Starting
  136. case "StartupComplete":
  137. return StartupComplete
  138. case "DeviceDiscovered":
  139. return DeviceDiscovered
  140. case "DeviceConnected":
  141. return DeviceConnected
  142. case "DeviceDisconnected":
  143. return DeviceDisconnected
  144. case "DeviceRejected":
  145. return DeviceRejected
  146. case "LocalChangeDetected":
  147. return LocalChangeDetected
  148. case "RemoteChangeDetected":
  149. return RemoteChangeDetected
  150. case "LocalIndexUpdated":
  151. return LocalIndexUpdated
  152. case "RemoteIndexUpdated":
  153. return RemoteIndexUpdated
  154. case "ItemStarted":
  155. return ItemStarted
  156. case "ItemFinished":
  157. return ItemFinished
  158. case "StateChanged":
  159. return StateChanged
  160. case "FolderRejected":
  161. return FolderRejected
  162. case "ConfigSaved":
  163. return ConfigSaved
  164. case "DownloadProgress":
  165. return DownloadProgress
  166. case "RemoteDownloadProgress":
  167. return RemoteDownloadProgress
  168. case "FolderSummary":
  169. return FolderSummary
  170. case "FolderCompletion":
  171. return FolderCompletion
  172. case "FolderErrors":
  173. return FolderErrors
  174. case "DevicePaused":
  175. return DevicePaused
  176. case "DeviceResumed":
  177. return DeviceResumed
  178. case "FolderScanProgress":
  179. return FolderScanProgress
  180. case "FolderPaused":
  181. return FolderPaused
  182. case "FolderResumed":
  183. return FolderResumed
  184. case "ListenAddressesChanged":
  185. return ListenAddressesChanged
  186. case "LoginAttempt":
  187. return LoginAttempt
  188. case "FolderWatchStateChanged":
  189. return FolderWatchStateChanged
  190. case "Failure":
  191. return Failure
  192. default:
  193. return 0
  194. }
  195. }
  196. const BufferSize = 64
  197. type Logger interface {
  198. suture.Service
  199. Log(t EventType, data interface{})
  200. Subscribe(mask EventType) Subscription
  201. }
  202. type logger struct {
  203. suture.Service
  204. subs []*subscription
  205. nextSubscriptionIDs []int
  206. nextGlobalID int
  207. timeout *time.Timer
  208. events chan Event
  209. funcs chan func(context.Context)
  210. toUnsubscribe chan *subscription
  211. stop chan struct{}
  212. }
  213. type Event struct {
  214. // Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API
  215. SubscriptionID int `json:"id"`
  216. // Global ID of the event across all subscriptions
  217. GlobalID int `json:"globalID"`
  218. Time time.Time `json:"time"`
  219. Type EventType `json:"type"`
  220. Data interface{} `json:"data"`
  221. }
  222. type Subscription interface {
  223. C() <-chan Event
  224. Poll(timeout time.Duration) (Event, error)
  225. Mask() EventType
  226. Unsubscribe()
  227. }
  228. type subscription struct {
  229. mask EventType
  230. events chan Event
  231. toUnsubscribe chan *subscription
  232. timeout *time.Timer
  233. ctx context.Context
  234. }
  235. var (
  236. ErrTimeout = errors.New("timeout")
  237. ErrClosed = errors.New("closed")
  238. )
  239. func NewLogger() Logger {
  240. l := &logger{
  241. timeout: time.NewTimer(time.Second),
  242. events: make(chan Event, BufferSize),
  243. funcs: make(chan func(context.Context)),
  244. toUnsubscribe: make(chan *subscription),
  245. }
  246. l.Service = util.AsService(l.serve, l.String())
  247. // Make sure the timer is in the stopped state and hasn't fired anything
  248. // into the channel.
  249. if !l.timeout.Stop() {
  250. <-l.timeout.C
  251. }
  252. return l
  253. }
  254. func (l *logger) serve(ctx context.Context) {
  255. loop:
  256. for {
  257. select {
  258. case e := <-l.events:
  259. // Incoming events get sent
  260. l.sendEvent(e)
  261. case fn := <-l.funcs:
  262. // Subscriptions are handled here.
  263. fn(ctx)
  264. case s := <-l.toUnsubscribe:
  265. l.unsubscribe(s)
  266. case <-ctx.Done():
  267. break loop
  268. }
  269. }
  270. // Closing the event channels corresponds to what happens when a
  271. // subscription is unsubscribed; this stops any BufferedSubscription,
  272. // makes Poll() return ErrClosed, etc.
  273. for _, s := range l.subs {
  274. close(s.events)
  275. }
  276. }
  277. func (l *logger) Log(t EventType, data interface{}) {
  278. l.events <- Event{
  279. Time: time.Now(),
  280. Type: t,
  281. Data: data,
  282. // SubscriptionID and GlobalID are set in sendEvent
  283. }
  284. }
  285. func (l *logger) sendEvent(e Event) {
  286. l.nextGlobalID++
  287. dl.Debugln("log", l.nextGlobalID, e.Type, e.Data)
  288. e.GlobalID = l.nextGlobalID
  289. for i, s := range l.subs {
  290. if s.mask&e.Type != 0 {
  291. e.SubscriptionID = l.nextSubscriptionIDs[i]
  292. l.nextSubscriptionIDs[i]++
  293. l.timeout.Reset(eventLogTimeout)
  294. timedOut := false
  295. select {
  296. case s.events <- e:
  297. case <-l.timeout.C:
  298. // if s.events is not ready, drop the event
  299. timedOut = true
  300. }
  301. // If stop returns false it already sent something to the
  302. // channel. If we didn't already read it above we must do so now
  303. // or we get a spurious timeout on the next loop.
  304. if !l.timeout.Stop() && !timedOut {
  305. <-l.timeout.C
  306. }
  307. }
  308. }
  309. }
  310. func (l *logger) Subscribe(mask EventType) Subscription {
  311. res := make(chan Subscription)
  312. l.funcs <- func(ctx context.Context) {
  313. dl.Debugln("subscribe", mask)
  314. s := &subscription{
  315. mask: mask,
  316. events: make(chan Event, BufferSize),
  317. toUnsubscribe: l.toUnsubscribe,
  318. timeout: time.NewTimer(0),
  319. ctx: ctx,
  320. }
  321. // We need to create the timeout timer in the stopped, non-fired state so
  322. // that Subscription.Poll() can safely reset it and select on the timeout
  323. // channel. This ensures the timer is stopped and the channel drained.
  324. if runningTests {
  325. // Make the behavior stable when running tests to avoid randomly
  326. // varying test coverage. This ensures, in practice if not in
  327. // theory, that the timer fires and we take the true branch of the
  328. // next if.
  329. runtime.Gosched()
  330. }
  331. if !s.timeout.Stop() {
  332. <-s.timeout.C
  333. }
  334. l.subs = append(l.subs, s)
  335. l.nextSubscriptionIDs = append(l.nextSubscriptionIDs, 1)
  336. res <- s
  337. }
  338. return <-res
  339. }
  340. func (l *logger) unsubscribe(s *subscription) {
  341. dl.Debugln("unsubscribe", s.mask)
  342. for i, ss := range l.subs {
  343. if s == ss {
  344. last := len(l.subs) - 1
  345. l.subs[i] = l.subs[last]
  346. l.subs[last] = nil
  347. l.subs = l.subs[:last]
  348. l.nextSubscriptionIDs[i] = l.nextSubscriptionIDs[last]
  349. l.nextSubscriptionIDs[last] = 0
  350. l.nextSubscriptionIDs = l.nextSubscriptionIDs[:last]
  351. break
  352. }
  353. }
  354. close(s.events)
  355. }
  356. func (l *logger) String() string {
  357. return fmt.Sprintf("events.Logger/@%p", l)
  358. }
  359. // Poll returns an event from the subscription or an error if the poll times
  360. // out of the event channel is closed. Poll should not be called concurrently
  361. // from multiple goroutines for a single subscription.
  362. func (s *subscription) Poll(timeout time.Duration) (Event, error) {
  363. dl.Debugln("poll", timeout)
  364. s.timeout.Reset(timeout)
  365. select {
  366. case e, ok := <-s.events:
  367. if !ok {
  368. return e, ErrClosed
  369. }
  370. if runningTests {
  371. // Make the behavior stable when running tests to avoid randomly
  372. // varying test coverage. This ensures, in practice if not in
  373. // theory, that the timer fires and we take the true branch of
  374. // the next if.
  375. s.timeout.Reset(0)
  376. runtime.Gosched()
  377. }
  378. if !s.timeout.Stop() {
  379. // The timeout must be stopped and possibly drained to be ready
  380. // for reuse in the next call.
  381. <-s.timeout.C
  382. }
  383. return e, nil
  384. case <-s.timeout.C:
  385. return Event{}, ErrTimeout
  386. }
  387. }
  388. func (s *subscription) C() <-chan Event {
  389. return s.events
  390. }
  391. func (s *subscription) Mask() EventType {
  392. return s.mask
  393. }
  394. func (s *subscription) Unsubscribe() {
  395. select {
  396. case s.toUnsubscribe <- s:
  397. case <-s.ctx.Done():
  398. }
  399. }
  400. type bufferedSubscription struct {
  401. sub Subscription
  402. buf []Event
  403. next int
  404. cur int // Current SubscriptionID
  405. mut sync.Mutex
  406. cond *sync.TimeoutCond
  407. }
  408. type BufferedSubscription interface {
  409. Since(id int, into []Event, timeout time.Duration) []Event
  410. Mask() EventType
  411. }
  412. func NewBufferedSubscription(s Subscription, size int) BufferedSubscription {
  413. bs := &bufferedSubscription{
  414. sub: s,
  415. buf: make([]Event, size),
  416. mut: sync.NewMutex(),
  417. }
  418. bs.cond = sync.NewTimeoutCond(bs.mut)
  419. go bs.pollingLoop()
  420. return bs
  421. }
  422. func (s *bufferedSubscription) pollingLoop() {
  423. for ev := range s.sub.C() {
  424. s.mut.Lock()
  425. s.buf[s.next] = ev
  426. s.next = (s.next + 1) % len(s.buf)
  427. s.cur = ev.SubscriptionID
  428. s.cond.Broadcast()
  429. s.mut.Unlock()
  430. }
  431. }
  432. func (s *bufferedSubscription) Since(id int, into []Event, timeout time.Duration) []Event {
  433. s.mut.Lock()
  434. defer s.mut.Unlock()
  435. // Check once first before generating the TimeoutCondWaiter
  436. if id >= s.cur {
  437. waiter := s.cond.SetupWait(timeout)
  438. defer waiter.Stop()
  439. for id >= s.cur {
  440. if eventsAvailable := waiter.Wait(); !eventsAvailable {
  441. // Timed out
  442. return into
  443. }
  444. }
  445. }
  446. for i := s.next; i < len(s.buf); i++ {
  447. if s.buf[i].SubscriptionID > id {
  448. into = append(into, s.buf[i])
  449. }
  450. }
  451. for i := 0; i < s.next; i++ {
  452. if s.buf[i].SubscriptionID > id {
  453. into = append(into, s.buf[i])
  454. }
  455. }
  456. return into
  457. }
  458. func (s *bufferedSubscription) Mask() EventType {
  459. return s.sub.Mask()
  460. }
  461. // Error returns a string pointer suitable for JSON marshalling errors. It
  462. // retains the "null on success" semantics, but ensures the error result is a
  463. // string regardless of the underlying concrete error type.
  464. func Error(err error) *string {
  465. if err == nil {
  466. return nil
  467. }
  468. str := err.Error()
  469. return &str
  470. }
  471. type noopLogger struct{}
  472. var NoopLogger Logger = &noopLogger{}
  473. func (*noopLogger) Serve() {}
  474. func (*noopLogger) Stop() {}
  475. func (*noopLogger) Log(t EventType, data interface{}) {}
  476. func (*noopLogger) Subscribe(mask EventType) Subscription {
  477. return &noopSubscription{}
  478. }
  479. type noopSubscription struct{}
  480. func (*noopSubscription) C() <-chan Event {
  481. return nil
  482. }
  483. func (*noopSubscription) Poll(timeout time.Duration) (Event, error) {
  484. return Event{}, errNoop
  485. }
  486. func (s *noopSubscription) Mask() EventType {
  487. return 0
  488. }
  489. func (*noopSubscription) Unsubscribe() {}