ChunkifyTest.cs 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. using System.Reactive;
  10. using System.Reactive.Concurrency;
  11. using System.Reactive.Linq;
  12. using Microsoft.Reactive.Testing;
  13. using Xunit;
  14. using ReactiveTests.Dummies;
  15. using System.Reflection;
  16. namespace ReactiveTests.Tests
  17. {
  18. public class ChunkifyTest : ReactiveTest
  19. {
  20. [Fact]
  21. public void Chunkify_ArgumentChecking()
  22. {
  23. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Chunkify(default(IObservable<int>)));
  24. }
  25. [Fact]
  26. public void Chunkify_Regular1()
  27. {
  28. var scheduler = new TestScheduler();
  29. var xs = scheduler.CreateHotObservable(
  30. OnNext(100, 1),
  31. OnNext(200, 2),
  32. OnNext(300, 3),
  33. OnNext(400, 4),
  34. OnNext(500, 5),
  35. OnNext(600, 6),
  36. OnNext(700, 7),
  37. OnNext(800, 8),
  38. OnCompleted<int>(900)
  39. );
  40. var ys = xs.Chunkify();
  41. var e = default(IEnumerator<IList<int>>);
  42. var res = new List<IList<int>>();
  43. var log = new Action(() =>
  44. {
  45. Assert.True(e.MoveNext());
  46. res.Add(e.Current);
  47. });
  48. scheduler.ScheduleAbsolute(250, () => { e = ys.GetEnumerator(); });
  49. scheduler.ScheduleAbsolute(270, log);
  50. scheduler.ScheduleAbsolute(310, log);
  51. scheduler.ScheduleAbsolute(450, log);
  52. scheduler.ScheduleAbsolute(470, log);
  53. scheduler.ScheduleAbsolute(750, log);
  54. scheduler.ScheduleAbsolute(850, log);
  55. scheduler.ScheduleAbsolute(950, log);
  56. scheduler.ScheduleAbsolute(980, () => Assert.False(e.MoveNext()));
  57. scheduler.Start();
  58. xs.Subscriptions.AssertEqual(
  59. Subscribe(250, 900)
  60. );
  61. Assert.Equal(7, res.Count);
  62. Assert.True(res[0].SequenceEqual(new int[] { }));
  63. Assert.True(res[1].SequenceEqual(new int[] { 3 }));
  64. Assert.True(res[2].SequenceEqual(new int[] { 4 }));
  65. Assert.True(res[3].SequenceEqual(new int[] { }));
  66. Assert.True(res[4].SequenceEqual(new int[] { 5, 6, 7 }));
  67. Assert.True(res[5].SequenceEqual(new int[] { 8 }));
  68. Assert.True(res[6].SequenceEqual(new int[] { }));
  69. }
  70. [Fact]
  71. public void Chunkify_Regular2()
  72. {
  73. var scheduler = new TestScheduler();
  74. var xs = scheduler.CreateHotObservable(
  75. OnNext(100, 1),
  76. OnNext(200, 2),
  77. OnNext(300, 3),
  78. OnNext(400, 4),
  79. OnNext(500, 5),
  80. OnNext(600, 6),
  81. OnNext(700, 7),
  82. OnNext(800, 8),
  83. OnCompleted<int>(900)
  84. );
  85. var ys = xs.Chunkify();
  86. var e = default(IEnumerator<IList<int>>);
  87. var res = new List<IList<int>>();
  88. var log = new Action(() =>
  89. {
  90. Assert.True(e.MoveNext());
  91. res.Add(e.Current);
  92. });
  93. scheduler.ScheduleAbsolute(250, () => { e = ys.GetEnumerator(); });
  94. scheduler.ScheduleAbsolute(550, log);
  95. scheduler.ScheduleAbsolute(950, log);
  96. scheduler.ScheduleAbsolute(980, () => Assert.False(e.MoveNext()));
  97. scheduler.Start();
  98. xs.Subscriptions.AssertEqual(
  99. Subscribe(250, 900)
  100. );
  101. Assert.Equal(2, res.Count);
  102. Assert.True(res[0].SequenceEqual(new int[] { 3, 4, 5 }));
  103. Assert.True(res[1].SequenceEqual(new int[] { 6, 7, 8 }));
  104. }
  105. [Fact]
  106. public void Chunkify_Error()
  107. {
  108. var ex = new Exception();
  109. var scheduler = new TestScheduler();
  110. var xs = scheduler.CreateHotObservable(
  111. OnNext(100, 1),
  112. OnNext(200, 2),
  113. OnNext(300, 3),
  114. OnNext(400, 4),
  115. OnNext(500, 5),
  116. OnNext(600, 6),
  117. OnError<int>(700, ex)
  118. );
  119. var ys = xs.Chunkify();
  120. var e = default(IEnumerator<IList<int>>);
  121. var res = new List<IList<int>>();
  122. var log = new Action(() =>
  123. {
  124. Assert.True(e.MoveNext());
  125. res.Add(e.Current);
  126. });
  127. scheduler.ScheduleAbsolute(250, () => { e = ys.GetEnumerator(); });
  128. scheduler.ScheduleAbsolute(270, log);
  129. scheduler.ScheduleAbsolute(310, log);
  130. scheduler.ScheduleAbsolute(450, log);
  131. scheduler.ScheduleAbsolute(470, log);
  132. scheduler.ScheduleAbsolute(750, () =>
  133. {
  134. try
  135. {
  136. e.MoveNext();
  137. Assert.True(false);
  138. }
  139. catch (Exception error)
  140. {
  141. Assert.Same(ex, error);
  142. }
  143. });
  144. scheduler.Start();
  145. xs.Subscriptions.AssertEqual(
  146. Subscribe(250, 700)
  147. );
  148. Assert.Equal(4, res.Count);
  149. Assert.True(res[0].SequenceEqual(new int[] { }));
  150. Assert.True(res[1].SequenceEqual(new int[] { 3 }));
  151. Assert.True(res[2].SequenceEqual(new int[] { 4 }));
  152. Assert.True(res[3].SequenceEqual(new int[] { }));
  153. }
  154. }
  155. }