LatestTest.cs 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections;
  6. using System.Collections.Generic;
  7. using System.Reactive.Concurrency;
  8. using System.Reactive.Linq;
  9. using System.Threading;
  10. using System.Threading.Tasks;
  11. using Microsoft.Reactive.Testing;
  12. using Microsoft.VisualStudio.TestTools.UnitTesting;
  13. using Assert = Xunit.Assert;
  14. namespace ReactiveTests.Tests
  15. {
  /// <summary>
  /// Tests for the <c>Latest</c> operator, which exposes an observable sequence
  /// as an enumerable that is consumed through <c>MoveNext</c>/<c>Current</c>.
  /// </summary>
  [TestClass]
  public class LatestTest : ReactiveTest
  {
      /// <summary>
      /// Checks that <c>Observable.Latest</c> throws <see cref="ArgumentNullException"/>
      /// when it is given a null source.
      /// </summary>
      [TestMethod]
      public void Latest_ArgumentChecking()
      {
          ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Latest(default(IObservable<int>)));
      }

      /// <summary>
      /// Drives a hand-rolled source one notification at a time (1, 2, completion) and
      /// checks what the enumerator reports after each step, that <c>Reset</c> is not
      /// supported, and that the source subscription has been disposed by the end.
      /// </summary>
      [TestMethod]
      public void Latest1()
      {
          var disposed = false;

          using (var evt = new AutoResetEvent(false))
          {
              var src = Observable.Create<int>(obs =>
              {
                  // The producer emits exactly one notification per signal of evt.
                  Task.Run(() =>
                  {
                      evt.WaitOne();
                      obs.OnNext(1);
                      evt.WaitOne();
                      obs.OnNext(2);
                      evt.WaitOne();
                      obs.OnCompleted();
                  });

                  return () => { disposed = true; };
              });

              var res = src.Latest().GetEnumerator();
              try
              {
                  // The first signal is delayed, presumably so that the MoveNext call
                  // below is already waiting when the first value is produced.
                  Task.Run(async () =>
                  {
                      await Task.Delay(250);
                      evt.Set();
                  });

                  Assert.True(res.MoveNext());
                  Assert.Equal(1, res.Current);

                  evt.Set();
                  Assert.True(res.MoveNext());
                  // Read through the non-generic IEnumerator interface on purpose,
                  // so that this Current implementation is exercised as well.
                  Assert.Equal(2, ((IEnumerator)res).Current);

                  evt.Set();
                  Assert.False(res.MoveNext());

                  ReactiveAssert.Throws<NotSupportedException>(() => res.Reset());
              }
              finally
              {
                  // Dispose even when an assertion above fails, so the subscription
                  // is not left behind by a failing test run.
                  res.Dispose();
              }

              // NOTE(review): calling MoveNext after Dispose is not verified here; an
              // earlier ObjectDisposedException assertion for that case was disabled.
              Assert.True(disposed);
          }
      }

      /// <summary>
      /// Uses a <c>TestScheduler</c> and a hot observable emitting 1..9 at ticks 210..290
      /// (completing at 300) to check two independent enumerators over the same
      /// <c>Latest</c> enumerable: the first is created at 205, read at 235 and 265 and
      /// disposed at 285; the second is created at 255 through the non-generic
      /// <see cref="IEnumerable"/> interface and read at 265 and 275.
      /// </summary>
      [TestMethod]
      public void Latest2()
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable(
              OnNext(210, 1),
              OnNext(220, 2),
              OnNext(230, 3),
              OnNext(240, 4),
              OnNext(250, 5),
              OnNext(260, 6),
              OnNext(270, 7),
              OnNext(280, 8),
              OnNext(290, 9),
              OnCompleted<int>(300)
          );

          var res = xs.Latest();

          // First enumerator: generic interface, explicitly disposed at 285.
          var e1 = default(IEnumerator<int>);
          scheduler.ScheduleAbsolute(205, () =>
          {
              e1 = res.GetEnumerator();
          });

          var o1 = new List<int>();
          scheduler.ScheduleAbsolute(235, () =>
          {
              Assert.True(e1.MoveNext());
              o1.Add(e1.Current);
          });
          scheduler.ScheduleAbsolute(265, () =>
          {
              Assert.True(e1.MoveNext());
              o1.Add(e1.Current);
          });

          scheduler.ScheduleAbsolute(285, () => e1.Dispose());

          // Second enumerator: non-generic interface, never disposed explicitly; the
          // expected subscription below ends at 300, the tick of the completion.
          var e2 = default(IEnumerator);
          scheduler.ScheduleAbsolute(255, () =>
          {
              e2 = ((IEnumerable)res).GetEnumerator();
          });

          var o2 = new List<int>();
          scheduler.ScheduleAbsolute(265, () =>
          {
              Assert.True(e2.MoveNext());
              o2.Add((int)e2.Current);
          });
          scheduler.ScheduleAbsolute(275, () =>
          {
              Assert.True(e2.MoveNext());
              o2.Add((int)e2.Current);
          });

          scheduler.Start();

          // Each GetEnumerator call is expected to produce its own subscription.
          xs.Subscriptions.AssertEqual(
              Subscribe(205, 285),
              Subscribe(255, 300)
          );

          o1.AssertEqual(3, 6);
          o2.AssertEqual(6, 7);
      }

      /// <summary>
      /// Checks that an error raised by the source is rethrown from <c>MoveNext</c>:
      /// the source emits 1 and then fails with a known exception instance, and the
      /// second <c>MoveNext</c> call must throw that same instance.
      /// </summary>
      [TestMethod]
      public void Latest_Error()
      {
          // The test runs without a synchronization context; remember the current one
          // so that it can be restored and the change does not leak to other tests.
          var previousContext = SynchronizationContext.Current;
          SynchronizationContext.SetSynchronizationContext(null);

          try
          {
              var ex = new Exception();

              using (var evt = new AutoResetEvent(false))
              {
                  var src = Observable.Create<int>(obs =>
                  {
                      // The producer emits exactly one notification per signal of evt.
                      Task.Run(() =>
                      {
                          evt.WaitOne();
                          obs.OnNext(1);
                          evt.WaitOne();
                          obs.OnError(ex);
                      });

                      return () => { };
                  });

                  // Dispose the enumerator on every path so the subscription is released.
                  using (var res = src.Latest().GetEnumerator())
                  {
                      // The first signal is delayed, presumably so that the MoveNext call
                      // below is already waiting when the first value is produced.
                      Task.Run(async () =>
                      {
                          await Task.Delay(250);
                          evt.Set();
                      });

                      Assert.True(res.MoveNext());
                      Assert.Equal(1, res.Current);

                      evt.Set();
                      ReactiveAssert.Throws(ex, () => res.MoveNext());
                  }
              }
          }
          finally
          {
              SynchronizationContext.SetSynchronizationContext(previousContext);
          }
      }
  }
  147. }