events.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. //go:generate go tool counterfeiter -o mocks/buffered_subscription.go --fake-name BufferedSubscription . BufferedSubscription
  7. // Package events provides event subscription and polling functionality.
  8. package events
  9. import (
  10. "context"
  11. "encoding/json"
  12. "errors"
  13. "fmt"
  14. "runtime"
  15. "sync"
  16. "time"
  17. "github.com/syncthing/syncthing/lib/syncutil"
  18. "github.com/thejerf/suture/v4"
  19. )
// EventType identifies a kind of event. Every defined event type occupies its
// own bit, so several types can be OR-ed together to form a subscription mask.
type EventType int64
// The event types. Each constant is a distinct single bit (1 << iota), which
// lets a subscription mask select any combination of them.
const (
	Starting EventType = 1 << iota
	StartupComplete
	DeviceDiscovered
	DeviceConnected
	DeviceDisconnected
	// Deprecated: superseded by PendingDevicesChanged.
	DeviceRejected
	PendingDevicesChanged
	DevicePaused
	DeviceResumed
	ClusterConfigReceived
	LocalChangeDetected
	RemoteChangeDetected
	LocalIndexUpdated
	RemoteIndexUpdated
	ItemStarted
	ItemFinished
	StateChanged
	// Deprecated: superseded by PendingFoldersChanged.
	FolderRejected
	PendingFoldersChanged
	ConfigSaved
	DownloadProgress
	RemoteDownloadProgress
	FolderSummary
	FolderCompletion
	FolderErrors
	FolderScanProgress
	FolderPaused
	FolderResumed
	FolderWatchStateChanged
	ListenAddressesChanged
	LoginAttempt
	Failure
	UpgradeRestartScheduled

	// AllEvents is the mask with the bit of every event type above set. It
	// must stay last in this block because its value is derived from iota.
	AllEvents = (1 << iota) - 1
)
var (
	// runningTests, when true, makes Subscribe and Poll yield the processor
	// around their timer handling so that the timer branch taken is stable
	// from run to run. Nothing in this file sets it to true.
	runningTests = false
	// errNoop is the error returned by noopSubscription.Poll.
	errNoop = errors.New("method of a noop object called")
)
// eventLogTimeout is how long sendEvent waits for a subscription's channel to
// accept an event before the event is dropped for that subscription.
const eventLogTimeout = 15 * time.Millisecond
  62. func (t EventType) String() string {
  63. switch t {
  64. case Starting:
  65. return "Starting"
  66. case StartupComplete:
  67. return "StartupComplete"
  68. case DeviceDiscovered:
  69. return "DeviceDiscovered"
  70. case DeviceConnected:
  71. return "DeviceConnected"
  72. case DeviceDisconnected:
  73. return "DeviceDisconnected"
  74. case DeviceRejected:
  75. return "DeviceRejected"
  76. case PendingDevicesChanged:
  77. return "PendingDevicesChanged"
  78. case LocalChangeDetected:
  79. return "LocalChangeDetected"
  80. case RemoteChangeDetected:
  81. return "RemoteChangeDetected"
  82. case LocalIndexUpdated:
  83. return "LocalIndexUpdated"
  84. case RemoteIndexUpdated:
  85. return "RemoteIndexUpdated"
  86. case ItemStarted:
  87. return "ItemStarted"
  88. case ItemFinished:
  89. return "ItemFinished"
  90. case StateChanged:
  91. return "StateChanged"
  92. case FolderRejected:
  93. return "FolderRejected"
  94. case PendingFoldersChanged:
  95. return "PendingFoldersChanged"
  96. case ConfigSaved:
  97. return "ConfigSaved"
  98. case DownloadProgress:
  99. return "DownloadProgress"
  100. case RemoteDownloadProgress:
  101. return "RemoteDownloadProgress"
  102. case FolderSummary:
  103. return "FolderSummary"
  104. case FolderCompletion:
  105. return "FolderCompletion"
  106. case FolderErrors:
  107. return "FolderErrors"
  108. case DevicePaused:
  109. return "DevicePaused"
  110. case DeviceResumed:
  111. return "DeviceResumed"
  112. case ClusterConfigReceived:
  113. return "ClusterConfigReceived"
  114. case FolderScanProgress:
  115. return "FolderScanProgress"
  116. case FolderPaused:
  117. return "FolderPaused"
  118. case FolderResumed:
  119. return "FolderResumed"
  120. case ListenAddressesChanged:
  121. return "ListenAddressesChanged"
  122. case LoginAttempt:
  123. return "LoginAttempt"
  124. case FolderWatchStateChanged:
  125. return "FolderWatchStateChanged"
  126. case Failure:
  127. return "Failure"
  128. case UpgradeRestartScheduled:
  129. return "UpgradeRestartScheduled"
  130. default:
  131. return "Unknown"
  132. }
  133. }
  134. func (t EventType) MarshalText() ([]byte, error) {
  135. return []byte(t.String()), nil
  136. }
  137. func (t *EventType) UnmarshalJSON(b []byte) error {
  138. var s string
  139. if err := json.Unmarshal(b, &s); err != nil {
  140. return err
  141. }
  142. *t = UnmarshalEventType(s)
  143. return nil
  144. }
  145. func UnmarshalEventType(s string) EventType {
  146. switch s {
  147. case "Starting":
  148. return Starting
  149. case "StartupComplete":
  150. return StartupComplete
  151. case "DeviceDiscovered":
  152. return DeviceDiscovered
  153. case "DeviceConnected":
  154. return DeviceConnected
  155. case "DeviceDisconnected":
  156. return DeviceDisconnected
  157. case "DeviceRejected":
  158. return DeviceRejected
  159. case "PendingDevicesChanged":
  160. return PendingDevicesChanged
  161. case "LocalChangeDetected":
  162. return LocalChangeDetected
  163. case "RemoteChangeDetected":
  164. return RemoteChangeDetected
  165. case "LocalIndexUpdated":
  166. return LocalIndexUpdated
  167. case "RemoteIndexUpdated":
  168. return RemoteIndexUpdated
  169. case "ItemStarted":
  170. return ItemStarted
  171. case "ItemFinished":
  172. return ItemFinished
  173. case "StateChanged":
  174. return StateChanged
  175. case "FolderRejected":
  176. return FolderRejected
  177. case "PendingFoldersChanged":
  178. return PendingFoldersChanged
  179. case "ConfigSaved":
  180. return ConfigSaved
  181. case "DownloadProgress":
  182. return DownloadProgress
  183. case "RemoteDownloadProgress":
  184. return RemoteDownloadProgress
  185. case "FolderSummary":
  186. return FolderSummary
  187. case "FolderCompletion":
  188. return FolderCompletion
  189. case "FolderErrors":
  190. return FolderErrors
  191. case "DevicePaused":
  192. return DevicePaused
  193. case "DeviceResumed":
  194. return DeviceResumed
  195. case "ClusterConfigReceived":
  196. return ClusterConfigReceived
  197. case "FolderScanProgress":
  198. return FolderScanProgress
  199. case "FolderPaused":
  200. return FolderPaused
  201. case "FolderResumed":
  202. return FolderResumed
  203. case "ListenAddressesChanged":
  204. return ListenAddressesChanged
  205. case "LoginAttempt":
  206. return LoginAttempt
  207. case "FolderWatchStateChanged":
  208. return FolderWatchStateChanged
  209. case "Failure":
  210. return Failure
  211. case "UpgradeRestartScheduled":
  212. return UpgradeRestartScheduled
  213. default:
  214. return 0
  215. }
  216. }
// BufferSize is the capacity of the logger's incoming event channel and of
// the event channel of every subscription it creates.
const BufferSize = 64
  218. type Logger interface {
  219. suture.Service
  220. Log(t EventType, data interface{})
  221. Subscribe(mask EventType) Subscription
  222. }
// logger is the Logger implementation. All fields except the channels are
// only touched from the goroutine running Serve.
type logger struct {
	// subs holds the currently registered subscriptions.
	subs []*subscription
	// nextSubscriptionIDs is parallel to subs: element i is the
	// SubscriptionID that the next event delivered to subs[i] will get.
	nextSubscriptionIDs []int
	// nextGlobalID is incremented before every event is sent and then used
	// as that event's GlobalID.
	nextGlobalID int
	// timeout is a reusable timer bounding how long sendEvent waits on a
	// single subscription.
	timeout *time.Timer
	// events carries events from Log to Serve.
	events chan Event
	// funcs carries closures that Serve runs on its own goroutine; Subscribe
	// uses it to register subscriptions.
	funcs chan func(context.Context)
	// toUnsubscribe carries subscriptions that asked to be removed.
	toUnsubscribe chan *subscription
}
  232. type Event struct {
  233. // Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API
  234. SubscriptionID int `json:"id"`
  235. // Global ID of the event across all subscriptions
  236. GlobalID int `json:"globalID"`
  237. Time time.Time `json:"time"`
  238. Type EventType `json:"type"`
  239. Data interface{} `json:"data"`
  240. }
// Subscription is a stream of events selected by a mask.
type Subscription interface {
	// C returns the channel on which the events are delivered.
	C() <-chan Event
	// Poll waits up to timeout for the next event.
	Poll(timeout time.Duration) (Event, error)
	// Mask returns the event types this subscription is interested in.
	Mask() EventType
	// Unsubscribe ends the subscription.
	Unsubscribe()
}
// subscription is the Subscription handed out by logger.Subscribe.
type subscription struct {
	// mask selects the event types delivered to this subscription.
	mask EventType
	// events is the buffered channel the logger delivers into; the logger
	// closes it when the subscription ends.
	events chan Event
	// toUnsubscribe is the logger's channel for removal requests.
	toUnsubscribe chan *subscription
	// timeout is the reusable timer used by Poll.
	timeout *time.Timer
	// ctx is the context of the Serve call that registered the subscription;
	// Unsubscribe stops waiting once it is done.
	ctx context.Context
}
var (
	// ErrTimeout is returned by Poll when no event arrived within the
	// timeout.
	ErrTimeout = errors.New("timeout")
	// ErrClosed is returned by Poll when the subscription's event channel
	// has been closed.
	ErrClosed = errors.New("closed")
)
  258. func NewLogger() Logger {
  259. l := &logger{
  260. timeout: time.NewTimer(time.Second),
  261. events: make(chan Event, BufferSize),
  262. funcs: make(chan func(context.Context)),
  263. toUnsubscribe: make(chan *subscription),
  264. }
  265. // Make sure the timer is in the stopped state and hasn't fired anything
  266. // into the channel.
  267. if !l.timeout.Stop() {
  268. <-l.timeout.C
  269. }
  270. return l
  271. }
  272. func (l *logger) Serve(ctx context.Context) error {
  273. loop:
  274. for {
  275. select {
  276. case e := <-l.events:
  277. // Incoming events get sent
  278. l.sendEvent(e)
  279. metricEvents.WithLabelValues(e.Type.String(), metricEventStateCreated).Inc()
  280. case fn := <-l.funcs:
  281. // Subscriptions are handled here.
  282. fn(ctx)
  283. case s := <-l.toUnsubscribe:
  284. l.unsubscribe(s)
  285. case <-ctx.Done():
  286. break loop
  287. }
  288. }
  289. // Closing the event channels corresponds to what happens when a
  290. // subscription is unsubscribed; this stops any BufferedSubscription,
  291. // makes Poll() return ErrClosed, etc.
  292. for _, s := range l.subs {
  293. close(s.events)
  294. }
  295. return nil
  296. }
  297. func (l *logger) Log(t EventType, data interface{}) {
  298. l.events <- Event{
  299. Time: time.Now(), // intentionally high precision
  300. Type: t,
  301. Data: data,
  302. // SubscriptionID and GlobalID are set in sendEvent
  303. }
  304. }
  305. func (l *logger) sendEvent(e Event) {
  306. l.nextGlobalID++
  307. dl.Debugln("log", l.nextGlobalID, e.Type, e.Data)
  308. e.GlobalID = l.nextGlobalID
  309. for i, s := range l.subs {
  310. if s.mask&e.Type != 0 {
  311. e.SubscriptionID = l.nextSubscriptionIDs[i]
  312. l.nextSubscriptionIDs[i]++
  313. l.timeout.Reset(eventLogTimeout)
  314. timedOut := false
  315. select {
  316. case s.events <- e:
  317. metricEvents.WithLabelValues(e.Type.String(), metricEventStateDelivered).Inc()
  318. case <-l.timeout.C:
  319. // if s.events is not ready, drop the event
  320. timedOut = true
  321. metricEvents.WithLabelValues(e.Type.String(), metricEventStateDropped).Inc()
  322. }
  323. // If stop returns false it already sent something to the
  324. // channel. If we didn't already read it above we must do so now
  325. // or we get a spurious timeout on the next loop.
  326. if !l.timeout.Stop() && !timedOut {
  327. <-l.timeout.C
  328. }
  329. }
  330. }
  331. }
  332. func (l *logger) Subscribe(mask EventType) Subscription {
  333. res := make(chan Subscription)
  334. l.funcs <- func(ctx context.Context) {
  335. dl.Debugln("subscribe", mask)
  336. s := &subscription{
  337. mask: mask,
  338. events: make(chan Event, BufferSize),
  339. toUnsubscribe: l.toUnsubscribe,
  340. timeout: time.NewTimer(0),
  341. ctx: ctx,
  342. }
  343. // We need to create the timeout timer in the stopped, non-fired state so
  344. // that Subscription.Poll() can safely reset it and select on the timeout
  345. // channel. This ensures the timer is stopped and the channel drained.
  346. if runningTests {
  347. // Make the behavior stable when running tests to avoid randomly
  348. // varying test coverage. This ensures, in practice if not in
  349. // theory, that the timer fires and we take the true branch of the
  350. // next if.
  351. runtime.Gosched()
  352. }
  353. if !s.timeout.Stop() {
  354. <-s.timeout.C
  355. }
  356. l.subs = append(l.subs, s)
  357. l.nextSubscriptionIDs = append(l.nextSubscriptionIDs, 1)
  358. res <- s
  359. }
  360. return <-res
  361. }
  362. func (l *logger) unsubscribe(s *subscription) {
  363. dl.Debugln("unsubscribe", s.mask)
  364. for i, ss := range l.subs {
  365. if s == ss {
  366. last := len(l.subs) - 1
  367. l.subs[i] = l.subs[last]
  368. l.subs[last] = nil
  369. l.subs = l.subs[:last]
  370. l.nextSubscriptionIDs[i] = l.nextSubscriptionIDs[last]
  371. l.nextSubscriptionIDs[last] = 0
  372. l.nextSubscriptionIDs = l.nextSubscriptionIDs[:last]
  373. break
  374. }
  375. }
  376. close(s.events)
  377. }
  378. func (l *logger) String() string {
  379. return fmt.Sprintf("events.Logger/@%p", l)
  380. }
  381. // Poll returns an event from the subscription or an error if the poll times
  382. // out of the event channel is closed. Poll should not be called concurrently
  383. // from multiple goroutines for a single subscription.
  384. func (s *subscription) Poll(timeout time.Duration) (Event, error) {
  385. dl.Debugln("poll", timeout)
  386. s.timeout.Reset(timeout)
  387. select {
  388. case e, ok := <-s.events:
  389. if !ok {
  390. return e, ErrClosed
  391. }
  392. if runningTests {
  393. // Make the behavior stable when running tests to avoid randomly
  394. // varying test coverage. This ensures, in practice if not in
  395. // theory, that the timer fires and we take the true branch of
  396. // the next if.
  397. s.timeout.Reset(0)
  398. runtime.Gosched()
  399. }
  400. if !s.timeout.Stop() {
  401. // The timeout must be stopped and possibly drained to be ready
  402. // for reuse in the next call.
  403. <-s.timeout.C
  404. }
  405. return e, nil
  406. case <-s.timeout.C:
  407. return Event{}, ErrTimeout
  408. }
  409. }
  410. func (s *subscription) C() <-chan Event {
  411. return s.events
  412. }
  413. func (s *subscription) Mask() EventType {
  414. return s.mask
  415. }
  416. func (s *subscription) Unsubscribe() {
  417. select {
  418. case s.toUnsubscribe <- s:
  419. case <-s.ctx.Done():
  420. }
  421. }
// bufferedSubscription keeps the most recent events of a Subscription in a
// fixed-size ring buffer so that they can be queried with Since.
type bufferedSubscription struct {
	// sub is the subscription the events are read from.
	sub Subscription
	// buf is the ring buffer; slots not written yet hold zero-valued events.
	buf []Event
	// next is the index in buf that the next event is written to, which is
	// also the position of the oldest event once the buffer has wrapped.
	next int
	cur  int // Current SubscriptionID
	// mut guards buf, next and cur.
	mut sync.Mutex
	// cond is broadcast whenever an event has been stored.
	cond *syncutil.TimeoutCond
}
// BufferedSubscription is a subscription whose recent events are retained and
// can be fetched by ID.
type BufferedSubscription interface {
	// Since appends the retained events with a SubscriptionID greater than
	// id to into and returns the result. If there are no such events yet it
	// waits up to timeout for one to arrive.
	Since(id int, into []Event, timeout time.Duration) []Event
	// Mask returns the event types of the underlying subscription.
	Mask() EventType
}
  434. func NewBufferedSubscription(s Subscription, size int) BufferedSubscription {
  435. bs := &bufferedSubscription{
  436. sub: s,
  437. buf: make([]Event, size),
  438. }
  439. bs.cond = syncutil.NewTimeoutCond(&bs.mut)
  440. go bs.pollingLoop()
  441. return bs
  442. }
  443. func (s *bufferedSubscription) pollingLoop() {
  444. for ev := range s.sub.C() {
  445. s.mut.Lock()
  446. s.buf[s.next] = ev
  447. s.next = (s.next + 1) % len(s.buf)
  448. s.cur = ev.SubscriptionID
  449. s.cond.Broadcast()
  450. s.mut.Unlock()
  451. }
  452. }
  453. func (s *bufferedSubscription) Since(id int, into []Event, timeout time.Duration) []Event {
  454. s.mut.Lock()
  455. defer s.mut.Unlock()
  456. // Check once first before generating the TimeoutCondWaiter
  457. if id >= s.cur {
  458. waiter := s.cond.SetupWait(timeout)
  459. defer waiter.Stop()
  460. for id >= s.cur {
  461. if eventsAvailable := waiter.Wait(); !eventsAvailable {
  462. // Timed out
  463. return into
  464. }
  465. }
  466. }
  467. for i := s.next; i < len(s.buf); i++ {
  468. if s.buf[i].SubscriptionID > id {
  469. into = append(into, s.buf[i])
  470. }
  471. }
  472. for i := range s.next {
  473. if s.buf[i].SubscriptionID > id {
  474. into = append(into, s.buf[i])
  475. }
  476. }
  477. return into
  478. }
  479. func (s *bufferedSubscription) Mask() EventType {
  480. return s.sub.Mask()
  481. }
  482. // Error returns a string pointer suitable for JSON marshalling errors. It
  483. // retains the "null on success" semantics, but ensures the error result is a
  484. // string regardless of the underlying concrete error type.
  485. func Error(err error) *string {
  486. if err == nil {
  487. return nil
  488. }
  489. str := err.Error()
  490. return &str
  491. }
  492. type noopLogger struct{}
  493. var NoopLogger Logger = &noopLogger{}
  494. func (*noopLogger) Serve(_ context.Context) error { return nil }
  495. func (*noopLogger) Log(_ EventType, _ interface{}) {}
  496. func (*noopLogger) Subscribe(_ EventType) Subscription {
  497. return &noopSubscription{}
  498. }
  499. type noopSubscription struct{}
  500. func (*noopSubscription) C() <-chan Event {
  501. return nil
  502. }
  503. func (*noopSubscription) Poll(_ time.Duration) (Event, error) {
  504. return Event{}, errNoop
  505. }
  506. func (*noopSubscription) Mask() EventType {
  507. return 0
  508. }
  509. func (*noopSubscription) Unsubscribe() {}