AsyncTests.Bugs.cs 14 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections;
  6. using System.Collections.Generic;
  7. using System.Linq;
  8. using System.Threading;
  9. using System.Threading.Tasks;
  10. using Xunit;
  11. namespace Tests
  12. {
  13. public partial class AsyncTests
  14. {
  15. public AsyncTests()
  16. {
  17. TaskScheduler.UnobservedTaskException += (o, e) =>
  18. {
  19. };
  20. }
  21. /*
  22. [Fact]
  23. public void TestPushPopAsync()
  24. {
  25. var stack = new Stack<int>();
  26. var count = 10;
  27. var observable = Observable.Generate(
  28. 0,
  29. i => i < count,
  30. i => i + 1,
  31. i => i,
  32. i => TimeSpan.FromMilliseconds(1), // change this to 0 to avoid the problem [1]
  33. Scheduler.ThreadPool);
  34. var task = DoSomethingAsync(observable, stack);
  35. // we give it a timeout so the test can fail instead of hang
  36. task.Wait(TimeSpan.FromSeconds(2));
  37. Assert.Equal(10, stack.Count);
  38. }
  39. private Task DoSomethingAsync(IObservable<int> observable, Stack<int> stack)
  40. {
  41. var ae = observable
  42. .ToAsyncEnumerable()
  43. //.Do(i => Debug.WriteLine("Bug-fixing side effect: " + i)) // [2]
  44. .GetEnumerator();
  45. var tcs = new TaskCompletionSource<object>();
  46. var a = default(Action);
  47. a = new Action(() =>
  48. {
  49. ae.MoveNext().ContinueWith(t =>
  50. {
  51. if (t.Result)
  52. {
  53. var i = ae.Current;
  54. Debug.WriteLine("Doing something with " + i);
  55. Thread.Sleep(50);
  56. stack.Push(i);
  57. a();
  58. }
  59. else
  60. tcs.TrySetResult(null);
  61. });
  62. });
  63. a();
  64. return tcs.Task;
  65. }
  66. */
  67. #if !NO_THREAD
  68. private static IEnumerable<int> Xs(Action a)
  69. {
  70. try
  71. {
  72. var rnd = new Random();
  73. while (true)
  74. {
  75. yield return rnd.Next(0, 43);
  76. Thread.Sleep(rnd.Next(0, 500));
  77. }
  78. }
  79. finally
  80. {
  81. a();
  82. }
  83. }
  84. #endif
  85. [Fact]
  86. public async void CorrectDispose()
  87. {
  88. var disposed = new TaskCompletionSource<bool>();
  89. var xs = new[] { 1, 2, 3 }.WithDispose(() =>
  90. {
  91. disposed.TrySetResult(true);
  92. }).ToAsyncEnumerable();
  93. var ys = xs.Select(x => x + 1);
  94. var e = ys.GetEnumerator();
  95. // We have to call move next because otherwise the internal enumerator is never allocated
  96. await e.MoveNext();
  97. e.Dispose();
  98. await disposed.Task;
  99. Assert.True(disposed.Task.Result);
  100. Assert.False(e.MoveNext().Result);
  101. var next = await e.MoveNext();
  102. Assert.False(next);
  103. }
  104. [Fact]
  105. public async Task DisposesUponError()
  106. {
  107. var disposed = new TaskCompletionSource<bool>();
  108. var xs = new[] { 1, 2, 3 }.WithDispose(() =>
  109. {
  110. disposed.SetResult(true);
  111. }).ToAsyncEnumerable();
  112. var ex = new Exception("Bang!");
  113. var ys = xs.Select(x => { if (x == 1) { throw ex; } return x; });
  114. var e = ys.GetEnumerator();
  115. await Assert.ThrowsAsync<Exception>(() => e.MoveNext());
  116. var result = await disposed.Task;
  117. Assert.True(result);
  118. }
  119. [Fact]
  120. public async Task CorrectCancel()
  121. {
  122. var disposed = new TaskCompletionSource<bool>();
  123. var xs = new CancellationTestAsyncEnumerable().WithDispose(() =>
  124. {
  125. disposed.TrySetResult(true);
  126. });
  127. var ys = xs.Select(x => x + 1).Where(x => true);
  128. var e = ys.GetEnumerator();
  129. var cts = new CancellationTokenSource();
  130. var t = e.MoveNext(cts.Token);
  131. cts.Cancel();
  132. try
  133. {
  134. t.Wait(WaitTimeoutMs);
  135. }
  136. catch
  137. {
  138. // Don't care about the outcome; we could have made it to element 1
  139. // but we could also have cancelled the MoveNext-calling task. Either
  140. // way, we want to wait for the task to be completed and check that
  141. }
  142. finally
  143. {
  144. // the cancellation bubbled all the way up to the source to dispose
  145. // it. This design is chosen because cancelling a MoveNext call leaves
  146. // the enumerator in an indeterminate state. Further interactions with
  147. // it should be forbidden.
  148. var result = await disposed.Task;
  149. Assert.True(result);
  150. }
  151. Assert.False(await e.MoveNext());
  152. }
  153. [Fact]
  154. public void CanCancelMoveNext()
  155. {
  156. var xs = new CancellationTestAsyncEnumerable().Select(x => x).Where(x => true);
  157. var e = xs.GetEnumerator();
  158. var cts = new CancellationTokenSource();
  159. var t = e.MoveNext(cts.Token);
  160. cts.Cancel();
  161. try
  162. {
  163. t.Wait(WaitTimeoutMs);
  164. Assert.True(false);
  165. }
  166. catch
  167. {
  168. Assert.True(t.IsCanceled);
  169. }
  170. }
  171. /// <summary>
  172. /// Waits WaitTimeoutMs or until cancellation is requested. If cancellation was not requested, MoveNext returns true.
  173. /// </summary>
  174. internal sealed class CancellationTestAsyncEnumerable : IAsyncEnumerable<int>
  175. {
  176. private readonly int _iterationsBeforeDelay;
  177. public CancellationTestAsyncEnumerable(int iterationsBeforeDelay = 0)
  178. {
  179. _iterationsBeforeDelay = iterationsBeforeDelay;
  180. }
  181. IAsyncEnumerator<int> IAsyncEnumerable<int>.GetEnumerator() => GetEnumerator();
  182. public TestEnumerator GetEnumerator() => new TestEnumerator(_iterationsBeforeDelay);
  183. internal sealed class TestEnumerator : IAsyncEnumerator<int>
  184. {
  185. private readonly int _iterationsBeforeDelay;
  186. public TestEnumerator(int iterationsBeforeDelay)
  187. {
  188. _iterationsBeforeDelay = iterationsBeforeDelay;
  189. }
  190. private int _i = -1;
  191. public void Dispose()
  192. {
  193. }
  194. public CancellationToken LastToken { get; private set; }
  195. public bool MoveNextWasCalled { get; private set; }
  196. public int Current => _i;
  197. public async Task<bool> MoveNext(CancellationToken cancellationToken)
  198. {
  199. LastToken = cancellationToken;
  200. MoveNextWasCalled = true;
  201. _i++;
  202. if (Current >= _iterationsBeforeDelay)
  203. {
  204. await Task.Delay(WaitTimeoutMs, cancellationToken);
  205. }
  206. cancellationToken.ThrowIfCancellationRequested();
  207. return true;
  208. }
  209. }
  210. }
  211. /// <summary>
  212. /// Waits WaitTimeoutMs or until cancellation is requested. If cancellation was not requested, MoveNext returns true.
  213. /// </summary>
  214. private sealed class CancellationTestEnumerable<T> : IEnumerable<T>
  215. {
  216. public CancellationTestEnumerable()
  217. {
  218. }
  219. public IEnumerator<T> GetEnumerator() => new TestEnumerator();
  220. private sealed class TestEnumerator : IEnumerator<T>
  221. {
  222. private readonly CancellationTokenSource _cancellationTokenSource;
  223. public TestEnumerator()
  224. {
  225. _cancellationTokenSource = new CancellationTokenSource();
  226. }
  227. public void Dispose()
  228. {
  229. _cancellationTokenSource.Cancel();
  230. }
  231. public void Reset()
  232. {
  233. }
  234. object IEnumerator.Current => Current;
  235. public T Current { get; }
  236. public bool MoveNext()
  237. {
  238. Task.Delay(WaitTimeoutMs, _cancellationTokenSource.Token).Wait();
  239. _cancellationTokenSource.Token.ThrowIfCancellationRequested();
  240. return true;
  241. }
  242. }
  243. IEnumerator IEnumerable.GetEnumerator()
  244. {
  245. return GetEnumerator();
  246. }
  247. }
  248. [Fact]
  249. public void ToAsyncEnumeratorCannotCancelOnceRunning()
  250. {
  251. var evt = new ManualResetEvent(false);
  252. var isRunningEvent = new ManualResetEvent(false);
  253. var xs = Blocking(evt, isRunningEvent).ToAsyncEnumerable();
  254. var e = xs.GetEnumerator();
  255. var cts = new CancellationTokenSource();
  256. Task<bool> t = null;
  257. var tMoveNext = Task.Run(
  258. () =>
  259. {
  260. // This call *will* block
  261. t = e.MoveNext(cts.Token);
  262. });
  263. isRunningEvent.WaitOne();
  264. cts.Cancel();
  265. try
  266. {
  267. tMoveNext.Wait(0);
  268. Assert.False(t.IsCanceled);
  269. }
  270. catch
  271. {
  272. // T will still be null
  273. Assert.Null(t);
  274. }
  275. // enable it to finish
  276. evt.Set();
  277. }
  278. private static IEnumerable<int> Blocking(ManualResetEvent evt, ManualResetEvent blockingStarted)
  279. {
  280. blockingStarted.Set();
  281. evt.WaitOne();
  282. yield return 42;
  283. }
  284. [Fact]
  285. public async Task TakeOneFromSelectMany()
  286. {
  287. var enumerable = AsyncEnumerable
  288. .Return(0)
  289. .SelectMany(_ => AsyncEnumerable.Return("Check"))
  290. .Take(1)
  291. .Do(_ => { });
  292. Assert.Equal("Check", await enumerable.First());
  293. }
  294. [Fact]
  295. public void SelectManyDisposeInvokedOnlyOnce()
  296. {
  297. var disposeCounter = new DisposeCounter();
  298. var result = AsyncEnumerable.Return(1).SelectMany(i => disposeCounter).Select(i => i).ToList().Result;
  299. Assert.Empty(result);
  300. Assert.Equal(1, disposeCounter.DisposeCount);
  301. }
  302. [Fact]
  303. public void SelectManyInnerDispose()
  304. {
  305. var disposes = Enumerable.Range(0, 10).Select(_ => new DisposeCounter()).ToList();
  306. var result = AsyncEnumerable.Range(0, 10).SelectMany(i => disposes[i]).Select(i => i).ToList().Result;
  307. Assert.Empty(result);
  308. Assert.True(disposes.All(d => d.DisposeCount == 1));
  309. }
  310. [Fact]
  311. public void DisposeAfterCreation()
  312. {
  313. var enumerable = AsyncEnumerable.Return(0) as IDisposable;
  314. enumerable?.Dispose();
  315. }
  316. private class DisposeCounter : IAsyncEnumerable<object>
  317. {
  318. public int DisposeCount { get; private set; }
  319. public IAsyncEnumerator<object> GetEnumerator()
  320. {
  321. return new Enumerator(this);
  322. }
  323. private class Enumerator : IAsyncEnumerator<object>
  324. {
  325. private readonly DisposeCounter _disposeCounter;
  326. public Enumerator(DisposeCounter disposeCounter)
  327. {
  328. _disposeCounter = disposeCounter;
  329. }
  330. public void Dispose()
  331. {
  332. _disposeCounter.DisposeCount++;
  333. }
  334. public Task<bool> MoveNext(CancellationToken _)
  335. {
  336. return Task.Factory.StartNew(() => false);
  337. }
  338. public object Current { get; private set; }
  339. }
  340. }
  341. }
  342. internal static class MyExt
  343. {
  344. public static IEnumerable<T> WithDispose<T>(this IEnumerable<T> source, Action a)
  345. {
  346. return EnumerableEx.Create(() =>
  347. {
  348. var e = source.GetEnumerator();
  349. return new Enumerator<T>(e.MoveNext, () => e.Current, () => { e.Dispose(); a(); });
  350. });
  351. }
  352. public static IAsyncEnumerable<T> WithDispose<T>(this IAsyncEnumerable<T> source, Action a)
  353. {
  354. return AsyncEnumerable.CreateEnumerable<T>(() =>
  355. {
  356. var e = source.GetEnumerator();
  357. return AsyncEnumerable.CreateEnumerator<T>(e.MoveNext, () => e.Current, () => { e.Dispose(); a(); });
  358. });
  359. }
  360. private class Enumerator<T> : IEnumerator<T>
  361. {
  362. private readonly Func<bool> _moveNext;
  363. private readonly Func<T> _current;
  364. private readonly Action _dispose;
  365. public Enumerator(Func<bool> moveNext, Func<T> current, Action dispose)
  366. {
  367. _moveNext = moveNext;
  368. _current = current;
  369. _dispose = dispose;
  370. }
  371. public T Current
  372. {
  373. get { return _current(); }
  374. }
  375. public void Dispose()
  376. {
  377. _dispose();
  378. }
  379. object IEnumerator.Current
  380. {
  381. get { return Current; }
  382. }
  383. public bool MoveNext()
  384. {
  385. return _moveNext();
  386. }
  387. public void Reset()
  388. {
  389. throw new NotImplementedException();
  390. }
  391. }
  392. }
  393. }