events.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. // Package events provides event subscription and polling functionality.
  7. package events
  8. import (
  9. "errors"
  10. stdsync "sync"
  11. "time"
  12. "github.com/syncthing/syncthing/lib/sync"
  13. )
  // EventType identifies a kind of event. Every named type below is a distinct
  // single bit, so several types can be OR-ed together into a mask such as the
  // one passed to Logger.Subscribe.
  type EventType int

  const (
  	// Ping is bit 0 (1 << 0). Each following constant repeats the same
  	// expression with the next iota value, so the declaration order fixes
  	// the numeric value of every event type.
  	Ping EventType = 1 << iota
  	Starting
  	StartupComplete
  	DeviceDiscovered
  	DeviceConnected
  	DeviceDisconnected
  	DeviceRejected
  	DevicePaused
  	DeviceResumed
  	LocalIndexUpdated
  	RemoteIndexUpdated
  	ItemStarted
  	ItemFinished
  	StateChanged
  	FolderRejected
  	ConfigSaved
  	DownloadProgress
  	FolderSummary
  	FolderCompletion
  	FolderErrors
  	FolderScanProgress
  	ListenAddressesChanged
  	LoginAttempt

  	// AllEvents has the bit of every event type above set. It must stay
  	// the last entry: it is computed from iota, so a type added after it
  	// would not be covered. It has no explicit type and is therefore an
  	// untyped constant.
  	AllEvents = (1 << iota) - 1
  )
  41. func (t EventType) String() string {
  42. switch t {
  43. case Ping:
  44. return "Ping"
  45. case Starting:
  46. return "Starting"
  47. case StartupComplete:
  48. return "StartupComplete"
  49. case DeviceDiscovered:
  50. return "DeviceDiscovered"
  51. case DeviceConnected:
  52. return "DeviceConnected"
  53. case DeviceDisconnected:
  54. return "DeviceDisconnected"
  55. case DeviceRejected:
  56. return "DeviceRejected"
  57. case LocalIndexUpdated:
  58. return "LocalIndexUpdated"
  59. case RemoteIndexUpdated:
  60. return "RemoteIndexUpdated"
  61. case ItemStarted:
  62. return "ItemStarted"
  63. case ItemFinished:
  64. return "ItemFinished"
  65. case StateChanged:
  66. return "StateChanged"
  67. case FolderRejected:
  68. return "FolderRejected"
  69. case ConfigSaved:
  70. return "ConfigSaved"
  71. case DownloadProgress:
  72. return "DownloadProgress"
  73. case FolderSummary:
  74. return "FolderSummary"
  75. case FolderCompletion:
  76. return "FolderCompletion"
  77. case FolderErrors:
  78. return "FolderErrors"
  79. case DevicePaused:
  80. return "DevicePaused"
  81. case DeviceResumed:
  82. return "DeviceResumed"
  83. case FolderScanProgress:
  84. return "FolderScanProgress"
  85. case ListenAddressesChanged:
  86. return "ListenAddressesChanged"
  87. case LoginAttempt:
  88. return "LoginAttempt"
  89. default:
  90. return "Unknown"
  91. }
  92. }
  93. func (t EventType) MarshalText() ([]byte, error) {
  94. return []byte(t.String()), nil
  95. }
  // BufferSize is the capacity of the event channel that Logger.Subscribe
  // creates for each subscription. When a subscriber's channel is full,
  // Logger.Log drops the event for that subscriber instead of blocking.
  const BufferSize = 64
  // Logger distributes logged events to its subscriptions. Create one with
  // NewLogger, which initializes the mutex.
  type Logger struct {
  	subs   []*Subscription // current subscriptions; Unsubscribe does not preserve their order
  	nextID int             // ID given to the most recent event; Log increments it before use
  	mutex  sync.Mutex      // held by Log, Subscribe and Unsubscribe while they use subs and nextID
  }
  // Event is one logged occurrence as delivered to subscribers. The struct tags
  // give the key names used when an Event is encoded as JSON.
  type Event struct {
  	ID   int         `json:"id"`   // sequence number assigned by Logger.Log; the first event of a new Logger gets 1
  	Time time.Time   `json:"time"` // time.Now() taken inside Logger.Log
  	Type EventType   `json:"type"` // the event type passed to Logger.Log
  	Data interface{} `json:"data"` // the payload passed to Logger.Log, stored as is
  }
  // Subscription receives the events whose type matches its mask. It is created
  // by Logger.Subscribe and released with Logger.Unsubscribe.
  type Subscription struct {
  	mask    EventType   // event types to deliver; Logger.Log tests it with a bitwise AND
  	events  chan Event  // buffered with capacity BufferSize; closed by Logger.Unsubscribe
  	timeout *time.Timer // reused by every Poll call; Subscribe creates it stopped and drained
  }
  113. var Default = NewLogger()
  114. var (
  115. ErrTimeout = errors.New("timeout")
  116. ErrClosed = errors.New("closed")
  117. )
  118. func NewLogger() *Logger {
  119. return &Logger{
  120. mutex: sync.NewMutex(),
  121. }
  122. }
  123. func (l *Logger) Log(t EventType, data interface{}) {
  124. l.mutex.Lock()
  125. dl.Debugln("log", l.nextID, t, data)
  126. l.nextID++
  127. e := Event{
  128. ID: l.nextID,
  129. Time: time.Now(),
  130. Type: t,
  131. Data: data,
  132. }
  133. for _, s := range l.subs {
  134. if s.mask&t != 0 {
  135. select {
  136. case s.events <- e:
  137. default:
  138. // if s.events is not ready, drop the event
  139. }
  140. }
  141. }
  142. l.mutex.Unlock()
  143. }
  144. func (l *Logger) Subscribe(mask EventType) *Subscription {
  145. l.mutex.Lock()
  146. dl.Debugln("subscribe", mask)
  147. s := &Subscription{
  148. mask: mask,
  149. events: make(chan Event, BufferSize),
  150. timeout: time.NewTimer(0),
  151. }
  152. // We need to create the timeout timer in the stopped, non-fired state so
  153. // that Subscription.Poll() can safely reset it and select on the timeout
  154. // channel. This ensures the timer is stopped and the channel drained.
  155. if !s.timeout.Stop() {
  156. <-s.timeout.C
  157. }
  158. l.subs = append(l.subs, s)
  159. l.mutex.Unlock()
  160. return s
  161. }
  162. func (l *Logger) Unsubscribe(s *Subscription) {
  163. l.mutex.Lock()
  164. dl.Debugln("unsubscribe")
  165. for i, ss := range l.subs {
  166. if s == ss {
  167. last := len(l.subs) - 1
  168. l.subs[i] = l.subs[last]
  169. l.subs[last] = nil
  170. l.subs = l.subs[:last]
  171. break
  172. }
  173. }
  174. close(s.events)
  175. l.mutex.Unlock()
  176. }
  177. // Poll returns an event from the subscription or an error if the poll times
  178. // out of the event channel is closed. Poll should not be called concurrently
  179. // from multiple goroutines for a single subscription.
  180. func (s *Subscription) Poll(timeout time.Duration) (Event, error) {
  181. dl.Debugln("poll", timeout)
  182. s.timeout.Reset(timeout)
  183. select {
  184. case e, ok := <-s.events:
  185. if !ok {
  186. return e, ErrClosed
  187. }
  188. if !s.timeout.Stop() {
  189. // The timeout must be stopped and possibly drained to be ready
  190. // for reuse in the next call.
  191. <-s.timeout.C
  192. }
  193. return e, nil
  194. case <-s.timeout.C:
  195. return Event{}, ErrTimeout
  196. }
  197. }
  198. func (s *Subscription) C() <-chan Event {
  199. return s.events
  200. }
  // bufferedSubscription keeps the most recent events of a Subscription in a
  // fixed-size ring buffer, so that callers can ask for all buffered events
  // newer than a given event ID.
  type bufferedSubscription struct {
  	sub  *Subscription // source of events, read by pollingLoop
  	buf  []Event       // ring buffer; its length is the size given to NewBufferedSubscription
  	next int           // index in buf that the next event is written to
  	cur  int           // ID of the most recently stored event
  	mut  sync.Mutex    // held by pollingLoop and Since while they use buf, next and cur
  	cond *stdsync.Cond // built on mut; pollingLoop broadcasts on it after storing an event, Since waits on it
  }
  // BufferedSubscription is the interface returned by NewBufferedSubscription.
  type BufferedSubscription interface {
  	// Since appends the buffered events whose ID is greater than id to
  	// into and returns the resulting slice. The implementation in this
  	// file blocks until at least one event newer than id has been
  	// buffered.
  	Since(id int, into []Event) []Event
  }
  212. func NewBufferedSubscription(s *Subscription, size int) BufferedSubscription {
  213. bs := &bufferedSubscription{
  214. sub: s,
  215. buf: make([]Event, size),
  216. mut: sync.NewMutex(),
  217. }
  218. bs.cond = stdsync.NewCond(bs.mut)
  219. go bs.pollingLoop()
  220. return bs
  221. }
  222. func (s *bufferedSubscription) pollingLoop() {
  223. for {
  224. ev, err := s.sub.Poll(60 * time.Second)
  225. if err == ErrTimeout {
  226. continue
  227. }
  228. if err == ErrClosed {
  229. return
  230. }
  231. if err != nil {
  232. panic("unexpected error: " + err.Error())
  233. }
  234. s.mut.Lock()
  235. s.buf[s.next] = ev
  236. s.next = (s.next + 1) % len(s.buf)
  237. s.cur = ev.ID
  238. s.cond.Broadcast()
  239. s.mut.Unlock()
  240. }
  241. }
  242. func (s *bufferedSubscription) Since(id int, into []Event) []Event {
  243. s.mut.Lock()
  244. defer s.mut.Unlock()
  245. for id >= s.cur {
  246. s.cond.Wait()
  247. }
  248. for i := s.next; i < len(s.buf); i++ {
  249. if s.buf[i].ID > id {
  250. into = append(into, s.buf[i])
  251. }
  252. }
  253. for i := 0; i < s.next; i++ {
  254. if s.buf[i].ID > id {
  255. into = append(into, s.buf[i])
  256. }
  257. }
  258. return into
  259. }
  // Error converts err into a value suitable for JSON marshalling: nil when err
  // is nil, so that success is still encoded as null, and otherwise a pointer
  // to the error's message, so that the result is always a string regardless
  // of the concrete error type.
  func Error(err error) *string {
  	var msg *string
  	if err != nil {
  		text := err.Error()
  		msg = &text
  	}
  	return msg
  }
  267. str := err.Error()
  268. return &str
  269. }