Răsfoiți Sursa

Save an allocation in CreateWithTaskDisposable.

Daniel Weber 7 ani în urmă
părinte
comite
7323bdffbe

+ 47 - 36
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs

@@ -137,59 +137,70 @@ namespace System.Reactive.Linq
 
         private sealed class CreateWithTaskDisposable<TResult> : ObservableBase<TResult>
         {
-            private readonly Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> _subscribeAsync;
-
-            public CreateWithTaskDisposable(Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync)
+            private sealed class Subscription : IDisposable
             {
-                _subscribeAsync = subscribeAsync;
-            }
+                private sealed class TaskDisposeCompletionObserver : IObserver<IDisposable>, IDisposable
+                {
+                    private readonly IObserver<TResult> _observer;
+                    private IDisposable _disposable;
 
-            protected override IDisposable SubscribeCore(IObserver<TResult> observer)
-            {
-                var cancellable = new CancellationDisposable();
+                    public TaskDisposeCompletionObserver(IObserver<TResult> observer)
+                    {
+                        _observer = observer;
+                    }
 
-                var taskObservable = _subscribeAsync(observer, cancellable.Token).ToObservable();
+                    public void Dispose()
+                    {
+                        Disposable.TryDispose(ref _disposable);
+                    }
 
-                var taskCompletionObserver = new TaskDisposeCompletionObserver(observer);
+                    public void OnCompleted()
+                    {
+                        _observer.OnCompleted();
+                    }
 
-                //
-                // We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually.
-                // Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
-                //
-                taskObservable.Subscribe(taskCompletionObserver);
+                    public void OnError(Exception error)
+                    {
+                        _observer.OnError(error);
+                    }
 
-                return StableCompositeDisposable.Create(cancellable, taskCompletionObserver);
-            }
+                    public void OnNext(IDisposable value)
+                    {
+                        Disposable.SetSingle(ref _disposable, value);
+                    }
+                }
 
-            private sealed class TaskDisposeCompletionObserver : IObserver<IDisposable>, IDisposable
-            {
-                private readonly IObserver<TResult> _observer;
-                private IDisposable _disposable;
+                private readonly TaskDisposeCompletionObserver _observer;
+                private readonly CancellationTokenSource _cts = new CancellationTokenSource();
 
-                public TaskDisposeCompletionObserver(IObserver<TResult> observer)
+                public Subscription(Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync, IObserver<TResult> observer)
                 {
-                    _observer = observer;
+                    //
+                    // We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually.
+                    // Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
+                    //
+                    subscribeAsync(observer, _cts.Token)
+                        .ToObservable()
+                        .Subscribe(_observer = new TaskDisposeCompletionObserver(observer));
                 }
 
                 public void Dispose()
                 {
-                    Disposable.TryDispose(ref _disposable);
+                    _cts.Cancel();
+                    _observer.Dispose();
                 }
+            }
 
-                public void OnCompleted()
-                {
-                    _observer.OnCompleted();
-                }
+            private readonly Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> _subscribeAsync;
 
-                public void OnError(Exception error)
-                {
-                    _observer.OnError(error);
-                }
+            public CreateWithTaskDisposable(Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync)
+            {
+                _subscribeAsync = subscribeAsync;
+            }
 
-                public void OnNext(IDisposable value)
-                {
-                    Disposable.SetSingle(ref _disposable, value);
-                }
+            protected override IDisposable SubscribeCore(IObserver<TResult> observer)
+            {
+                return new Subscription(_subscribeAsync, observer);
             }
         }