ToObservable.cs 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. using Xunit;
  10. namespace Tests
  11. {
  12. public class ToObservable : AsyncEnumerableTests
  13. {
  14. [Fact]
  15. public void ToObservable_Null()
  16. {
  17. Assert.Throws<ArgumentNullException>(() => AsyncEnumerable.ToObservable<int>(null));
  18. }
  19. [Fact]
  20. public void ToObservable1()
  21. {
  22. using var evt = new ManualResetEvent(false);
  23. var fail = false;
  24. var xs = AsyncEnumerable.Empty<int>().ToObservable();
  25. xs.Subscribe(new MyObserver<int>(
  26. x =>
  27. {
  28. fail = true;
  29. },
  30. ex =>
  31. {
  32. fail = true;
  33. evt.Set();
  34. },
  35. () =>
  36. {
  37. evt.Set();
  38. }
  39. ));
  40. evt.WaitOne();
  41. Assert.False(fail);
  42. }
  43. [Fact]
  44. public void ToObservable2()
  45. {
  46. using var evt = new ManualResetEvent(false);
  47. var lst = new List<int>();
  48. var fail = false;
  49. var xs = Return42.ToObservable();
  50. xs.Subscribe(new MyObserver<int>(
  51. x =>
  52. {
  53. lst.Add(x);
  54. },
  55. ex =>
  56. {
  57. fail = true;
  58. evt.Set();
  59. },
  60. () =>
  61. {
  62. evt.Set();
  63. }
  64. ));
  65. evt.WaitOne();
  66. Assert.False(fail);
  67. Assert.True(lst.SequenceEqual([42]));
  68. }
  69. [Fact]
  70. public void ToObservable3()
  71. {
  72. using var evt = new ManualResetEvent(false);
  73. var lst = new List<int>();
  74. var fail = false;
  75. var xs = AsyncEnumerable.Range(0, 10).ToObservable();
  76. xs.Subscribe(new MyObserver<int>(
  77. x =>
  78. {
  79. lst.Add(x);
  80. },
  81. ex =>
  82. {
  83. fail = true;
  84. evt.Set();
  85. },
  86. () =>
  87. {
  88. evt.Set();
  89. }
  90. ));
  91. evt.WaitOne();
  92. Assert.False(fail);
  93. Assert.True(lst.SequenceEqual(Enumerable.Range(0, 10)));
  94. }
  95. [Fact]
  96. public void ToObservable_ThrowOnMoveNext()
  97. {
  98. using var evt = new ManualResetEvent(false);
  99. var ex1 = new Exception("Bang!");
  100. var ex_ = default(Exception);
  101. var fail = false;
  102. var xs = Throw<int>(ex1).ToObservable();
  103. xs.Subscribe(new MyObserver<int>(
  104. x =>
  105. {
  106. fail = true;
  107. },
  108. ex =>
  109. {
  110. ex_ = ex;
  111. evt.Set();
  112. },
  113. () =>
  114. {
  115. fail = true;
  116. evt.Set();
  117. }
  118. ));
  119. evt.WaitOne();
  120. Assert.False(fail);
  121. Assert.Equal(ex1, ex_);
  122. }
  123. [Fact]
  124. public void ToObservable_ThrowOnCurrent()
  125. {
  126. var ex1 = new Exception("Bang!");
  127. var ex_ = default(Exception);
  128. var fail = false;
  129. var ae = AsyncEnumerable.Create(
  130. _ => new ThrowOnCurrentAsyncEnumerator(ex1)
  131. );
  132. ae.ToObservable()
  133. .Subscribe(new MyObserver<int>(
  134. x =>
  135. {
  136. fail = true;
  137. },
  138. ex =>
  139. {
  140. ex_ = ex;
  141. },
  142. () =>
  143. {
  144. fail = true;
  145. }
  146. ));
  147. Assert.False(fail);
  148. Assert.Equal(ex1, ex_);
  149. }
  150. [Fact]
  151. public void ToObservable_DisposesEnumeratorOnCompletion()
  152. {
  153. using var evt = new ManualResetEvent(false);
  154. var fail = false;
  155. var ae = AsyncEnumerable.Create(
  156. _ => AsyncEnumerator.Create<int>(
  157. () => new ValueTask<bool>(false),
  158. () => { throw new InvalidOperationException(); },
  159. () => { evt.Set(); return default; }));
  160. ae
  161. .ToObservable()
  162. .Subscribe(new MyObserver<int>(
  163. x =>
  164. {
  165. fail = true;
  166. },
  167. ex =>
  168. {
  169. fail = true;
  170. },
  171. () =>
  172. {
  173. }
  174. ));
  175. evt.WaitOne();
  176. Assert.False(fail);
  177. }
  178. [Fact]
  179. public void ToObservable_DisposesEnumeratorWhenSubscriptionIsDisposed()
  180. {
  181. using var evt = new ManualResetEvent(false);
  182. var fail = false;
  183. var subscription = default(IDisposable);
  184. var subscriptionAssignedTcs = new TaskCompletionSource<object>();
  185. var ae = AsyncEnumerable.Create(
  186. _ => AsyncEnumerator.Create(
  187. async () =>
  188. {
  189. await subscriptionAssignedTcs.Task;
  190. return true;
  191. },
  192. () => 1,
  193. () =>
  194. {
  195. evt.Set();
  196. return default;
  197. }));
  198. subscription = ae
  199. .ToObservable()
  200. .Subscribe(new MyObserver<int>(
  201. x =>
  202. {
  203. Assert.NotNull(subscription);
  204. subscription!.Dispose();
  205. },
  206. ex =>
  207. {
  208. fail = true;
  209. },
  210. () =>
  211. {
  212. fail = true;
  213. }
  214. ));
  215. subscriptionAssignedTcs.SetResult(null);
  216. evt.WaitOne();
  217. Assert.False(fail);
  218. }
  219. [Fact]
  220. public void ToObservable_DesNotCallMoveNextAgainWhenSubscriptionIsDisposed()
  221. {
  222. using var evt = new ManualResetEvent(false);
  223. var fail = false;
  224. var moveNextCount = 0;
  225. var subscription = default(IDisposable);
  226. var subscriptionAssignedTcs = new TaskCompletionSource<object>();
  227. var ae = AsyncEnumerable.Create(
  228. _ => AsyncEnumerator.Create(
  229. async () =>
  230. {
  231. await subscriptionAssignedTcs.Task;
  232. moveNextCount++;
  233. return true;
  234. },
  235. () => 1,
  236. () =>
  237. {
  238. evt.Set();
  239. return default;
  240. }));
  241. subscription = ae
  242. .ToObservable()
  243. .Subscribe(new MyObserver<int>(
  244. x =>
  245. {
  246. Assert.NotNull(subscription);
  247. subscription!.Dispose();
  248. },
  249. ex =>
  250. {
  251. fail = true;
  252. },
  253. () =>
  254. {
  255. fail = true;
  256. }
  257. ));
  258. subscriptionAssignedTcs.SetResult(null);
  259. evt.WaitOne();
  260. Assert.Equal(1, moveNextCount);
  261. Assert.False(fail);
  262. }
  263. [Fact]
  264. public void ToObservable_SupportsLargeEnumerable()
  265. {
  266. using var evt = new ManualResetEvent(false);
  267. var fail = false;
  268. var xs = AsyncEnumerable.Range(0, 10000).ToObservable();
  269. xs.Subscribe(new MyObserver<int>(
  270. x =>
  271. {
  272. // ok
  273. },
  274. ex =>
  275. {
  276. fail = true;
  277. evt.Set();
  278. },
  279. () =>
  280. {
  281. evt.Set();
  282. }
  283. ));
  284. evt.WaitOne();
  285. Assert.False(fail);
  286. }
  287. private sealed class MyObserver<T> : IObserver<T>
  288. {
  289. private readonly Action<T> _onNext;
  290. private readonly Action<Exception> _onError;
  291. private readonly Action _onCompleted;
  292. public MyObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
  293. {
  294. _onNext = onNext;
  295. _onError = onError;
  296. _onCompleted = onCompleted;
  297. }
  298. public void OnCompleted() => _onCompleted();
  299. public void OnError(Exception error) => _onError(error);
  300. public void OnNext(T value) => _onNext(value);
  301. }
  302. private sealed class ThrowOnCurrentAsyncEnumerator : IAsyncEnumerator<int>
  303. {
  304. private readonly Exception _exception;
  305. public ThrowOnCurrentAsyncEnumerator(Exception ex)
  306. {
  307. _exception = ex;
  308. }
  309. public int Current => throw _exception;
  310. public ValueTask DisposeAsync() => default;
  311. public ValueTask<bool> MoveNextAsync() => new(true);
  312. }
  313. }
  314. }