소스 검색

More noodling with AutoDetachAsyncObserver.

Bart De Smet 8 년 전
부모
커밋
47de311809
1개의 변경된 파일152개의 추가작업 그리고 10개의 파일을 삭제
  1. 152 10
      AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/AsyncObservableBase.cs

+ 152 - 10
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/AsyncObservableBase.cs

@@ -2,7 +2,6 @@
 // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 // See the LICENSE file in the project root for more information. 
 
-using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
@@ -28,33 +27,176 @@ namespace System.Reactive.Linq
         private sealed class AutoDetachAsyncObserver : AsyncObserverBase<T>, IAsyncDisposable
         {
             private readonly IAsyncObserver<T> _observer;
+            private readonly object _gate = new object();
+
+            private IAsyncDisposable _subscription;
+            private Task _task;
+            private bool _disposing;
 
             public AutoDetachAsyncObserver(IAsyncObserver<T> observer)
             {
                 _observer = observer;
             }
 
-            public Task AssignAsync(IAsyncDisposable subscription)
+            public async Task AssignAsync(IAsyncDisposable subscription)
             {
-                throw new NotImplementedException();
+                var shouldDispose = false;
+
+                lock (_gate)
+                {
+                    if (_disposing)
+                    {
+                        shouldDispose = true;
+                    }
+                    else
+                    {
+                        _subscription = subscription;
+                    }
+                }
+
+                if (shouldDispose)
+                {
+                    await subscription.DisposeAsync().ConfigureAwait(false);
+                }
             }
 
-            public Task DisposeAsync()
+            public async Task DisposeAsync()
             {
-                throw new NotImplementedException();
+                var task = default(Task);
+                var subscription = default(IAsyncDisposable);
+
+                lock (_gate)
+                {
+                    //
+                    // NB: The postcondition of awaiting the first DisposeAsync call to complete is that all message
+                    //     processing has ceased, i.e. no further On*AsyncCore calls will be made. This is achieved
+                    //     here by setting _disposing to true, which is checked by the On*AsyncCore calls upon
+                    //     entry, and by awaiting the task of any in-flight On*AsyncCore calls.
+                    //
+                    //     Timing of the disposal of the subscription is less deterministic due to the intersection
+                    //     with the AssignAsync code path. However, the auto-detach observer can only be returned
+                    //     from the SubscribeAsync call *after* a call to AssignAsync has been made and awaited, so
+                    //     either AssignAsync triggers the disposal and an already disposed instance is returned, or
+                    //     the user calling DisposeAsync will either encounter a busy observer which will be stopped
+                    //     in its tracks (as described above) or it will trigger a disposal of the subscription. In
+                    //     both these cases the result of awaiting DisposeAsync guarantees no further message flow.
+                    //
+
+                    if (!_disposing)
+                    {
+                        _disposing = true;
+
+                        task = _task;
+                        subscription = _subscription;
+                    }
+                }
+
+                try
+                {
+                    if (task != null)
+                    {
+                        await task.ConfigureAwait(false);
+                    }
+                }
+                finally
+                {
+                    if (subscription != null)
+                    {
+                        await subscription.DisposeAsync().ConfigureAwait(false);
+                    }
+                }
+            }
+
+            protected override async Task OnCompletedAsyncCore()
+            {
+                lock (_gate)
+                {
+                    if (_disposing)
+                    {
+                        return;
+                    }
+
+                    _task = _observer.OnCompletedAsync();
+                }
+
+                try
+                {
+                    await _task.ConfigureAwait(false);
+                }
+                finally
+                {
+                    await FinishAsync().ConfigureAwait(false);
+                }
             }
 
-            protected override Task OnCompletedAsyncCore()
+            protected override async Task OnErrorAsyncCore(Exception error)
             {
-                throw new NotImplementedException();
+                lock (_gate)
+                {
+                    if (_disposing)
+                    {
+                        return;
+                    }
+
+                    _task = _observer.OnErrorAsync(error);
+                }
+
+                try
+                {
+                    await _task.ConfigureAwait(false);
+                }
+                finally
+                {
+                    await FinishAsync().ConfigureAwait(false);
+                }
             }
 
-            protected override Task OnErrorAsyncCore(Exception error)
+            protected override async Task OnNextAsyncCore(T value)
             {
-                throw new NotImplementedException();
+                lock (_gate)
+                {
+                    if (_disposing)
+                    {
+                        return;
+                    }
+
+                    _task = _observer.OnNextAsync(value);
+                }
+
+                try
+                {
+                    await _task.ConfigureAwait(false);
+                }
+                finally
+                {
+                    lock (_gate)
+                    {
+                        _task = null;
+                    }
+                }
             }
 
-            protected override Task OnNextAsyncCore(T value) => _observer.OnNextAsync(value);
+            private async Task FinishAsync()
+            {
+                var subscription = default(IAsyncDisposable);
+
+                lock (_gate)
+                {
+                    if (!_disposing)
+                    {
+                        _disposing = true;
+
+                        subscription = _subscription;
+                    }
+
+                    _task = null;
+                }
+
+                if (subscription != null)
+                {
+                    await subscription.DisposeAsync().ConfigureAwait(false);
+                }
+            }
         }
     }
 }