NextTest.cs 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections;
  6. using System.Collections.Generic;
  7. using System.Linq;
  8. using System.Reactive;
  9. using System.Reactive.Concurrency;
  10. using System.Reactive.Linq;
  11. using System.Threading;
  12. using System.Threading.Tasks;
  13. using Microsoft.Reactive.Testing;
  14. using Microsoft.VisualStudio.TestTools.UnitTesting;
  15. using Assert = Xunit.Assert;
  16. namespace ReactiveTests.Tests
  17. {
  18. [TestClass]
  19. public class NextTest : ReactiveTest
  20. {
  21. [TestMethod]
  22. public void Next_ArgumentChecking()
  23. {
  24. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Next(default(IObservable<int>)));
  25. }
  26. [TestMethod]
  27. public void Next1()
  28. {
  29. var evt = new AutoResetEvent(false);
  30. var src = Observable.Create<int>(obs =>
  31. {
  32. Task.Run(() =>
  33. {
  34. evt.WaitOne();
  35. obs.OnNext(1);
  36. evt.WaitOne();
  37. obs.OnNext(2);
  38. evt.WaitOne();
  39. obs.OnCompleted();
  40. });
  41. return () => { };
  42. });
  43. var res = src.Next().GetEnumerator();
  44. void release() => Task.Run(async () =>
  45. {
  46. await Task.Delay(250);
  47. evt.Set();
  48. });
  49. release();
  50. Assert.True(res.MoveNext());
  51. Assert.Equal(1, res.Current);
  52. release();
  53. Assert.True(res.MoveNext());
  54. Assert.Equal(2, res.Current);
  55. release();
  56. Assert.False(res.MoveNext());
  57. }
  58. [TestMethod]
  59. public void Next2()
  60. {
  61. var scheduler = new TestScheduler();
  62. var xs = scheduler.CreateHotObservable(
  63. OnNext(210, 1),
  64. OnNext(220, 2),
  65. OnNext(230, 3),
  66. OnNext(240, 4),
  67. OnNext(250, 5),
  68. OnNext(260, 6),
  69. OnNext(270, 7),
  70. OnNext(280, 8),
  71. OnNext(290, 9),
  72. OnCompleted<int>(300)
  73. );
  74. var res = xs.Next();
  75. var e1 = default(IEnumerator<int>);
  76. scheduler.ScheduleAbsolute(200, () =>
  77. {
  78. e1 = res.GetEnumerator();
  79. });
  80. scheduler.ScheduleAbsolute(285, () => e1.Dispose());
  81. var e2 = default(IEnumerator);
  82. scheduler.ScheduleAbsolute(255, () =>
  83. {
  84. e2 = ((IEnumerable)res).GetEnumerator();
  85. });
  86. scheduler.Start();
  87. xs.Subscriptions.AssertEqual(
  88. Subscribe(200, 285),
  89. Subscribe(255, 300)
  90. );
  91. }
  92. [TestMethod]
  93. public void Next_DoesNotBlock()
  94. {
  95. var evt = new ManualResetEvent(false);
  96. var xs = Observable.Empty<int>().Do(_ => { }, () => evt.Set());
  97. var e = xs.Next().GetEnumerator();
  98. evt.WaitOne();
  99. Assert.False(e.MoveNext());
  100. }
  101. [TestMethod]
  102. public void Next_SomeResults()
  103. {
  104. var xs = Observable.Range(0, 100, Scheduler.Default);
  105. var res = xs.Next().ToList();
  106. Assert.True(res.All(x => x < 100));
  107. Assert.True(res.Count == res.Distinct().Count());
  108. }
  109. [TestMethod]
  110. public void Next_Error()
  111. {
  112. var ex = new Exception();
  113. var evt = new AutoResetEvent(false);
  114. var src = Observable.Create<int>(obs =>
  115. {
  116. new Thread(() =>
  117. {
  118. evt.WaitOne();
  119. obs.OnNext(1);
  120. evt.WaitOne();
  121. obs.OnError(ex);
  122. }).Start();
  123. return () => { };
  124. });
  125. var res = src.Next().GetEnumerator();
  126. void release() => new Thread(() =>
  127. {
  128. Thread.Sleep(250);
  129. evt.Set();
  130. }).Start();
  131. release();
  132. Assert.True(res.MoveNext());
  133. Assert.Equal(1, res.Current);
  134. release();
  135. ReactiveAssert.Throws(ex, () => res.MoveNext());
  136. }
  137. }
  138. }