Browse Source

Improving AutoDetachAsyncObserver.

Bart De Smet 8 years ago
parent
commit
f222d23467

+ 15 - 6
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/AsyncObservable.cs

@@ -32,12 +32,14 @@ namespace System.Reactive.Linq
 
 
                 var autoDetach = new AutoDetachAsyncObserver(observer);
                 var autoDetach = new AutoDetachAsyncObserver(observer);
 
 
-                await _subscribeAsync(autoDetach).ConfigureAwait(false);
+                var subscription = await _subscribeAsync(autoDetach).ConfigureAwait(false);
 
 
-                throw new NotImplementedException();
+                await autoDetach.AssignAsync(subscription);
+
+                return autoDetach;
             }
             }
 
 
-            private sealed class AutoDetachAsyncObserver : IAsyncObserver<T>
+            private sealed class AutoDetachAsyncObserver : AsyncObserverBase<T>, IAsyncDisposable
             {
             {
                 private readonly IAsyncObserver<T> _observer;
                 private readonly IAsyncObserver<T> _observer;
 
 
@@ -46,20 +48,27 @@ namespace System.Reactive.Linq
                     _observer = observer;
                     _observer = observer;
                 }
                 }
 
 
-                public Task OnCompletedAsync()
+                public Task AssignAsync(IAsyncDisposable subscription)
+                {
+                    throw new NotImplementedException();
+                }
+
+                public Task DisposeAsync()
                 {
                 {
                     throw new NotImplementedException();
                     throw new NotImplementedException();
                 }
                 }
 
 
-                public Task OnErrorAsync(Exception error)
+                protected override Task OnCompletedAsyncCore()
                 {
                 {
                     throw new NotImplementedException();
                     throw new NotImplementedException();
                 }
                 }
 
 
-                public Task OnNextAsync(T value)
+                protected override Task OnErrorAsyncCore(Exception error)
                 {
                 {
                     throw new NotImplementedException();
                     throw new NotImplementedException();
                 }
                 }
+
+                protected override Task OnNextAsyncCore(T value) => _observer.OnNextAsync(value);
             }
             }
         }
         }
     }
     }