123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378 |
- // Licensed to the .NET Foundation under one or more agreements.
- // The .NET Foundation licenses this file to you under the Apache 2.0 License.
- // See the LICENSE file in the project root for more information.
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Text;
- using Xunit;
- using System.Collections;
- using System.Threading;
- using System.Threading.Tasks;
- using System.Diagnostics;
- namespace Tests
- {
- public partial class AsyncTests
- {
- public AsyncTests()
- {
- TaskScheduler.UnobservedTaskException += (o, e) =>
- {
- };
- }
- /*
- [Fact]
- public void TestPushPopAsync()
- {
- var stack = new Stack<int>();
- var count = 10;
- var observable = Observable.Generate(
- 0,
- i => i < count,
- i => i + 1,
- i => i,
- i => TimeSpan.FromMilliseconds(1), // change this to 0 to avoid the problem [1]
- Scheduler.ThreadPool);
- var task = DoSomethingAsync(observable, stack);
- // we give it a timeout so the test can fail instead of hang
- task.Wait(TimeSpan.FromSeconds(2));
- Assert.Equal(10, stack.Count);
- }
- private Task DoSomethingAsync(IObservable<int> observable, Stack<int> stack)
- {
- var ae = observable
- .ToAsyncEnumerable()
- //.Do(i => Debug.WriteLine("Bug-fixing side effect: " + i)) // [2]
- .GetEnumerator();
- var tcs = new TaskCompletionSource<object>();
- var a = default(Action);
- a = new Action(() =>
- {
- ae.MoveNext().ContinueWith(t =>
- {
- if (t.Result)
- {
- var i = ae.Current;
- Debug.WriteLine("Doing something with " + i);
- Thread.Sleep(50);
- stack.Push(i);
- a();
- }
- else
- tcs.TrySetResult(null);
- });
- });
- a();
- return tcs.Task;
- }
- */
- #if !NO_THREAD
- static IEnumerable<int> Xs(Action a)
- {
- try
- {
- var rnd = new Random();
- while (true)
- {
- yield return rnd.Next(0, 43);
- Thread.Sleep(rnd.Next(0, 500));
- }
- }
- finally
- {
- a();
- }
- }
- #endif
- [Fact]
- public void CorrectDispose()
- {
- var disposed = new TaskCompletionSource<bool>();
- var xs = new[] { 1, 2, 3 }.WithDispose(() =>
- {
- disposed.TrySetResult(true);
- }).ToAsyncEnumerable();
- var ys = xs.Select(x => x + 1);
- var e = ys.GetEnumerator();
- e.Dispose();
- Assert.True(disposed.Task.Result);
- Assert.False(e.MoveNext().Result);
- }
- [Fact]
- public async Task DisposesUponError()
- {
- var disposed = new TaskCompletionSource<bool>();
- var xs = new[] { 1, 2, 3 }.WithDispose(() =>
- {
- disposed.SetResult(true);
- }).ToAsyncEnumerable();
- var ex = new Exception("Bang!");
- var ys = xs.Select(x => { if (x == 1) throw ex; return x; });
- var e = ys.GetEnumerator();
- await Assert.ThrowsAsync<Exception>(() => e.MoveNext());
- Assert.True(disposed.Task.Result);
- }
- [Fact]
- public void CorrectCancel()
- {
- var disposed = new TaskCompletionSource<bool>();
- var xs = new[] { 1, 2, 3 }.WithDispose(() =>
- {
- disposed.SetResult(true);
- }).ToAsyncEnumerable();
- var ys = xs.Select(x => x + 1).Where(x => true);
- var e = ys.GetEnumerator();
- var cts = new CancellationTokenSource();
- var t = e.MoveNext(cts.Token);
- cts.Cancel();
- try
- {
- t.Wait(WaitTimeoutMs);
- }
- catch
- {
- // Don't care about the outcome; we could have made it to element 1
- // but we could also have cancelled the MoveNext-calling task. Either
- // way, we want to wait for the task to be completed and check that
- }
- finally
- {
- // the cancellation bubbled all the way up to the source to dispose
- // it. This design is chosen because cancelling a MoveNext call leaves
- // the enumerator in an indeterminate state. Further interactions with
- // it should be forbidden.
- Assert.True(disposed.Task.Result);
- }
- Assert.False(e.MoveNext().Result);
- }
- [Fact]
- public void CanCancelMoveNext()
- {
- var xs = new CancellationTestEnumerable().Select(x => x).Where(x => true);
- var e = xs.GetEnumerator();
- var cts = new CancellationTokenSource();
- var t = e.MoveNext(cts.Token);
- cts.Cancel();
- try
- {
- t.Wait(WaitTimeoutMs);
- Assert.True(false);
- }
- catch
- {
- Assert.True(t.IsCanceled);
- }
- }
- /// <summary>
- /// Waits WaitTimeoutMs or until cancellation is requested. If cancellation was not requested, MoveNext returns true.
- /// </summary>
- private sealed class CancellationTestEnumerable : IAsyncEnumerable<object>
- {
- public IAsyncEnumerator<object> GetEnumerator() => new TestEnumerator();
- private sealed class TestEnumerator : IAsyncEnumerator<object>
- {
- public void Dispose()
- {
- }
-
- public object Current { get; }
-
- public async Task<bool> MoveNext(CancellationToken cancellationToken)
- {
- await Task.Delay(WaitTimeoutMs, cancellationToken);
- cancellationToken.ThrowIfCancellationRequested();
- return true;
- }
- }
- }
- [Fact]
- public void ToAsyncEnumeratorCannotCancelOnceRunning()
- {
- var evt = new ManualResetEvent(false);
- var isRunningEvent = new ManualResetEvent(false);
- var xs = Blocking(evt, isRunningEvent).ToAsyncEnumerable();
- var e = xs.GetEnumerator();
- var cts = new CancellationTokenSource();
- var t = e.MoveNext(cts.Token);
- isRunningEvent.WaitOne();
- cts.Cancel();
- try
- {
- t.Wait(0);
- Assert.False(t.IsCanceled);
- }
- catch
- {
- Assert.False(true);
- }
- evt.Set();
- }
- static IEnumerable<int> Blocking(ManualResetEvent evt, ManualResetEvent blockingStarted)
- {
- blockingStarted.Set();
- evt.WaitOne();
- yield return 42;
- }
- [Fact]
- public void TakeOneFromSelectMany()
- {
- var enumerable = AsyncEnumerable
- .Return(0)
- .SelectMany(_ => AsyncEnumerable.Return("Check"))
- .Take(1)
- .Do(_ => { });
- Assert.Equal("Check", enumerable.First().Result);
- }
- [Fact]
- public void SelectManyDisposeInvokedOnlyOnce()
- {
- var disposeCounter = new DisposeCounter();
- var result = AsyncEnumerable.Return(1).SelectMany(i => disposeCounter).Select(i => i).ToList().Result;
- Assert.Equal(0, result.Count);
- Assert.Equal(1, disposeCounter.DisposeCount);
- }
- [Fact]
- public void SelectManyInnerDispose()
- {
- var disposes = Enumerable.Range(0, 10).Select(_ => new DisposeCounter()).ToList();
- var result = AsyncEnumerable.Range(0, 10).SelectMany(i => disposes[i]).Select(i => i).ToList().Result;
- Assert.Equal(0, result.Count);
- Assert.True(disposes.All(d => d.DisposeCount == 1));
- }
- private class DisposeCounter : IAsyncEnumerable<object>
- {
- public int DisposeCount { get; private set; }
- public IAsyncEnumerator<object> GetEnumerator()
- {
- return new Enumerator(this);
- }
- private class Enumerator : IAsyncEnumerator<object>
- {
- private readonly DisposeCounter _disposeCounter;
- public Enumerator(DisposeCounter disposeCounter)
- {
- _disposeCounter = disposeCounter;
- }
- public void Dispose()
- {
- _disposeCounter.DisposeCount++;
- }
- public Task<bool> MoveNext(CancellationToken _)
- {
- return Task.Factory.StartNew(() => false);
- }
- public object Current { get; private set; }
- }
- }
- }
- static class MyExt
- {
- public static IEnumerable<T> WithDispose<T>(this IEnumerable<T> source, Action a)
- {
- return EnumerableEx.Create(() =>
- {
- var e = source.GetEnumerator();
- return new Enumerator<T>(e.MoveNext, () => e.Current, () => { e.Dispose(); a(); });
- });
- }
- class Enumerator<T> : IEnumerator<T>
- {
- private readonly Func<bool> _moveNext;
- private readonly Func<T> _current;
- private readonly Action _dispose;
- public Enumerator(Func<bool> moveNext, Func<T> current, Action dispose)
- {
- _moveNext = moveNext;
- _current = current;
- _dispose = dispose;
- }
- public T Current
- {
- get { return _current(); }
- }
- public void Dispose()
- {
- _dispose();
- }
- object IEnumerator.Current
- {
- get { return Current; }
- }
- public bool MoveNext()
- {
- return _moveNext();
- }
- public void Reset()
- {
- throw new NotImplementedException();
- }
- }
- }
- }
|