Parcourir la source

Save an allocation in CreateWithTaskTokenObservable<TResult>.

Daniel Weber il y a 7 ans
Parent
commit
796a2c533f
1 fichiers modifiés avec 41 ajouts et 28 suppressions
  1. 41 28
      Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs

+ 41 - 28
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs

@@ -68,48 +68,61 @@ namespace System.Reactive.Linq
 
         private sealed class CreateWithTaskTokenObservable<TResult> : ObservableBase<TResult>
         {
-            private readonly Func<IObserver<TResult>, CancellationToken, Task> _subscribeAsync;
-
-            public CreateWithTaskTokenObservable(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync)
+            private sealed class Subscription : IDisposable
             {
-                _subscribeAsync = subscribeAsync;
-            }
-
-            protected override IDisposable SubscribeCore(IObserver<TResult> observer)
-            {
-                var cancellable = new CancellationDisposable();
+                private sealed class TaskCompletionObserver : IObserver<Unit>
+                {
+                    private readonly IObserver<TResult> _observer;
 
-                var taskObservable = _subscribeAsync(observer, cancellable.Token).ToObservable();
-                var taskCompletionObserver = new TaskCompletionObserver(observer);
-                var subscription = taskObservable.Subscribe(taskCompletionObserver);
+                    public TaskCompletionObserver(IObserver<TResult> observer)
+                    {
+                        _observer = observer;
+                    }
 
-                return StableCompositeDisposable.Create(cancellable, subscription);
-            }
+                    public void OnCompleted()
+                    {
+                        _observer.OnCompleted();
+                    }
 
-            private sealed class TaskCompletionObserver : IObserver<Unit>
-            {
-                private readonly IObserver<TResult> _observer;
+                    public void OnError(Exception error)
+                    {
+                        _observer.OnError(error);
+                    }
 
-                public TaskCompletionObserver(IObserver<TResult> observer)
-                {
-                    _observer = observer;
+                    public void OnNext(Unit value)
+                    {
+                        // deliberately ignored
+                    }
                 }
 
-                public void OnCompleted()
-                {
-                    _observer.OnCompleted();
-                }
+                private readonly IDisposable _subscription;
+                private readonly CancellationTokenSource _cts = new CancellationTokenSource();
 
-                public void OnError(Exception error)
+                public Subscription(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync, IObserver<TResult> observer)
                 {
-                    _observer.OnError(error);
+                    _subscription = subscribeAsync(observer, _cts.Token)
+                        .ToObservable()
+                        .Subscribe(new TaskCompletionObserver(observer));
                 }
 
-                public void OnNext(Unit value)
+                public void Dispose()
                 {
-                    // deliberately ignored
+                    _cts.Cancel();
+                    _subscription.Dispose();
                 }
             }
+
+            private readonly Func<IObserver<TResult>, CancellationToken, Task> _subscribeAsync;
+
+            public CreateWithTaskTokenObservable(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync)
+            {
+                _subscribeAsync = subscribeAsync;
+            }
+
+            protected override IDisposable SubscribeCore(IObserver<TResult> observer)
+            {
+                return new Subscription(_subscribeAsync, observer);
+            }
         }
 
         public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task> subscribeAsync)