events.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. // Package events provides event subscription and polling functionality.
  7. package events
  8. import (
  9. "errors"
  10. stdsync "sync"
  11. "time"
  12. "github.com/syncthing/syncthing/lib/sync"
  13. )
  14. type EventType int
  15. const (
  16. Ping EventType = 1 << iota
  17. Starting
  18. StartupComplete
  19. DeviceDiscovered
  20. DeviceConnected
  21. DeviceDisconnected
  22. DeviceRejected
  23. DevicePaused
  24. DeviceResumed
  25. LocalIndexUpdated
  26. RemoteIndexUpdated
  27. ItemStarted
  28. ItemFinished
  29. StateChanged
  30. FolderRejected
  31. ConfigSaved
  32. DownloadProgress
  33. FolderSummary
  34. FolderCompletion
  35. FolderErrors
  36. FolderScanProgress
  37. ExternalPortMappingChanged
  38. RelayStateChanged
  39. AllEvents = (1 << iota) - 1
  40. )
  41. func (t EventType) String() string {
  42. switch t {
  43. case Ping:
  44. return "Ping"
  45. case Starting:
  46. return "Starting"
  47. case StartupComplete:
  48. return "StartupComplete"
  49. case DeviceDiscovered:
  50. return "DeviceDiscovered"
  51. case DeviceConnected:
  52. return "DeviceConnected"
  53. case DeviceDisconnected:
  54. return "DeviceDisconnected"
  55. case DeviceRejected:
  56. return "DeviceRejected"
  57. case LocalIndexUpdated:
  58. return "LocalIndexUpdated"
  59. case RemoteIndexUpdated:
  60. return "RemoteIndexUpdated"
  61. case ItemStarted:
  62. return "ItemStarted"
  63. case ItemFinished:
  64. return "ItemFinished"
  65. case StateChanged:
  66. return "StateChanged"
  67. case FolderRejected:
  68. return "FolderRejected"
  69. case ConfigSaved:
  70. return "ConfigSaved"
  71. case DownloadProgress:
  72. return "DownloadProgress"
  73. case FolderSummary:
  74. return "FolderSummary"
  75. case FolderCompletion:
  76. return "FolderCompletion"
  77. case FolderErrors:
  78. return "FolderErrors"
  79. case DevicePaused:
  80. return "DevicePaused"
  81. case DeviceResumed:
  82. return "DeviceResumed"
  83. case FolderScanProgress:
  84. return "FolderScanProgress"
  85. case ExternalPortMappingChanged:
  86. return "ExternalPortMappingChanged"
  87. case RelayStateChanged:
  88. return "RelayStateChanged"
  89. default:
  90. return "Unknown"
  91. }
  92. }
  93. func (t EventType) MarshalText() ([]byte, error) {
  94. return []byte(t.String()), nil
  95. }
  96. const BufferSize = 64
  97. type Logger struct {
  98. subs []*Subscription
  99. nextID int
  100. mutex sync.Mutex
  101. }
  102. type Event struct {
  103. ID int `json:"id"`
  104. Time time.Time `json:"time"`
  105. Type EventType `json:"type"`
  106. Data interface{} `json:"data"`
  107. }
  108. type Subscription struct {
  109. mask EventType
  110. events chan Event
  111. timeout *time.Timer
  112. }
  113. var Default = NewLogger()
  114. var (
  115. ErrTimeout = errors.New("timeout")
  116. ErrClosed = errors.New("closed")
  117. )
  118. func NewLogger() *Logger {
  119. return &Logger{
  120. mutex: sync.NewMutex(),
  121. }
  122. }
  123. func (l *Logger) Log(t EventType, data interface{}) {
  124. l.mutex.Lock()
  125. dl.Debugln("log", l.nextID, t, data)
  126. l.nextID++
  127. e := Event{
  128. ID: l.nextID,
  129. Time: time.Now(),
  130. Type: t,
  131. Data: data,
  132. }
  133. for _, s := range l.subs {
  134. if s.mask&t != 0 {
  135. select {
  136. case s.events <- e:
  137. default:
  138. // if s.events is not ready, drop the event
  139. }
  140. }
  141. }
  142. l.mutex.Unlock()
  143. }
  144. func (l *Logger) Subscribe(mask EventType) *Subscription {
  145. l.mutex.Lock()
  146. dl.Debugln("subscribe", mask)
  147. s := &Subscription{
  148. mask: mask,
  149. events: make(chan Event, BufferSize),
  150. timeout: time.NewTimer(0),
  151. }
  152. l.subs = append(l.subs, s)
  153. l.mutex.Unlock()
  154. return s
  155. }
  156. func (l *Logger) Unsubscribe(s *Subscription) {
  157. l.mutex.Lock()
  158. dl.Debugln("unsubscribe")
  159. for i, ss := range l.subs {
  160. if s == ss {
  161. last := len(l.subs) - 1
  162. l.subs[i] = l.subs[last]
  163. l.subs[last] = nil
  164. l.subs = l.subs[:last]
  165. break
  166. }
  167. }
  168. close(s.events)
  169. l.mutex.Unlock()
  170. }
  171. // Poll returns an event from the subscription or an error if the poll times
  172. // out of the event channel is closed. Poll should not be called concurrently
  173. // from multiple goroutines for a single subscription.
  174. func (s *Subscription) Poll(timeout time.Duration) (Event, error) {
  175. dl.Debugln("poll", timeout)
  176. if !s.timeout.Reset(timeout) {
  177. select {
  178. case <-s.timeout.C:
  179. default:
  180. }
  181. }
  182. select {
  183. case e, ok := <-s.events:
  184. if !ok {
  185. return e, ErrClosed
  186. }
  187. s.timeout.Stop()
  188. return e, nil
  189. case <-s.timeout.C:
  190. return Event{}, ErrTimeout
  191. }
  192. }
  193. func (s *Subscription) C() <-chan Event {
  194. return s.events
  195. }
  196. type BufferedSubscription struct {
  197. sub *Subscription
  198. buf []Event
  199. next int
  200. cur int
  201. mut sync.Mutex
  202. cond *stdsync.Cond
  203. }
  204. func NewBufferedSubscription(s *Subscription, size int) *BufferedSubscription {
  205. bs := &BufferedSubscription{
  206. sub: s,
  207. buf: make([]Event, size),
  208. mut: sync.NewMutex(),
  209. }
  210. bs.cond = stdsync.NewCond(bs.mut)
  211. go bs.pollingLoop()
  212. return bs
  213. }
  214. func (s *BufferedSubscription) pollingLoop() {
  215. for {
  216. ev, err := s.sub.Poll(60 * time.Second)
  217. if err == ErrTimeout {
  218. continue
  219. }
  220. if err == ErrClosed {
  221. return
  222. }
  223. if err != nil {
  224. panic("unexpected error: " + err.Error())
  225. }
  226. s.mut.Lock()
  227. s.buf[s.next] = ev
  228. s.next = (s.next + 1) % len(s.buf)
  229. s.cur = ev.ID
  230. s.cond.Broadcast()
  231. s.mut.Unlock()
  232. }
  233. }
  234. func (s *BufferedSubscription) Since(id int, into []Event) []Event {
  235. s.mut.Lock()
  236. defer s.mut.Unlock()
  237. for id >= s.cur {
  238. s.cond.Wait()
  239. }
  240. for i := s.next; i < len(s.buf); i++ {
  241. if s.buf[i].ID > id {
  242. into = append(into, s.buf[i])
  243. }
  244. }
  245. for i := 0; i < s.next; i++ {
  246. if s.buf[i].ID > id {
  247. into = append(into, s.buf[i])
  248. }
  249. }
  250. return into
  251. }
  252. // Error returns a string pointer suitable for JSON marshalling errors. It
  253. // retains the "null on sucess" semantics, but ensures the error result is a
  254. // string regardless of the underlying concrete error type.
  255. func Error(err error) *string {
  256. if err == nil {
  257. return nil
  258. }
  259. str := err.Error()
  260. return &str
  261. }