events.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This program is free software: you can redistribute it and/or modify it
  4. // under the terms of the GNU General Public License as published by the Free
  5. // Software Foundation, either version 3 of the License, or (at your option)
  6. // any later version.
  7. //
  8. // This program is distributed in the hope that it will be useful, but WITHOUT
  9. // ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  10. // FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
  11. // more details.
  12. //
  13. // You should have received a copy of the GNU General Public License along
  14. // with this program. If not, see <http://www.gnu.org/licenses/>.
  15. // Package events provides event subscription and polling functionality.
  16. package events
  17. import (
  18. "errors"
  19. "sync"
  20. "time"
  21. )
  22. type EventType uint64
  23. const (
  24. Ping EventType = 1 << iota
  25. Starting
  26. StartupComplete
  27. DeviceDiscovered
  28. DeviceConnected
  29. DeviceDisconnected
  30. DeviceRejected
  31. LocalIndexUpdated
  32. RemoteIndexUpdated
  33. ItemStarted
  34. StateChanged
  35. FolderRejected
  36. ConfigSaved
  37. DownloadProgress
  38. AllEvents = (1 << iota) - 1
  39. )
  40. func (t EventType) String() string {
  41. switch t {
  42. case Ping:
  43. return "Ping"
  44. case Starting:
  45. return "Starting"
  46. case StartupComplete:
  47. return "StartupComplete"
  48. case DeviceDiscovered:
  49. return "DeviceDiscovered"
  50. case DeviceConnected:
  51. return "DeviceConnected"
  52. case DeviceDisconnected:
  53. return "DeviceDisconnected"
  54. case DeviceRejected:
  55. return "DeviceRejected"
  56. case LocalIndexUpdated:
  57. return "LocalIndexUpdated"
  58. case RemoteIndexUpdated:
  59. return "RemoteIndexUpdated"
  60. case ItemStarted:
  61. return "ItemStarted"
  62. case StateChanged:
  63. return "StateChanged"
  64. case FolderRejected:
  65. return "FolderRejected"
  66. case ConfigSaved:
  67. return "ConfigSaved"
  68. case DownloadProgress:
  69. return "DownloadProgress"
  70. default:
  71. return "Unknown"
  72. }
  73. }
  74. func (t EventType) MarshalText() ([]byte, error) {
  75. return []byte(t.String()), nil
  76. }
  77. const BufferSize = 64
  78. type Logger struct {
  79. subs map[int]*Subscription
  80. nextID int
  81. mutex sync.Mutex
  82. }
  83. type Event struct {
  84. ID int `json:"id"`
  85. Time time.Time `json:"time"`
  86. Type EventType `json:"type"`
  87. Data interface{} `json:"data"`
  88. }
  89. type Subscription struct {
  90. mask EventType
  91. id int
  92. events chan Event
  93. mutex sync.Mutex
  94. }
  95. var Default = NewLogger()
  96. var (
  97. ErrTimeout = errors.New("timeout")
  98. ErrClosed = errors.New("closed")
  99. )
  100. func NewLogger() *Logger {
  101. return &Logger{
  102. subs: make(map[int]*Subscription),
  103. }
  104. }
  105. func (l *Logger) Log(t EventType, data interface{}) {
  106. l.mutex.Lock()
  107. if debug {
  108. dl.Debugln("log", l.nextID, t.String(), data)
  109. }
  110. e := Event{
  111. ID: l.nextID,
  112. Time: time.Now(),
  113. Type: t,
  114. Data: data,
  115. }
  116. l.nextID++
  117. for _, s := range l.subs {
  118. if s.mask&t != 0 {
  119. select {
  120. case s.events <- e:
  121. default:
  122. // if s.events is not ready, drop the event
  123. }
  124. }
  125. }
  126. l.mutex.Unlock()
  127. }
  128. func (l *Logger) Subscribe(mask EventType) *Subscription {
  129. l.mutex.Lock()
  130. if debug {
  131. dl.Debugln("subscribe", mask)
  132. }
  133. s := &Subscription{
  134. mask: mask,
  135. id: l.nextID,
  136. events: make(chan Event, BufferSize),
  137. }
  138. l.nextID++
  139. l.subs[s.id] = s
  140. l.mutex.Unlock()
  141. return s
  142. }
  143. func (l *Logger) Unsubscribe(s *Subscription) {
  144. l.mutex.Lock()
  145. if debug {
  146. dl.Debugln("unsubscribe")
  147. }
  148. delete(l.subs, s.id)
  149. close(s.events)
  150. l.mutex.Unlock()
  151. }
  152. func (s *Subscription) Poll(timeout time.Duration) (Event, error) {
  153. s.mutex.Lock()
  154. defer s.mutex.Unlock()
  155. if debug {
  156. dl.Debugln("poll", timeout)
  157. }
  158. to := time.After(timeout)
  159. select {
  160. case e, ok := <-s.events:
  161. if !ok {
  162. return e, ErrClosed
  163. }
  164. return e, nil
  165. case <-to:
  166. return Event{}, ErrTimeout
  167. }
  168. }
  169. func (s *Subscription) C() <-chan Event {
  170. return s.events
  171. }
  172. type BufferedSubscription struct {
  173. sub *Subscription
  174. buf []Event
  175. next int
  176. cur int
  177. mut sync.Mutex
  178. cond *sync.Cond
  179. }
  180. func NewBufferedSubscription(s *Subscription, size int) *BufferedSubscription {
  181. bs := &BufferedSubscription{
  182. sub: s,
  183. buf: make([]Event, size),
  184. }
  185. bs.cond = sync.NewCond(&bs.mut)
  186. go bs.pollingLoop()
  187. return bs
  188. }
  189. func (s *BufferedSubscription) pollingLoop() {
  190. for {
  191. ev, err := s.sub.Poll(60 * time.Second)
  192. if err == ErrTimeout {
  193. continue
  194. }
  195. if err == ErrClosed {
  196. return
  197. }
  198. if err != nil {
  199. panic("unexpected error: " + err.Error())
  200. }
  201. s.mut.Lock()
  202. s.buf[s.next] = ev
  203. s.next = (s.next + 1) % len(s.buf)
  204. s.cur = ev.ID
  205. s.cond.Broadcast()
  206. s.mut.Unlock()
  207. }
  208. }
  209. func (s *BufferedSubscription) Since(id int, into []Event) []Event {
  210. s.mut.Lock()
  211. defer s.mut.Unlock()
  212. for id >= s.cur {
  213. s.cond.Wait()
  214. }
  215. for i := s.next; i < len(s.buf); i++ {
  216. if s.buf[i].ID > id {
  217. into = append(into, s.buf[i])
  218. }
  219. }
  220. for i := 0; i < s.next; i++ {
  221. if s.buf[i].ID > id {
  222. into = append(into, s.buf[i])
  223. }
  224. }
  225. return into
  226. }