events_test.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package events
  7. import (
  8. "encoding/json"
  9. "fmt"
  10. "testing"
  11. "time"
  12. )
  13. const timeout = time.Second
  14. func init() {
  15. runningTests = true
  16. }
  17. func TestNewLogger(t *testing.T) {
  18. l := NewLogger()
  19. if l == nil {
  20. t.Fatal("Unexpected nil Logger")
  21. }
  22. }
  23. func TestSubscriber(t *testing.T) {
  24. l := NewLogger()
  25. s := l.Subscribe(0)
  26. defer l.Unsubscribe(s)
  27. if s == nil {
  28. t.Fatal("Unexpected nil Subscription")
  29. }
  30. }
  31. func TestTimeout(t *testing.T) {
  32. l := NewLogger()
  33. s := l.Subscribe(0)
  34. defer l.Unsubscribe(s)
  35. _, err := s.Poll(timeout)
  36. if err != ErrTimeout {
  37. t.Fatal("Unexpected non-Timeout error:", err)
  38. }
  39. }
  40. func TestEventBeforeSubscribe(t *testing.T) {
  41. l := NewLogger()
  42. l.Log(DeviceConnected, "foo")
  43. s := l.Subscribe(0)
  44. defer l.Unsubscribe(s)
  45. _, err := s.Poll(timeout)
  46. if err != ErrTimeout {
  47. t.Fatal("Unexpected non-Timeout error:", err)
  48. }
  49. }
  50. func TestEventAfterSubscribe(t *testing.T) {
  51. l := NewLogger()
  52. s := l.Subscribe(AllEvents)
  53. defer l.Unsubscribe(s)
  54. l.Log(DeviceConnected, "foo")
  55. ev, err := s.Poll(timeout)
  56. if err != nil {
  57. t.Fatal("Unexpected error:", err)
  58. }
  59. if ev.Type != DeviceConnected {
  60. t.Error("Incorrect event type", ev.Type)
  61. }
  62. switch v := ev.Data.(type) {
  63. case string:
  64. if v != "foo" {
  65. t.Error("Incorrect Data string", v)
  66. }
  67. default:
  68. t.Errorf("Incorrect Data type %#v", v)
  69. }
  70. }
  71. func TestEventAfterSubscribeIgnoreMask(t *testing.T) {
  72. l := NewLogger()
  73. s := l.Subscribe(DeviceDisconnected)
  74. defer l.Unsubscribe(s)
  75. l.Log(DeviceConnected, "foo")
  76. _, err := s.Poll(timeout)
  77. if err != ErrTimeout {
  78. t.Fatal("Unexpected non-Timeout error:", err)
  79. }
  80. }
  81. func TestBufferOverflow(t *testing.T) {
  82. l := NewLogger()
  83. s := l.Subscribe(AllEvents)
  84. defer l.Unsubscribe(s)
  85. // The first BufferSize events will be logged pretty much
  86. // instantaneously. The next BufferSize events will each block for up to
  87. // 15ms, plus overhead from race detector and thread scheduling latency
  88. // etc. This latency can sometimes be significant and is incurred for
  89. // each call. We just verify that the whole test completes in a
  90. // reasonable time, taking no more than 15 seconds in total.
  91. t0 := time.Now()
  92. const nEvents = BufferSize * 2
  93. for i := 0; i < nEvents; i++ {
  94. l.Log(DeviceConnected, "foo")
  95. }
  96. if d := time.Since(t0); d > 15*time.Second {
  97. t.Fatal("Logging took too long,", d, "avg", d/nEvents, "expected <", eventLogTimeout)
  98. }
  99. }
  100. func TestUnsubscribe(t *testing.T) {
  101. l := NewLogger()
  102. s := l.Subscribe(AllEvents)
  103. l.Log(DeviceConnected, "foo")
  104. _, err := s.Poll(timeout)
  105. if err != nil {
  106. t.Fatal("Unexpected error:", err)
  107. }
  108. l.Unsubscribe(s)
  109. l.Log(DeviceConnected, "foo")
  110. _, err = s.Poll(timeout)
  111. if err != ErrClosed {
  112. t.Fatal("Unexpected non-Closed error:", err)
  113. }
  114. }
  115. func TestGlobalIDs(t *testing.T) {
  116. l := NewLogger()
  117. s := l.Subscribe(AllEvents)
  118. defer l.Unsubscribe(s)
  119. l.Log(DeviceConnected, "foo")
  120. _ = l.Subscribe(AllEvents)
  121. l.Log(DeviceConnected, "bar")
  122. ev, err := s.Poll(timeout)
  123. if err != nil {
  124. t.Fatal("Unexpected error:", err)
  125. }
  126. if ev.Data.(string) != "foo" {
  127. t.Fatal("Incorrect event:", ev)
  128. }
  129. id := ev.GlobalID
  130. ev, err = s.Poll(timeout)
  131. if err != nil {
  132. t.Fatal("Unexpected error:", err)
  133. }
  134. if ev.Data.(string) != "bar" {
  135. t.Fatal("Incorrect event:", ev)
  136. }
  137. if ev.GlobalID != id+1 {
  138. t.Fatalf("ID not incremented (%d != %d)", ev.GlobalID, id+1)
  139. }
  140. }
  141. func TestSubscriptionIDs(t *testing.T) {
  142. l := NewLogger()
  143. s := l.Subscribe(DeviceConnected)
  144. defer l.Unsubscribe(s)
  145. l.Log(DeviceDisconnected, "a")
  146. l.Log(DeviceConnected, "b")
  147. l.Log(DeviceConnected, "c")
  148. l.Log(DeviceDisconnected, "d")
  149. ev, err := s.Poll(timeout)
  150. if err != nil {
  151. t.Fatal("Unexpected error:", err)
  152. }
  153. if ev.GlobalID != 2 {
  154. t.Fatal("Incorrect GlobalID:", ev.GlobalID)
  155. }
  156. if ev.SubscriptionID != 1 {
  157. t.Fatal("Incorrect SubscriptionID:", ev.SubscriptionID)
  158. }
  159. ev, err = s.Poll(timeout)
  160. if err != nil {
  161. t.Fatal("Unexpected error:", err)
  162. }
  163. if ev.GlobalID != 3 {
  164. t.Fatal("Incorrect GlobalID:", ev.GlobalID)
  165. }
  166. if ev.SubscriptionID != 2 {
  167. t.Fatal("Incorrect SubscriptionID:", ev.SubscriptionID)
  168. }
  169. ev, err = s.Poll(timeout)
  170. if err != ErrTimeout {
  171. t.Fatal("Unexpected error:", err)
  172. }
  173. }
  174. func TestBufferedSub(t *testing.T) {
  175. l := NewLogger()
  176. s := l.Subscribe(AllEvents)
  177. defer l.Unsubscribe(s)
  178. bs := NewBufferedSubscription(s, 10*BufferSize)
  179. go func() {
  180. for i := 0; i < 10*BufferSize; i++ {
  181. l.Log(DeviceConnected, fmt.Sprintf("event-%d", i))
  182. if i%30 == 0 {
  183. // Give the buffer routine time to pick up the events
  184. time.Sleep(20 * time.Millisecond)
  185. }
  186. }
  187. }()
  188. recv := 0
  189. for recv < 10*BufferSize {
  190. evs := bs.Since(recv, nil, time.Minute)
  191. for _, ev := range evs {
  192. if ev.GlobalID != recv+1 {
  193. t.Fatalf("Incorrect ID; %d != %d", ev.GlobalID, recv+1)
  194. }
  195. recv = ev.GlobalID
  196. }
  197. }
  198. }
  199. func BenchmarkBufferedSub(b *testing.B) {
  200. l := NewLogger()
  201. s := l.Subscribe(AllEvents)
  202. defer l.Unsubscribe(s)
  203. bufferSize := BufferSize
  204. bs := NewBufferedSubscription(s, bufferSize)
  205. // The coord channel paces the sender according to the receiver,
  206. // ensuring that no events are dropped. The benchmark measures sending +
  207. // receiving + synchronization overhead.
  208. coord := make(chan struct{}, bufferSize)
  209. for i := 0; i < bufferSize-1; i++ {
  210. coord <- struct{}{}
  211. }
  212. // Receive the events
  213. done := make(chan error)
  214. go func() {
  215. recv := 0
  216. var evs []Event
  217. for i := 0; i < b.N; {
  218. evs = bs.Since(recv, evs[:0], time.Minute)
  219. for _, ev := range evs {
  220. if ev.GlobalID != recv+1 {
  221. done <- fmt.Errorf("skipped event %v %v", ev.GlobalID, recv)
  222. return
  223. }
  224. recv = ev.GlobalID
  225. coord <- struct{}{}
  226. }
  227. i += len(evs)
  228. }
  229. done <- nil
  230. }()
  231. // Send the events
  232. eventData := map[string]string{
  233. "foo": "bar",
  234. "other": "data",
  235. "and": "something else",
  236. }
  237. for i := 0; i < b.N; i++ {
  238. l.Log(DeviceConnected, eventData)
  239. <-coord
  240. }
  241. if err := <-done; err != nil {
  242. b.Error(err)
  243. }
  244. b.ReportAllocs()
  245. }
  246. func TestSinceUsesSubscriptionId(t *testing.T) {
  247. l := NewLogger()
  248. s := l.Subscribe(DeviceConnected)
  249. defer l.Unsubscribe(s)
  250. bs := NewBufferedSubscription(s, 10*BufferSize)
  251. l.Log(DeviceConnected, "a") // SubscriptionID = 1
  252. l.Log(DeviceDisconnected, "b")
  253. l.Log(DeviceDisconnected, "c")
  254. l.Log(DeviceConnected, "d") // SubscriptionID = 2
  255. // We need to loop for the events, as they may not all have been
  256. // delivered to the buffered subscription when we get here.
  257. t0 := time.Now()
  258. for time.Since(t0) < time.Second {
  259. events := bs.Since(0, nil, time.Minute)
  260. if len(events) == 2 {
  261. break
  262. }
  263. if len(events) > 2 {
  264. t.Fatal("Incorrect number of events:", len(events))
  265. }
  266. }
  267. events := bs.Since(1, nil, time.Minute)
  268. if len(events) != 1 {
  269. t.Fatal("Incorrect number of events:", len(events))
  270. }
  271. }
  272. func TestUnmarshalEvent(t *testing.T) {
  273. var event Event
  274. s := `
  275. {
  276. "id": 1,
  277. "globalID": 1,
  278. "time": "2006-01-02T15:04:05.999999999Z",
  279. "type": "Starting",
  280. "data": {}
  281. }`
  282. if err := json.Unmarshal([]byte(s), &event); err != nil {
  283. t.Fatal("Failed to unmarshal event:", err)
  284. }
  285. }