NextTest.cs 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections;
  6. using System.Collections.Generic;
  7. using System.Linq;
  8. using System.Reactive;
  9. using System.Reactive.Concurrency;
  10. using System.Reactive.Linq;
  11. using System.Threading;
  12. using System.Threading.Tasks;
  13. using Microsoft.Reactive.Testing;
  14. using Xunit;
  15. namespace ReactiveTests.Tests
  16. {
  17. public class NextTest : ReactiveTest
  18. {
  /// <summary>
  /// Verifies that <c>Observable.Next</c> throws <see cref="ArgumentNullException"/>
  /// when it is handed a null source sequence.
  /// </summary>
  [Fact]
  public void Next_ArgumentChecking()
  {
      // The call under test, captured as a delegate so the assertion helper can invoke it.
      Action callWithNullSource = () => Observable.Next(default(IObservable<int>));

      ReactiveAssert.Throws<ArgumentNullException>(callWithNullSource);
  }
  /// <summary>
  /// Feeds the <c>Next</c> enumerator from a background task that emits 1, then 2, then
  /// completes. Every step of the producer is gated on a shared <see cref="AutoResetEvent"/>,
  /// which the test signals once before each <c>MoveNext</c> call.
  /// </summary>
  [Fact]
  public void Next1()
  {
      var gate = new AutoResetEvent(false);

      // Producer: each notification is sent only after the gate has been signalled.
      var source = Observable.Create<int>(observer =>
      {
          Task.Run(() =>
          {
              gate.WaitOne();
              observer.OnNext(1);

              gate.WaitOne();
              observer.OnNext(2);

              gate.WaitOne();
              observer.OnCompleted();
          });

          // Nothing to clean up on unsubscribe.
          return () => { };
      });

      var enumerator = source.Next().GetEnumerator();

      // Signals the gate once, from a separate task, after a 250 ms delay.
      void SignalGateLater()
      {
          Task.Run(async () =>
          {
              await Task.Delay(250);
              gate.Set();
          });
      }

      // First element.
      SignalGateLater();
      Assert.True(enumerator.MoveNext());
      Assert.Equal(1, enumerator.Current);

      // Second element.
      SignalGateLater();
      Assert.True(enumerator.MoveNext());
      Assert.Equal(2, enumerator.Current);

      // Completion: no further element is available.
      SignalGateLater();
      Assert.False(enumerator.MoveNext());
  }
  /// <summary>
  /// Obtains two enumerators from <c>Next</c> over a hot test observable, one at virtual
  /// time 200 through the generic interface and one at 255 through the non-generic
  /// <see cref="IEnumerable"/> interface. The first is disposed at 285. The test then checks
  /// that the recorded subscriptions are 200-285 and 255-300 (300 is the time of the
  /// source's OnCompleted message).
  /// </summary>
  [Fact]
  public void Next2()
  {
      var testScheduler = new TestScheduler();

      var hotSource = testScheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnNext(240, 4),
          OnNext(250, 5),
          OnNext(260, 6),
          OnNext(270, 7),
          OnNext(280, 8),
          OnNext(290, 9),
          OnCompleted<int>(300)
      );

      var nextSequence = hotSource.Next();

      // Generic enumerator: created at 200, explicitly disposed at 285.
      var genericEnumerator = default(IEnumerator<int>);
      testScheduler.ScheduleAbsolute(200, () =>
      {
          genericEnumerator = nextSequence.GetEnumerator();
      });
      testScheduler.ScheduleAbsolute(285, () =>
      {
          genericEnumerator.Dispose();
      });

      // Non-generic enumerator: created at 255 and never disposed by the test.
      var plainEnumerator = default(IEnumerator);
      testScheduler.ScheduleAbsolute(255, () =>
      {
          plainEnumerator = ((IEnumerable)nextSequence).GetEnumerator();
      });

      testScheduler.Start();

      hotSource.Subscriptions.AssertEqual(
          Subscribe(200, 285),
          Subscribe(255, 300)
      );
  }
  /// <summary>
  /// Gets a <c>Next</c> enumerator over an empty sequence, waits until the sequence's
  /// completion callback has run, and then checks that <c>MoveNext</c> returns false.
  /// </summary>
  [Fact]
  public void Next_DoesNotBlock()
  {
      var completionSeen = new ManualResetEvent(false);

      // Empty source whose only side effect is to signal the event on completion.
      var source = Observable
          .Empty<int>()
          .Do(_ => { }, () => completionSeen.Set());

      var enumerator = source.Next().GetEnumerator();

      // Proceed only once the completion callback has fired.
      completionSeen.WaitOne();

      var hasElement = enumerator.MoveNext();
      Assert.False(hasElement);
  }
  /// <summary>
  /// Enumerates <c>Next</c> over <c>Observable.Range(0, 100)</c> running on the default
  /// scheduler. The test does not assert how many values are observed; it checks only that
  /// every observed value lies in the produced range [0, 99] and that none appears twice.
  /// </summary>
  [Fact]
  public void Next_SomeResults()
  {
      var xs = Observable.Range(0, 100, Scheduler.Default);

      var res = xs.Next().ToList();

      // Per-element range check (both bounds) so a failure reports the offending value.
      Assert.All(res, x => Assert.InRange(x, 0, 99));

      // Equality assertion (expected, actual) so a failure reports both counts.
      Assert.Equal(res.Distinct().Count(), res.Count);
  }
  107. #if !NO_THREAD
  /// <summary>
  /// Feeds the <c>Next</c> enumerator from a dedicated thread that emits 1 and then signals
  /// an error. Each producer step is gated on a shared <see cref="AutoResetEvent"/>, which the
  /// test signals once before each <c>MoveNext</c> call. The first <c>MoveNext</c> must yield 1;
  /// the second must throw the exact exception instance passed to <c>OnError</c>.
  /// </summary>
  [Fact]
  public void Next_Error()
  {
      var ex = new Exception();
      var evt = new AutoResetEvent(false);

      var src = Observable.Create<int>(obs =>
      {
          // Background thread: if an assertion below fails before the gate is signalled
          // again, this thread stays blocked in WaitOne. As a foreground thread it would
          // keep the test process alive; as a background thread it does not.
          new Thread(() =>
          {
              evt.WaitOne();
              obs.OnNext(1);
              evt.WaitOne();
              obs.OnError(ex);
          })
          { IsBackground = true }.Start();

          // Nothing to clean up on unsubscribe.
          return () => { };
      });

      var res = src.Next().GetEnumerator();

      // Signals the gate once, from a separate background thread, after a 250 ms sleep.
      void release() => new Thread(() =>
      {
          Thread.Sleep(250);
          evt.Set();
      })
      { IsBackground = true }.Start();

      release();
      Assert.True(res.MoveNext());
      Assert.Equal(1, res.Current);

      release();
      ReactiveAssert.Throws(ex, () => res.MoveNext());
  }
  136. #endif
  137. }
  138. }