Browse Source

Clean up conversions.

Bart De Smet 7 years ago
parent
commit
5838158e96

+ 3 - 3
Ix.NET/Source/System.Linq.Async.Tests/System/Linq/AsyncEnumerableTests.cs

@@ -78,11 +78,11 @@ namespace Tests
             var moveNextThrows = new ValueTask<bool>(Task.FromException<bool>(exception));
 #endif
 
-            return AsyncEnumerable.CreateEnumerable(
+            return AsyncEnumerable.Create(
                 _ => AsyncEnumerator.Create<TValue>(
                     () => moveNextThrows,
-                    current: null,
-                    dispose: null)
+                    getCurrent: null,
+                    disposeAsync: null)
             );
         }
 

+ 1 - 1
Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/CreateEnumerable.cs

@@ -13,7 +13,7 @@ namespace Tests
         [Fact]
         public void CreateEnumerable_Null()
         {
-            Assert.Throws<ArgumentNullException>(() => AsyncEnumerable.CreateEnumerable<int>(default));
+            Assert.Throws<ArgumentNullException>(() => AsyncEnumerable.Create<int>(default));
         }
     }
 }

+ 0 - 1
Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/CreateEnumerator.cs

@@ -4,7 +4,6 @@
 
 using System;
 using System.Collections.Generic;
-using System.Linq;
 using System.Threading.Tasks;
 using Xunit;
 

+ 163 - 27
Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/ToAsyncEnumerable.cs

@@ -5,6 +5,7 @@
 using System;
 using System.Collections.Generic;
 using System.Linq;
+using System.Threading;
 using System.Threading.Tasks;
 using Xunit;
 
@@ -21,7 +22,7 @@ namespace Tests
         }
 
         [Fact]
-        public async Task ToAsyncEnumerable1Async()
+        public async Task ToAsyncEnumerable_Array1_Async()
         {
             var xs = new[] { 1, 2, 3, 4 }.ToAsyncEnumerable();
             var e = xs.GetAsyncEnumerator();
@@ -33,7 +34,7 @@ namespace Tests
         }
 
         [Fact]
-        public async Task ToAsyncEnumerable2Async()
+        public async Task ToAsyncEnumerable_Enumerable1_Async()
         {
             var ex = new Exception("Bang");
             var xs = ToAsyncEnumerable_Sequence(ex).ToAsyncEnumerable();
@@ -49,7 +50,7 @@ namespace Tests
         }
 
         [Fact]
-        public async Task ToAsyncEnumerable3Async()
+        public async Task ToAsyncEnumerable_Observable1_Async()
         {
             var subscribed = false;
 
@@ -67,14 +68,15 @@ namespace Tests
 
             var e = xs.GetAsyncEnumerator();
 
-            Assert.True(subscribed);
+            // NB: Breaking change to align with lazy nature of async iterators.
+            // Assert.True(subscribed);
 
             await HasNextAsync(e, 42);
             await NoNextAsync(e);
         }
 
         [Fact]
-        public async Task ToAsyncEnumerable4Async()
+        public async Task ToAsyncEnumerable_Observable2_Async()
         {
             var ex = new Exception("Bang!");
             var subscribed = false;
@@ -92,13 +94,14 @@ namespace Tests
 
             var e = xs.GetAsyncEnumerator();
 
-            Assert.True(subscribed);
+            // NB: Breaking change to align with lazy nature of async iterators.
+            // Assert.True(subscribed);
 
             await AssertThrowsAsync(e.MoveNextAsync(), ex);
         }
 
         [Fact]
-        public async Task ToAsyncEnumerable5Async()
+        public async Task ToAsyncEnumerable_Enumerable2_Async()
         {
             var set = new HashSet<int>(new[] { 1, 2, 3, 4 });
 
@@ -112,7 +115,7 @@ namespace Tests
         }
 
         [Fact]
-        public async Task ToAsyncEnumerable6()
+        public async Task ToAsyncEnumerable_Enumerable3_Async()
         {
             var set = new HashSet<int>(new[] { 1, 2, 3, 4, 5, 6, 7, 8 });
 
@@ -124,7 +127,7 @@ namespace Tests
         }
 
         [Fact]
-        public async Task ToAsyncEnumerable7()
+        public async Task ToAsyncEnumerable_Enumerable4_Async()
         {
             var set = new HashSet<int>(new[] { 1, 2, 3, 4 });
             var xs = set.ToAsyncEnumerable();
@@ -135,7 +138,7 @@ namespace Tests
         }
 
         [Fact]
-        public async Task ToAsyncEnumerable8()
+        public async Task ToAsyncEnumerable_Enumerable5_Async()
         {
             var set = new HashSet<int>(new[] { 1, 2, 3, 4 });
             var xs = set.ToAsyncEnumerable();
@@ -146,7 +149,7 @@ namespace Tests
         }
 
         [Fact]
-        public async Task ToAsyncEnumerable9()
+        public async Task ToAsyncEnumerable_Enumerable6_Async()
         {
             var set = new HashSet<int>(new[] { 1, 2, 3, 4 });
             var xs = set.ToAsyncEnumerable();
@@ -155,14 +158,14 @@ namespace Tests
         }
 
         [Fact]
-        public async Task ToAsyncEnumerable10()
+        public async Task ToAsyncEnumerable_Array2_Async()
         {
             var xs = new[] { 1, 2, 3, 4 }.ToAsyncEnumerable();
             await SequenceIdentity(xs);
         }
 
         [Fact]
-        public void ToAsyncEnumerable11()
+        public void ToAsyncEnumerable_Enumerable7()
         {
             var set = new HashSet<int>(new[] { 1, 2, 3, 4 });
             var xs = set.ToAsyncEnumerable();
@@ -175,7 +178,6 @@ namespace Tests
 
             xc.Add(5);
 
-
             Assert.True(xc.Contains(5));
 
             Assert.True(xc.Remove(5));
@@ -188,7 +190,7 @@ namespace Tests
         }
 
         [Fact]
-        public void ToAsyncEnumerable12()
+        public void ToAsyncEnumerable_Enumerable8()
         {
             var set = new List<int> { 1, 2, 3, 4 };
             var xs = set.ToAsyncEnumerable();
@@ -260,34 +262,168 @@ namespace Tests
             await AssertThrowsAsync<TaskCanceledException>(e.MoveNextAsync().AsTask());
         }
 
-        private sealed class MyObservable<T> : IObservable<T>
+        [Fact]
+        public async Task ToAsyncEnumerable_Observable3_Async()
         {
-            private readonly Func<IObserver<T>, IDisposable> _subscribe;
+            var stop = new ManualResetEvent(false);
+
+            var xs = new MyObservable<int>(obs =>
+            {
+                var cts = new CancellationTokenSource();
+
+                Task.Run(async () =>
+                {
+                    for (var i = 0; !cts.IsCancellationRequested; i++)
+                    {
+                        await Task.Delay(10);
+                        obs.OnNext(i);
+                    }
+
+                    stop.Set();
+                });
 
-            public MyObservable(Func<IObserver<T>, IDisposable> subscribe)
+                return new MyDisposable(cts.Cancel);
+            }).ToAsyncEnumerable();
+
+            var e = xs.GetAsyncEnumerator();
+
+            for (var i = 0; i < 10; i++)
             {
-                _subscribe = subscribe;
+                await HasNextAsync(e, i);
             }
 
-            public IDisposable Subscribe(IObserver<T> observer)
+            await e.DisposeAsync();
+            stop.WaitOne();
+        }
+
+        [Fact]
+        public async Task ToAsyncEnumerable_Observable4_Async()
+        {
+            var subCount = 0;
+
+            var stop = new ManualResetEvent(false);
+
+            var xs = new MyObservable<int>(obs =>
+            {
+                subCount++;
+
+                var cts = new CancellationTokenSource();
+
+                Task.Run(async () =>
+                {
+                    for (var i = 0; !cts.IsCancellationRequested; i++)
+                    {
+                        await Task.Delay(10);
+                        obs.OnNext(i);
+                    }
+
+                    stop.Set();
+                });
+
+                return new MyDisposable(cts.Cancel);
+            }).ToAsyncEnumerable();
+
+            var e = xs.Zip(xs, (l, r) => l == r).GetAsyncEnumerator();
+
+            for (var i = 0; i < 10; i++)
             {
-                return _subscribe(observer);
+                await HasNextAsync(e, true);
             }
+
+            await e.DisposeAsync();
+            stop.WaitOne();
+
+            Assert.Equal(2, subCount);
         }
 
-        private sealed class MyDisposable : IDisposable
+        [Fact]
+        public async Task ToAsyncEnumerable_Observable5_Async()
         {
-            private readonly Action _dispose;
+            var stop = new ManualResetEvent(false);
+
+            var xs = new MyObservable<int>(obs =>
+            {
+                var cts = new CancellationTokenSource();
 
-            public MyDisposable(Action dispose)
+                Task.Run(async () =>
+                {
+                    for (var i = 0; !cts.IsCancellationRequested; i++)
+                    {
+                        await Task.Delay(10);
+                        obs.OnNext(i);
+                    }
+
+                    stop.Set();
+                });
+
+                return new MyDisposable(cts.Cancel);
+            }).ToAsyncEnumerable();
+
+            var c = new CancellationTokenSource();
+
+            var e = xs.GetAsyncEnumerator(c.Token);
+
+            for (var i = 0; i < 10; i++)
             {
-                _dispose = dispose;
+                await HasNextAsync(e, i);
             }
 
-            public void Dispose()
+            c.Cancel();
+            stop.WaitOne();
+        }
+
+        [Fact]
+        public async Task ToAsyncEnumerable_Observable6_Async()
+        {
+            var stop = new ManualResetEvent(false);
+
+            var xs = new MyObservable<int>(obs =>
+            {
+                var cts = new CancellationTokenSource();
+
+                Task.Run(async () =>
+                {
+                    for (var i = 0; !cts.IsCancellationRequested; i++)
+                    {
+                        await Task.Yield();
+                        obs.OnNext(i);
+                    }
+
+                    stop.Set();
+                });
+
+                return new MyDisposable(cts.Cancel);
+            }).ToAsyncEnumerable();
+
+            var e = xs.GetAsyncEnumerator();
+
+            for (var i = 0; i < 10_000; i++)
             {
-                _dispose();
+                await HasNextAsync(e, i);
             }
+
+            await e.DisposeAsync();
+            stop.WaitOne();
+        }
+
+        // TODO: Add more tests for Observable conversion.
+
+        private sealed class MyObservable<T> : IObservable<T>
+        {
+            private readonly Func<IObserver<T>, IDisposable> _subscribe;
+
+            public MyObservable(Func<IObserver<T>, IDisposable> subscribe) => _subscribe = subscribe;
+
+            public IDisposable Subscribe(IObserver<T> observer) => _subscribe(observer);
+        }
+
+        private sealed class MyDisposable : IDisposable
+        {
+            private readonly Action _dispose;
+
+            public MyDisposable(Action dispose) => _dispose = dispose;
+
+            public void Dispose() => _dispose();
         }
     }
 }

+ 3 - 3
Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/ToObservable.cs

@@ -141,7 +141,7 @@ namespace Tests
             var fail = false;
             var evt = new ManualResetEvent(false);
 
-            var ae = AsyncEnumerable.CreateEnumerable(
+            var ae = AsyncEnumerable.Create(
                 _ => AsyncEnumerator.Create<int>(
                     () => new ValueTask<bool>(false),
                     () => { throw new InvalidOperationException(); },
@@ -175,7 +175,7 @@ namespace Tests
             var subscription = default(IDisposable);
             var subscriptionAssignedTcs = new TaskCompletionSource<object>();
 
-            var ae = AsyncEnumerable.CreateEnumerable(
+            var ae = AsyncEnumerable.Create(
                 _ => AsyncEnumerator.Create(
                     async () =>
                     {
@@ -221,7 +221,7 @@ namespace Tests
             var subscription = default(IDisposable);
             var subscriptionAssignedTcs = new TaskCompletionSource<object>();
 
-            var ae = AsyncEnumerable.CreateEnumerable(
+            var ae = AsyncEnumerable.Create(
                 _ => AsyncEnumerator.Create(
                     async () =>
                     {

+ 19 - 11
Ix.NET/Source/System.Linq.Async/System/Linq/AsyncEnumerable.cs

@@ -3,31 +3,39 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Collections.Generic;
-using System.Diagnostics;
 using System.Threading;
 
 namespace System.Linq
 {
+    /// <summary>
+    /// Provides a set of extension methods for <see cref="IAsyncEnumerable{T}"/>.
+    /// </summary>
     public static partial class AsyncEnumerable
     {
-        public static IAsyncEnumerable<T> CreateEnumerable<T>(Func<CancellationToken, IAsyncEnumerator<T>> getEnumerator)
+        //
+        // REVIEW: Create methods may not belong in System.Linq.Async. Async iterators can be
+        //         used to implement these interfaces. Move to System.Interactive.Async?
+        //
+
+        /// <summary>
+        /// Creates a new enumerable using the specified delegates implementing the members of <see cref="IAsyncEnumerable{T}"/>.
+        /// </summary>
+        /// <typeparam name="T">The type of the elements returned by the enumerable sequence.</typeparam>
+        /// <param name="getAsyncEnumerator">The delegate implementing the <see cref="IAsyncEnumerable{T}.GetAsyncEnumerator"/> method.</param>
+        /// <returns>A new enumerable instance.</returns>
+        public static IAsyncEnumerable<T> Create<T>(Func<CancellationToken, IAsyncEnumerator<T>> getAsyncEnumerator)
         {
-            if (getEnumerator == null)
-                throw Error.ArgumentNull(nameof(getEnumerator));
+            if (getAsyncEnumerator == null)
+                throw Error.ArgumentNull(nameof(getAsyncEnumerator));
 
-            return new AnonymousAsyncEnumerable<T>(getEnumerator);
+            return new AnonymousAsyncEnumerable<T>(getAsyncEnumerator);
         }
 
         private sealed class AnonymousAsyncEnumerable<T> : IAsyncEnumerable<T>
         {
             private readonly Func<CancellationToken, IAsyncEnumerator<T>> _getEnumerator;
 
-            public AnonymousAsyncEnumerable(Func<CancellationToken, IAsyncEnumerator<T>> getEnumerator)
-            {
-                Debug.Assert(getEnumerator != null);
-
-                _getEnumerator = getEnumerator;
-            }
+            public AnonymousAsyncEnumerable(Func<CancellationToken, IAsyncEnumerator<T>> getEnumerator) => _getEnumerator = getEnumerator;
 
             public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken)
             {

+ 17 - 24
Ix.NET/Source/System.Linq.Async/System/Linq/AsyncEnumerator.cs

@@ -14,22 +14,29 @@ namespace System.Collections.Generic
     /// </summary>
     public static partial class AsyncEnumerator
     {
+        //
+        // REVIEW: Create methods may not belong in System.Linq.Async. Async iterators can be
+        //         used to implement these interfaces. Move to System.Interactive.Async?
+        //
+
         /// <summary>
         /// Creates a new enumerator using the specified delegates implementing the members of <see cref="IAsyncEnumerator{T}"/>.
         /// </summary>
         /// <typeparam name="T">The type of the elements returned by the enumerator.</typeparam>
-        /// <param name="moveNext">The delegate implementing the <see cref="IAsyncEnumerator{T}.MoveNextAsync"/> method.</param>
-        /// <param name="current">The delegate implementing the <see cref="IAsyncEnumerator{T}.Current"/> property getter.</param>
-        /// <param name="dispose">The delegate implementing the <see cref="IAsyncDisposable.DisposeAsync"/> method.</param>
+        /// <param name="moveNextAsync">The delegate implementing the <see cref="IAsyncEnumerator{T}.MoveNextAsync"/> method.</param>
+        /// <param name="getCurrent">The delegate implementing the <see cref="IAsyncEnumerator{T}.Current"/> property getter.</param>
+        /// <param name="disposeAsync">The delegate implementing the <see cref="IAsyncDisposable.DisposeAsync"/> method.</param>
         /// <returns>A new enumerator instance.</returns>
-        public static IAsyncEnumerator<T> Create<T>(Func<ValueTask<bool>> moveNext, Func<T> current, Func<ValueTask> dispose)
+        public static IAsyncEnumerator<T> Create<T>(Func<ValueTask<bool>> moveNextAsync, Func<T> getCurrent, Func<ValueTask> disposeAsync)
         {
-            if (moveNext == null)
-                throw Error.ArgumentNull(nameof(moveNext));
-
-            // Note: Many methods pass null in for the second two params. We're assuming
-            // That the caller is responsible and knows what they're doing
-            return new AnonymousAsyncIterator<T>(moveNext, current, dispose);
+            if (moveNextAsync == null)
+                throw Error.ArgumentNull(nameof(moveNextAsync));
+
+            //
+            // NB: Callers can pass null for the second two parameters, which can be useful to
+            //     implement enumerators that throw or yield no results.
+            //
+            return new AnonymousAsyncIterator<T>(moveNextAsync, getCurrent, disposeAsync);
         }
 
         /// <summary>
@@ -52,20 +59,6 @@ namespace System.Collections.Generic
             return source.MoveNextAsync();
         }
 
-        internal static IAsyncEnumerator<T> Create<T>(Func<TaskCompletionSource<bool>, ValueTask<bool>> moveNext, Func<T> current, Func<ValueTask> dispose)
-        {
-            return new AnonymousAsyncIterator<T>(
-                async () =>
-                {
-                    var tcs = new TaskCompletionSource<bool>();
-
-                    return await moveNext(tcs).ConfigureAwait(false);
-                },
-                current,
-                dispose
-            );
-        }
-
         private sealed class AnonymousAsyncIterator<T> : AsyncIterator<T>
         {
             private readonly Func<T> _currentFunc;

+ 130 - 97
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToAsyncEnumerable.Observable.cs

@@ -2,7 +2,9 @@
 // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 // See the LICENSE file in the project root for more information. 
 
+using System.Collections.Concurrent;
 using System.Collections.Generic;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -14,146 +16,177 @@ namespace System.Linq
             if (source == null)
                 throw Error.ArgumentNull(nameof(source));
 
-            return CreateEnumerable(
-                ct =>
-                {
-                    var observer = new ToAsyncEnumerableObserver<TSource>();
+            return new ObservableAsyncEnumerable<TSource>(source);
+        }
+
+        private sealed class ObservableAsyncEnumerable<TSource> : AsyncIterator<TSource>, IObserver<TSource>
+        {
+            private readonly IObservable<TSource> _source;
 
-                    var subscription = source.Subscribe(observer);
+            private ConcurrentQueue<TSource> _values = new ConcurrentQueue<TSource>();
+            private Exception _error;
+            private bool _completed;
+            private TaskCompletionSource<bool> _signal;
+            private IDisposable _subscription;
+            private CancellationTokenRegistration _ctr;
 
-                    // REVIEW: Review possible concurrency issues with Dispose calls.
+            public ObservableAsyncEnumerable(IObservable<TSource> source) => _source = source;
 
-                    var ctr = ct.Register(subscription.Dispose);
+            public override AsyncIteratorBase<TSource> Clone() => new ObservableAsyncEnumerable<TSource>(_source);
+
+            public override ValueTask DisposeAsync()
+            {
+                Dispose();
+
+                return base.DisposeAsync();
+            }
 
-                    return AsyncEnumerator.Create(
-                        tcs =>
+            protected override async ValueTask<bool> MoveNextCore()
+            {
+                //
+                // REVIEW: How often should we check? At the very least, we want to prevent
+                //         subscribing if cancellation is requested. A case may be made to
+                //         check for each iteration, namely because this operator is a bridge
+                //         with another interface. However, we also wire up cancellation to
+                //         the observable subscription, so there's redundancy here.
+                //
+                _cancellationToken.ThrowIfCancellationRequested();
+
+                switch (_state)
+                {
+                    case AsyncIteratorState.Allocated:
+                        //
+                        // NB: Breaking change to align with lazy nature of async iterators.
+                        //
+                        //     In previous implementations, the Subscribe call happened during
+                        //     the call to GetAsyncEnumerator.
+                        //
+                        // REVIEW: Confirm this design point. This implementation is compatible
+                        //         with an async iterator using "yield return", e.g. subscribing
+                        //         to the observable sequence and yielding values out of a local
+                        //         queue filled by observer callbacks. However, it departs from
+                        //         the dual treatment of Subscribe/GetEnumerator.
+                        //
+
+                        _subscription = _source.Subscribe(this);
+                        _ctr = _cancellationToken.Register(OnCanceled, state: null);
+                        _state = AsyncIteratorState.Iterating;
+                        goto case AsyncIteratorState.Iterating;
+
+                    case AsyncIteratorState.Iterating:
+                        while (true)
                         {
-                            var hasValue = false;
-                            var hasCompleted = false;
-                            var error = default(Exception);
+                            var completed = Volatile.Read(ref _completed);
 
-                            lock (observer.SyncRoot)
+                            if (_values.TryDequeue(out _current))
                             {
-                                if (observer.Values.Count > 0)
-                                {
-                                    hasValue = true;
-                                    observer.Current = observer.Values.Dequeue();
-                                }
-                                else if (observer.HasCompleted)
-                                {
-                                    hasCompleted = true;
-                                }
-                                else if (observer.Error != null)
-                                {
-                                    error = observer.Error;
-                                }
-                                else
+                                return true;
+                            }
+                            else if (completed)
+                            {
+                                var error = _error;
+
+                                if (error != null)
                                 {
-                                    observer.TaskCompletionSource = tcs;
+                                    throw error;
                                 }
-                            }
 
-                            if (hasValue)
-                            {
-                                tcs.TrySetResult(true);
-                            }
-                            else if (hasCompleted)
-                            {
-                                tcs.TrySetResult(false);
-                            }
-                            else if (error != null)
-                            {
-                                tcs.TrySetException(error);
+                                return false;
                             }
 
-                            return new ValueTask<bool>(tcs.Task);
-                        },
-                        () => observer.Current,
-                        () =>
-                        {
-                            ctr.Dispose();
-                            subscription.Dispose();
-                            // Should we cancel in-flight operations somehow?
-                            return default;
-                        });
-                });
-        }
-
-        private sealed class ToAsyncEnumerableObserver<T> : IObserver<T>
-        {
-            public readonly Queue<T> Values;
+                            await Resume().ConfigureAwait(false);
+                            Volatile.Write(ref _signal, null);
+                        }
+                }
 
-            public T Current;
-            public Exception Error;
-            public bool HasCompleted;
-            public TaskCompletionSource<bool> TaskCompletionSource;
+                await DisposeAsync().ConfigureAwait(false);
+                return false;
+            }
 
-            public ToAsyncEnumerableObserver()
+            public void OnCompleted()
             {
-                Values = new Queue<T>();
+                Volatile.Write(ref _completed, true);
+
+                DisposeSubscription();
+                OnNotification();
             }
 
-            public object SyncRoot
+            public void OnError(Exception error)
             {
-                get { return Values; }
+                _error = error;
+                Volatile.Write(ref _completed, true);
+
+                DisposeSubscription();
+                OnNotification();
             }
 
-            public void OnCompleted()
+            public void OnNext(TSource value)
             {
-                var tcs = default(TaskCompletionSource<bool>);
+                _values?.Enqueue(value);
 
-                lock (SyncRoot)
+                OnNotification();
+            }
+
+            private void OnNotification()
+            {
+                while (true)
                 {
-                    HasCompleted = true;
+                    var signal = Volatile.Read(ref _signal);
 
-                    if (TaskCompletionSource != null)
+                    if (signal == TaskExt.True)
                     {
-                        tcs = TaskCompletionSource;
-                        TaskCompletionSource = null;
+                        return;
                     }
-                }
 
-                tcs?.TrySetResult(false);
-            }
-
-            public void OnError(Exception error)
-            {
-                var tcs = default(TaskCompletionSource<bool>);
-
-                lock (SyncRoot)
-                {
-                    Error = error;
+                    if (signal != null)
+                    {
+                        signal.TrySetResult(true);
+                        return;
+                    }
 
-                    if (TaskCompletionSource != null)
+                    if (Interlocked.CompareExchange(ref _signal, TaskExt.True, null) == null)
                     {
-                        tcs = TaskCompletionSource;
-                        TaskCompletionSource = null;
+                        return;
                     }
                 }
+            }
+
+            private void Dispose()
+            {
+                _ctr.Dispose();
+                DisposeSubscription();
 
-                tcs?.TrySetException(error);
+                _values = null;
+                _error = null;
             }
 
-            public void OnNext(T value)
+            private void DisposeSubscription() => Interlocked.Exchange(ref _subscription, null)?.Dispose();
+
+            private void OnCanceled(object state) => Dispose();
+
+            private Task Resume()
             {
-                var tcs = default(TaskCompletionSource<bool>);
+                TaskCompletionSource<bool> newSignal = null;
 
-                lock (SyncRoot)
+                while (true)
                 {
-                    if (TaskCompletionSource == null)
+                    var signal = Volatile.Read(ref _signal);
+
+                    if (signal != null)
                     {
-                        Values.Enqueue(value);
+                        return signal.Task;
                     }
-                    else
+
+                    if (newSignal == null)
                     {
-                        Current = value;
+                        newSignal = new TaskCompletionSource<bool>();
+                    }
 
-                        tcs = TaskCompletionSource;
-                        TaskCompletionSource = null;
+                    if (Interlocked.CompareExchange(ref _signal, newSignal, null) == null)
+                    {
+                        return newSignal.Task;
                     }
                 }
-
-                tcs?.TrySetResult(true);
             }
         }
     }

+ 8 - 0
Ix.NET/Source/System.Linq.Async/System/Threading/Tasks/TaskExt.cs

@@ -7,5 +7,13 @@ namespace System.Threading.Tasks
     internal static class TaskExt
     {
         public static readonly Task<int> MinusOne = Task.FromResult(-1);
+
+        public static readonly TaskCompletionSource<bool> True;
+
+        static TaskExt()
+        {
+            True = new TaskCompletionSource<bool>();
+            True.SetResult(true);
+        }
     }
 }