events_test.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package events
  7. import (
  8. "fmt"
  9. "testing"
  10. "time"
  11. )
  12. const timeout = 100 * time.Millisecond
  13. func init() {
  14. runningTests = true
  15. }
  16. func TestNewLogger(t *testing.T) {
  17. l := NewLogger()
  18. if l == nil {
  19. t.Fatal("Unexpected nil Logger")
  20. }
  21. }
  22. func TestSubscriber(t *testing.T) {
  23. l := NewLogger()
  24. s := l.Subscribe(0)
  25. defer l.Unsubscribe(s)
  26. if s == nil {
  27. t.Fatal("Unexpected nil Subscription")
  28. }
  29. }
  30. func TestTimeout(t *testing.T) {
  31. l := NewLogger()
  32. s := l.Subscribe(0)
  33. defer l.Unsubscribe(s)
  34. _, err := s.Poll(timeout)
  35. if err != ErrTimeout {
  36. t.Fatal("Unexpected non-Timeout error:", err)
  37. }
  38. }
  39. func TestEventBeforeSubscribe(t *testing.T) {
  40. l := NewLogger()
  41. l.Log(DeviceConnected, "foo")
  42. s := l.Subscribe(0)
  43. defer l.Unsubscribe(s)
  44. _, err := s.Poll(timeout)
  45. if err != ErrTimeout {
  46. t.Fatal("Unexpected non-Timeout error:", err)
  47. }
  48. }
  49. func TestEventAfterSubscribe(t *testing.T) {
  50. l := NewLogger()
  51. s := l.Subscribe(AllEvents)
  52. defer l.Unsubscribe(s)
  53. l.Log(DeviceConnected, "foo")
  54. ev, err := s.Poll(timeout)
  55. if err != nil {
  56. t.Fatal("Unexpected error:", err)
  57. }
  58. if ev.Type != DeviceConnected {
  59. t.Error("Incorrect event type", ev.Type)
  60. }
  61. switch v := ev.Data.(type) {
  62. case string:
  63. if v != "foo" {
  64. t.Error("Incorrect Data string", v)
  65. }
  66. default:
  67. t.Errorf("Incorrect Data type %#v", v)
  68. }
  69. }
  70. func TestEventAfterSubscribeIgnoreMask(t *testing.T) {
  71. l := NewLogger()
  72. s := l.Subscribe(DeviceDisconnected)
  73. defer l.Unsubscribe(s)
  74. l.Log(DeviceConnected, "foo")
  75. _, err := s.Poll(timeout)
  76. if err != ErrTimeout {
  77. t.Fatal("Unexpected non-Timeout error:", err)
  78. }
  79. }
  80. func TestBufferOverflow(t *testing.T) {
  81. l := NewLogger()
  82. s := l.Subscribe(AllEvents)
  83. defer l.Unsubscribe(s)
  84. t0 := time.Now()
  85. for i := 0; i < BufferSize*2; i++ {
  86. l.Log(DeviceConnected, "foo")
  87. }
  88. if time.Since(t0) > timeout {
  89. t.Fatalf("Logging took too long")
  90. }
  91. }
  92. func TestUnsubscribe(t *testing.T) {
  93. l := NewLogger()
  94. s := l.Subscribe(AllEvents)
  95. l.Log(DeviceConnected, "foo")
  96. _, err := s.Poll(timeout)
  97. if err != nil {
  98. t.Fatal("Unexpected error:", err)
  99. }
  100. l.Unsubscribe(s)
  101. l.Log(DeviceConnected, "foo")
  102. _, err = s.Poll(timeout)
  103. if err != ErrClosed {
  104. t.Fatal("Unexpected non-Closed error:", err)
  105. }
  106. }
  107. func TestGlobalIDs(t *testing.T) {
  108. l := NewLogger()
  109. s := l.Subscribe(AllEvents)
  110. defer l.Unsubscribe(s)
  111. l.Log(DeviceConnected, "foo")
  112. _ = l.Subscribe(AllEvents)
  113. l.Log(DeviceConnected, "bar")
  114. ev, err := s.Poll(timeout)
  115. if err != nil {
  116. t.Fatal("Unexpected error:", err)
  117. }
  118. if ev.Data.(string) != "foo" {
  119. t.Fatal("Incorrect event:", ev)
  120. }
  121. id := ev.GlobalID
  122. ev, err = s.Poll(timeout)
  123. if err != nil {
  124. t.Fatal("Unexpected error:", err)
  125. }
  126. if ev.Data.(string) != "bar" {
  127. t.Fatal("Incorrect event:", ev)
  128. }
  129. if ev.GlobalID != id+1 {
  130. t.Fatalf("ID not incremented (%d != %d)", ev.GlobalID, id+1)
  131. }
  132. }
  133. func TestSubscriptionIDs(t *testing.T) {
  134. l := NewLogger()
  135. s := l.Subscribe(DeviceConnected)
  136. defer l.Unsubscribe(s)
  137. l.Log(DeviceDisconnected, "a")
  138. l.Log(DeviceConnected, "b")
  139. l.Log(DeviceConnected, "c")
  140. l.Log(DeviceDisconnected, "d")
  141. ev, err := s.Poll(timeout)
  142. if err != nil {
  143. t.Fatal("Unexpected error:", err)
  144. }
  145. if ev.GlobalID != 2 {
  146. t.Fatal("Incorrect GlobalID:", ev.GlobalID)
  147. }
  148. if ev.SubscriptionID != 1 {
  149. t.Fatal("Incorrect SubscriptionID:", ev.SubscriptionID)
  150. }
  151. ev, err = s.Poll(timeout)
  152. if err != nil {
  153. t.Fatal("Unexpected error:", err)
  154. }
  155. if ev.GlobalID != 3 {
  156. t.Fatal("Incorrect GlobalID:", ev.GlobalID)
  157. }
  158. if ev.SubscriptionID != 2 {
  159. t.Fatal("Incorrect SubscriptionID:", ev.SubscriptionID)
  160. }
  161. ev, err = s.Poll(timeout)
  162. if err != ErrTimeout {
  163. t.Fatal("Unexpected error:", err)
  164. }
  165. }
  166. func TestBufferedSub(t *testing.T) {
  167. l := NewLogger()
  168. s := l.Subscribe(AllEvents)
  169. defer l.Unsubscribe(s)
  170. bs := NewBufferedSubscription(s, 10*BufferSize)
  171. go func() {
  172. for i := 0; i < 10*BufferSize; i++ {
  173. l.Log(DeviceConnected, fmt.Sprintf("event-%d", i))
  174. if i%30 == 0 {
  175. // Give the buffer routine time to pick up the events
  176. time.Sleep(20 * time.Millisecond)
  177. }
  178. }
  179. }()
  180. recv := 0
  181. for recv < 10*BufferSize {
  182. evs := bs.Since(recv, nil)
  183. for _, ev := range evs {
  184. if ev.GlobalID != recv+1 {
  185. t.Fatalf("Incorrect ID; %d != %d", ev.GlobalID, recv+1)
  186. }
  187. recv = ev.GlobalID
  188. }
  189. }
  190. }
  191. func BenchmarkBufferedSub(b *testing.B) {
  192. l := NewLogger()
  193. s := l.Subscribe(AllEvents)
  194. defer l.Unsubscribe(s)
  195. bufferSize := BufferSize
  196. bs := NewBufferedSubscription(s, bufferSize)
  197. // The coord channel paces the sender according to the receiver,
  198. // ensuring that no events are dropped. The benchmark measures sending +
  199. // receiving + synchronization overhead.
  200. coord := make(chan struct{}, bufferSize)
  201. for i := 0; i < bufferSize-1; i++ {
  202. coord <- struct{}{}
  203. }
  204. // Receive the events
  205. done := make(chan struct{})
  206. go func() {
  207. defer close(done)
  208. recv := 0
  209. var evs []Event
  210. for i := 0; i < b.N; {
  211. evs = bs.Since(recv, evs[:0])
  212. for _, ev := range evs {
  213. if ev.GlobalID != recv+1 {
  214. b.Fatal("skipped event", ev.GlobalID, recv)
  215. }
  216. recv = ev.GlobalID
  217. coord <- struct{}{}
  218. }
  219. i += len(evs)
  220. }
  221. }()
  222. // Send the events
  223. eventData := map[string]string{
  224. "foo": "bar",
  225. "other": "data",
  226. "and": "something else",
  227. }
  228. for i := 0; i < b.N; i++ {
  229. l.Log(DeviceConnected, eventData)
  230. <-coord
  231. }
  232. <-done
  233. b.ReportAllocs()
  234. }
  235. func TestSinceUsesSubscriptionId(t *testing.T) {
  236. l := NewLogger()
  237. s := l.Subscribe(DeviceConnected)
  238. defer l.Unsubscribe(s)
  239. bs := NewBufferedSubscription(s, 10*BufferSize)
  240. l.Log(DeviceConnected, "a") // SubscriptionID = 1
  241. l.Log(DeviceDisconnected, "b")
  242. l.Log(DeviceDisconnected, "c")
  243. l.Log(DeviceConnected, "d") // SubscriptionID = 2
  244. // We need to loop for the events, as they may not all have been
  245. // delivered to the buffered subscription when we get here.
  246. t0 := time.Now()
  247. for time.Since(t0) < time.Second {
  248. events := bs.Since(0, nil)
  249. if len(events) == 2 {
  250. break
  251. }
  252. if len(events) > 2 {
  253. t.Fatal("Incorrect number of events:", len(events))
  254. }
  255. }
  256. events := bs.Since(1, nil)
  257. if len(events) != 1 {
  258. t.Fatal("Incorrect number of events:", len(events))
  259. }
  260. }