events.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. //go:generate counterfeiter -o mocks/buffered_subscription.go --fake-name BufferedSubscription . BufferedSubscription
  7. // Package events provides event subscription and polling functionality.
  8. package events
  9. import (
  10. "context"
  11. "encoding/json"
  12. "errors"
  13. "fmt"
  14. "runtime"
  15. "time"
  16. "github.com/thejerf/suture/v4"
  17. "github.com/syncthing/syncthing/lib/sync"
  18. )
  19. type EventType int64
  20. const (
  21. Starting EventType = 1 << iota
  22. StartupComplete
  23. DeviceDiscovered
  24. DeviceConnected
  25. DeviceDisconnected
  26. DeviceRejected // DEPRECATED, superseded by PendingDevicesChanged
  27. PendingDevicesChanged
  28. DevicePaused
  29. DeviceResumed
  30. LocalChangeDetected
  31. RemoteChangeDetected
  32. LocalIndexUpdated
  33. RemoteIndexUpdated
  34. ItemStarted
  35. ItemFinished
  36. StateChanged
  37. FolderRejected // DEPRECATED, superseded by PendingFoldersChanged
  38. PendingFoldersChanged
  39. ConfigSaved
  40. DownloadProgress
  41. RemoteDownloadProgress
  42. FolderSummary
  43. FolderCompletion
  44. FolderErrors
  45. FolderScanProgress
  46. FolderPaused
  47. FolderResumed
  48. FolderWatchStateChanged
  49. ListenAddressesChanged
  50. LoginAttempt
  51. Failure
  52. AllEvents = (1 << iota) - 1
  53. )
  54. var (
  55. runningTests = false
  56. errNoop = errors.New("method of a noop object called")
  57. )
  58. const eventLogTimeout = 15 * time.Millisecond
  59. func (t EventType) String() string {
  60. switch t {
  61. case Starting:
  62. return "Starting"
  63. case StartupComplete:
  64. return "StartupComplete"
  65. case DeviceDiscovered:
  66. return "DeviceDiscovered"
  67. case DeviceConnected:
  68. return "DeviceConnected"
  69. case DeviceDisconnected:
  70. return "DeviceDisconnected"
  71. case DeviceRejected:
  72. return "DeviceRejected"
  73. case PendingDevicesChanged:
  74. return "PendingDevicesChanged"
  75. case LocalChangeDetected:
  76. return "LocalChangeDetected"
  77. case RemoteChangeDetected:
  78. return "RemoteChangeDetected"
  79. case LocalIndexUpdated:
  80. return "LocalIndexUpdated"
  81. case RemoteIndexUpdated:
  82. return "RemoteIndexUpdated"
  83. case ItemStarted:
  84. return "ItemStarted"
  85. case ItemFinished:
  86. return "ItemFinished"
  87. case StateChanged:
  88. return "StateChanged"
  89. case FolderRejected:
  90. return "FolderRejected"
  91. case PendingFoldersChanged:
  92. return "PendingFoldersChanged"
  93. case ConfigSaved:
  94. return "ConfigSaved"
  95. case DownloadProgress:
  96. return "DownloadProgress"
  97. case RemoteDownloadProgress:
  98. return "RemoteDownloadProgress"
  99. case FolderSummary:
  100. return "FolderSummary"
  101. case FolderCompletion:
  102. return "FolderCompletion"
  103. case FolderErrors:
  104. return "FolderErrors"
  105. case DevicePaused:
  106. return "DevicePaused"
  107. case DeviceResumed:
  108. return "DeviceResumed"
  109. case FolderScanProgress:
  110. return "FolderScanProgress"
  111. case FolderPaused:
  112. return "FolderPaused"
  113. case FolderResumed:
  114. return "FolderResumed"
  115. case ListenAddressesChanged:
  116. return "ListenAddressesChanged"
  117. case LoginAttempt:
  118. return "LoginAttempt"
  119. case FolderWatchStateChanged:
  120. return "FolderWatchStateChanged"
  121. case Failure:
  122. return "Failure"
  123. default:
  124. return "Unknown"
  125. }
  126. }
  127. func (t EventType) MarshalText() ([]byte, error) {
  128. return []byte(t.String()), nil
  129. }
  130. func (t *EventType) UnmarshalJSON(b []byte) error {
  131. var s string
  132. if err := json.Unmarshal(b, &s); err != nil {
  133. return err
  134. }
  135. *t = UnmarshalEventType(s)
  136. return nil
  137. }
  138. func UnmarshalEventType(s string) EventType {
  139. switch s {
  140. case "Starting":
  141. return Starting
  142. case "StartupComplete":
  143. return StartupComplete
  144. case "DeviceDiscovered":
  145. return DeviceDiscovered
  146. case "DeviceConnected":
  147. return DeviceConnected
  148. case "DeviceDisconnected":
  149. return DeviceDisconnected
  150. case "DeviceRejected":
  151. return DeviceRejected
  152. case "PendingDevicesChanged":
  153. return PendingDevicesChanged
  154. case "LocalChangeDetected":
  155. return LocalChangeDetected
  156. case "RemoteChangeDetected":
  157. return RemoteChangeDetected
  158. case "LocalIndexUpdated":
  159. return LocalIndexUpdated
  160. case "RemoteIndexUpdated":
  161. return RemoteIndexUpdated
  162. case "ItemStarted":
  163. return ItemStarted
  164. case "ItemFinished":
  165. return ItemFinished
  166. case "StateChanged":
  167. return StateChanged
  168. case "FolderRejected":
  169. return FolderRejected
  170. case "PendingFoldersChanged":
  171. return PendingFoldersChanged
  172. case "ConfigSaved":
  173. return ConfigSaved
  174. case "DownloadProgress":
  175. return DownloadProgress
  176. case "RemoteDownloadProgress":
  177. return RemoteDownloadProgress
  178. case "FolderSummary":
  179. return FolderSummary
  180. case "FolderCompletion":
  181. return FolderCompletion
  182. case "FolderErrors":
  183. return FolderErrors
  184. case "DevicePaused":
  185. return DevicePaused
  186. case "DeviceResumed":
  187. return DeviceResumed
  188. case "FolderScanProgress":
  189. return FolderScanProgress
  190. case "FolderPaused":
  191. return FolderPaused
  192. case "FolderResumed":
  193. return FolderResumed
  194. case "ListenAddressesChanged":
  195. return ListenAddressesChanged
  196. case "LoginAttempt":
  197. return LoginAttempt
  198. case "FolderWatchStateChanged":
  199. return FolderWatchStateChanged
  200. case "Failure":
  201. return Failure
  202. default:
  203. return 0
  204. }
  205. }
  206. const BufferSize = 64
  207. type Logger interface {
  208. suture.Service
  209. Log(t EventType, data interface{})
  210. Subscribe(mask EventType) Subscription
  211. }
  212. type logger struct {
  213. subs []*subscription
  214. nextSubscriptionIDs []int
  215. nextGlobalID int
  216. timeout *time.Timer
  217. events chan Event
  218. funcs chan func(context.Context)
  219. toUnsubscribe chan *subscription
  220. stop chan struct{}
  221. }
  222. type Event struct {
  223. // Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API
  224. SubscriptionID int `json:"id"`
  225. // Global ID of the event across all subscriptions
  226. GlobalID int `json:"globalID"`
  227. Time time.Time `json:"time"`
  228. Type EventType `json:"type"`
  229. Data interface{} `json:"data"`
  230. }
  231. type Subscription interface {
  232. C() <-chan Event
  233. Poll(timeout time.Duration) (Event, error)
  234. Mask() EventType
  235. Unsubscribe()
  236. }
  237. type subscription struct {
  238. mask EventType
  239. events chan Event
  240. toUnsubscribe chan *subscription
  241. timeout *time.Timer
  242. ctx context.Context
  243. }
  244. var (
  245. ErrTimeout = errors.New("timeout")
  246. ErrClosed = errors.New("closed")
  247. )
  248. func NewLogger() Logger {
  249. l := &logger{
  250. timeout: time.NewTimer(time.Second),
  251. events: make(chan Event, BufferSize),
  252. funcs: make(chan func(context.Context)),
  253. toUnsubscribe: make(chan *subscription),
  254. }
  255. // Make sure the timer is in the stopped state and hasn't fired anything
  256. // into the channel.
  257. if !l.timeout.Stop() {
  258. <-l.timeout.C
  259. }
  260. return l
  261. }
  262. func (l *logger) Serve(ctx context.Context) error {
  263. loop:
  264. for {
  265. select {
  266. case e := <-l.events:
  267. // Incoming events get sent
  268. l.sendEvent(e)
  269. case fn := <-l.funcs:
  270. // Subscriptions are handled here.
  271. fn(ctx)
  272. case s := <-l.toUnsubscribe:
  273. l.unsubscribe(s)
  274. case <-ctx.Done():
  275. break loop
  276. }
  277. }
  278. // Closing the event channels corresponds to what happens when a
  279. // subscription is unsubscribed; this stops any BufferedSubscription,
  280. // makes Poll() return ErrClosed, etc.
  281. for _, s := range l.subs {
  282. close(s.events)
  283. }
  284. return nil
  285. }
  286. func (l *logger) Log(t EventType, data interface{}) {
  287. l.events <- Event{
  288. Time: time.Now(),
  289. Type: t,
  290. Data: data,
  291. // SubscriptionID and GlobalID are set in sendEvent
  292. }
  293. }
  294. func (l *logger) sendEvent(e Event) {
  295. l.nextGlobalID++
  296. dl.Debugln("log", l.nextGlobalID, e.Type, e.Data)
  297. e.GlobalID = l.nextGlobalID
  298. for i, s := range l.subs {
  299. if s.mask&e.Type != 0 {
  300. e.SubscriptionID = l.nextSubscriptionIDs[i]
  301. l.nextSubscriptionIDs[i]++
  302. l.timeout.Reset(eventLogTimeout)
  303. timedOut := false
  304. select {
  305. case s.events <- e:
  306. case <-l.timeout.C:
  307. // if s.events is not ready, drop the event
  308. timedOut = true
  309. }
  310. // If stop returns false it already sent something to the
  311. // channel. If we didn't already read it above we must do so now
  312. // or we get a spurious timeout on the next loop.
  313. if !l.timeout.Stop() && !timedOut {
  314. <-l.timeout.C
  315. }
  316. }
  317. }
  318. }
  319. func (l *logger) Subscribe(mask EventType) Subscription {
  320. res := make(chan Subscription)
  321. l.funcs <- func(ctx context.Context) {
  322. dl.Debugln("subscribe", mask)
  323. s := &subscription{
  324. mask: mask,
  325. events: make(chan Event, BufferSize),
  326. toUnsubscribe: l.toUnsubscribe,
  327. timeout: time.NewTimer(0),
  328. ctx: ctx,
  329. }
  330. // We need to create the timeout timer in the stopped, non-fired state so
  331. // that Subscription.Poll() can safely reset it and select on the timeout
  332. // channel. This ensures the timer is stopped and the channel drained.
  333. if runningTests {
  334. // Make the behavior stable when running tests to avoid randomly
  335. // varying test coverage. This ensures, in practice if not in
  336. // theory, that the timer fires and we take the true branch of the
  337. // next if.
  338. runtime.Gosched()
  339. }
  340. if !s.timeout.Stop() {
  341. <-s.timeout.C
  342. }
  343. l.subs = append(l.subs, s)
  344. l.nextSubscriptionIDs = append(l.nextSubscriptionIDs, 1)
  345. res <- s
  346. }
  347. return <-res
  348. }
  349. func (l *logger) unsubscribe(s *subscription) {
  350. dl.Debugln("unsubscribe", s.mask)
  351. for i, ss := range l.subs {
  352. if s == ss {
  353. last := len(l.subs) - 1
  354. l.subs[i] = l.subs[last]
  355. l.subs[last] = nil
  356. l.subs = l.subs[:last]
  357. l.nextSubscriptionIDs[i] = l.nextSubscriptionIDs[last]
  358. l.nextSubscriptionIDs[last] = 0
  359. l.nextSubscriptionIDs = l.nextSubscriptionIDs[:last]
  360. break
  361. }
  362. }
  363. close(s.events)
  364. }
  365. func (l *logger) String() string {
  366. return fmt.Sprintf("events.Logger/@%p", l)
  367. }
  368. // Poll returns an event from the subscription or an error if the poll times
  369. // out of the event channel is closed. Poll should not be called concurrently
  370. // from multiple goroutines for a single subscription.
  371. func (s *subscription) Poll(timeout time.Duration) (Event, error) {
  372. dl.Debugln("poll", timeout)
  373. s.timeout.Reset(timeout)
  374. select {
  375. case e, ok := <-s.events:
  376. if !ok {
  377. return e, ErrClosed
  378. }
  379. if runningTests {
  380. // Make the behavior stable when running tests to avoid randomly
  381. // varying test coverage. This ensures, in practice if not in
  382. // theory, that the timer fires and we take the true branch of
  383. // the next if.
  384. s.timeout.Reset(0)
  385. runtime.Gosched()
  386. }
  387. if !s.timeout.Stop() {
  388. // The timeout must be stopped and possibly drained to be ready
  389. // for reuse in the next call.
  390. <-s.timeout.C
  391. }
  392. return e, nil
  393. case <-s.timeout.C:
  394. return Event{}, ErrTimeout
  395. }
  396. }
  397. func (s *subscription) C() <-chan Event {
  398. return s.events
  399. }
  400. func (s *subscription) Mask() EventType {
  401. return s.mask
  402. }
  403. func (s *subscription) Unsubscribe() {
  404. select {
  405. case s.toUnsubscribe <- s:
  406. case <-s.ctx.Done():
  407. }
  408. }
  409. type bufferedSubscription struct {
  410. sub Subscription
  411. buf []Event
  412. next int
  413. cur int // Current SubscriptionID
  414. mut sync.Mutex
  415. cond *sync.TimeoutCond
  416. }
  417. type BufferedSubscription interface {
  418. Since(id int, into []Event, timeout time.Duration) []Event
  419. Mask() EventType
  420. }
  421. func NewBufferedSubscription(s Subscription, size int) BufferedSubscription {
  422. bs := &bufferedSubscription{
  423. sub: s,
  424. buf: make([]Event, size),
  425. mut: sync.NewMutex(),
  426. }
  427. bs.cond = sync.NewTimeoutCond(bs.mut)
  428. go bs.pollingLoop()
  429. return bs
  430. }
  431. func (s *bufferedSubscription) pollingLoop() {
  432. for ev := range s.sub.C() {
  433. s.mut.Lock()
  434. s.buf[s.next] = ev
  435. s.next = (s.next + 1) % len(s.buf)
  436. s.cur = ev.SubscriptionID
  437. s.cond.Broadcast()
  438. s.mut.Unlock()
  439. }
  440. }
  441. func (s *bufferedSubscription) Since(id int, into []Event, timeout time.Duration) []Event {
  442. s.mut.Lock()
  443. defer s.mut.Unlock()
  444. // Check once first before generating the TimeoutCondWaiter
  445. if id >= s.cur {
  446. waiter := s.cond.SetupWait(timeout)
  447. defer waiter.Stop()
  448. for id >= s.cur {
  449. if eventsAvailable := waiter.Wait(); !eventsAvailable {
  450. // Timed out
  451. return into
  452. }
  453. }
  454. }
  455. for i := s.next; i < len(s.buf); i++ {
  456. if s.buf[i].SubscriptionID > id {
  457. into = append(into, s.buf[i])
  458. }
  459. }
  460. for i := 0; i < s.next; i++ {
  461. if s.buf[i].SubscriptionID > id {
  462. into = append(into, s.buf[i])
  463. }
  464. }
  465. return into
  466. }
  467. func (s *bufferedSubscription) Mask() EventType {
  468. return s.sub.Mask()
  469. }
  470. // Error returns a string pointer suitable for JSON marshalling errors. It
  471. // retains the "null on success" semantics, but ensures the error result is a
  472. // string regardless of the underlying concrete error type.
  473. func Error(err error) *string {
  474. if err == nil {
  475. return nil
  476. }
  477. str := err.Error()
  478. return &str
  479. }
  480. type noopLogger struct{}
  481. var NoopLogger Logger = &noopLogger{}
  482. func (*noopLogger) Serve(ctx context.Context) error { return nil }
  483. func (*noopLogger) Stop() {}
  484. func (*noopLogger) Log(t EventType, data interface{}) {}
  485. func (*noopLogger) Subscribe(mask EventType) Subscription {
  486. return &noopSubscription{}
  487. }
  488. type noopSubscription struct{}
  489. func (*noopSubscription) C() <-chan Event {
  490. return nil
  491. }
  492. func (*noopSubscription) Poll(timeout time.Duration) (Event, error) {
  493. return Event{}, errNoop
  494. }
  495. func (s *noopSubscription) Mask() EventType {
  496. return 0
  497. }
  498. func (*noopSubscription) Unsubscribe() {}