Преглед изворни кода

Merge pull request #744 from danielcweber/ReviewQueryLanguageCreation

Review QueryLanguage.Creation
Daniel C. Weber пре 7 година
родитељ
комит
fcd8d279c0
1 измењених фајлова са 138 додато и 104 уклоњено
  1. 138 104
      Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs

+ 138 - 104
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs

@@ -68,48 +68,61 @@ namespace System.Reactive.Linq
 
         private sealed class CreateWithTaskTokenObservable<TResult> : ObservableBase<TResult>
         {
-            private readonly Func<IObserver<TResult>, CancellationToken, Task> _subscribeAsync;
-
-            public CreateWithTaskTokenObservable(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync)
-            {
-                _subscribeAsync = subscribeAsync;
-            }
-
-            protected override IDisposable SubscribeCore(IObserver<TResult> observer)
+            private sealed class Subscription : IDisposable
             {
-                var cancellable = new CancellationDisposable();
+                private sealed class TaskCompletionObserver : IObserver<Unit>
+                {
+                    private readonly IObserver<TResult> _observer;
 
-                var taskObservable = _subscribeAsync(observer, cancellable.Token).ToObservable();
-                var taskCompletionObserver = new TaskCompletionObserver(observer);
-                var subscription = taskObservable.Subscribe(taskCompletionObserver);
+                    public TaskCompletionObserver(IObserver<TResult> observer)
+                    {
+                        _observer = observer;
+                    }
 
-                return StableCompositeDisposable.Create(cancellable, subscription);
-            }
+                    public void OnCompleted()
+                    {
+                        _observer.OnCompleted();
+                    }
 
-            private sealed class TaskCompletionObserver : IObserver<Unit>
-            {
-                private readonly IObserver<TResult> _observer;
+                    public void OnError(Exception error)
+                    {
+                        _observer.OnError(error);
+                    }
 
-                public TaskCompletionObserver(IObserver<TResult> observer)
-                {
-                    _observer = observer;
+                    public void OnNext(Unit value)
+                    {
+                        // deliberately ignored
+                    }
                 }
 
-                public void OnCompleted()
-                {
-                    _observer.OnCompleted();
-                }
+                private readonly IDisposable _subscription;
+                private readonly CancellationTokenSource _cts = new CancellationTokenSource();
 
-                public void OnError(Exception error)
+                public Subscription(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync, IObserver<TResult> observer)
                 {
-                    _observer.OnError(error);
+                    _subscription = subscribeAsync(observer, _cts.Token)
+                        .ToObservable()
+                        .Subscribe(new TaskCompletionObserver(observer));
                 }
 
-                public void OnNext(Unit value)
+                public void Dispose()
                 {
-                    // deliberately ignored
+                    _cts.Cancel();
+                    _subscription.Dispose();
                 }
             }
+
+            private readonly Func<IObserver<TResult>, CancellationToken, Task> _subscribeAsync;
+
+            public CreateWithTaskTokenObservable(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync)
+            {
+                _subscribeAsync = subscribeAsync;
+            }
+
+            protected override IDisposable SubscribeCore(IObserver<TResult> observer)
+            {
+                return new Subscription(_subscribeAsync, observer);
+            }
         }
 
         public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task> subscribeAsync)
@@ -124,59 +137,70 @@ namespace System.Reactive.Linq
 
         private sealed class CreateWithTaskDisposable<TResult> : ObservableBase<TResult>
         {
-            private readonly Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> _subscribeAsync;
-
-            public CreateWithTaskDisposable(Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync)
+            private sealed class Subscription : IDisposable
             {
-                _subscribeAsync = subscribeAsync;
-            }
+                private sealed class TaskDisposeCompletionObserver : IObserver<IDisposable>, IDisposable
+                {
+                    private readonly IObserver<TResult> _observer;
+                    private IDisposable _disposable;
 
-            protected override IDisposable SubscribeCore(IObserver<TResult> observer)
-            {
-                var cancellable = new CancellationDisposable();
+                    public TaskDisposeCompletionObserver(IObserver<TResult> observer)
+                    {
+                        _observer = observer;
+                    }
 
-                var taskObservable = _subscribeAsync(observer, cancellable.Token).ToObservable();
+                    public void Dispose()
+                    {
+                        Disposable.TryDispose(ref _disposable);
+                    }
 
-                var taskCompletionObserver = new TaskDisposeCompletionObserver(observer);
+                    public void OnCompleted()
+                    {
+                        _observer.OnCompleted();
+                    }
 
-                //
-                // We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually.
-                // Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
-                //
-                taskObservable.Subscribe(taskCompletionObserver);
+                    public void OnError(Exception error)
+                    {
+                        _observer.OnError(error);
+                    }
 
-                return StableCompositeDisposable.Create(cancellable, taskCompletionObserver);
-            }
+                    public void OnNext(IDisposable value)
+                    {
+                        Disposable.SetSingle(ref _disposable, value);
+                    }
+                }
 
-            private sealed class TaskDisposeCompletionObserver : IObserver<IDisposable>, IDisposable
-            {
-                private readonly IObserver<TResult> _observer;
-                private IDisposable _disposable;
+                private readonly TaskDisposeCompletionObserver _observer;
+                private readonly CancellationTokenSource _cts = new CancellationTokenSource();
 
-                public TaskDisposeCompletionObserver(IObserver<TResult> observer)
+                public Subscription(Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync, IObserver<TResult> observer)
                 {
-                    _observer = observer;
+                    //
+                    // We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually.
+                    // Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
+                    //
+                    subscribeAsync(observer, _cts.Token)
+                        .ToObservable()
+                        .Subscribe(_observer = new TaskDisposeCompletionObserver(observer));
                 }
 
                 public void Dispose()
                 {
-                    Disposable.TryDispose(ref _disposable);
+                    _cts.Cancel();
+                    _observer.Dispose();
                 }
+            }
 
-                public void OnCompleted()
-                {
-                    _observer.OnCompleted();
-                }
+            private readonly Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> _subscribeAsync;
 
-                public void OnError(Exception error)
-                {
-                    _observer.OnError(error);
-                }
+            public CreateWithTaskDisposable(Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync)
+            {
+                _subscribeAsync = subscribeAsync;
+            }
 
-                public void OnNext(IDisposable value)
-                {
-                    Disposable.SetSingle(ref _disposable, value);
-                }
+            protected override IDisposable SubscribeCore(IObserver<TResult> observer)
+            {
+                return new Subscription(_subscribeAsync, observer);
             }
         }
 
@@ -192,63 +216,73 @@ namespace System.Reactive.Linq
 
         private sealed class CreateWithTaskActionObservable<TResult> : ObservableBase<TResult>
         {
-            private readonly Func<IObserver<TResult>, CancellationToken, Task<Action>> _subscribeAsync;
-
-            public CreateWithTaskActionObservable(Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync)
+            private sealed class Subscription : IDisposable
             {
-                _subscribeAsync = subscribeAsync;
-            }
+                private sealed class TaskDisposeCompletionObserver : IObserver<Action>, IDisposable
+                {
+                    private readonly IObserver<TResult> _observer;
+                    private Action _disposable;
 
-            protected override IDisposable SubscribeCore(IObserver<TResult> observer)
-            {
-                var cancellable = new CancellationDisposable();
+                    public TaskDisposeCompletionObserver(IObserver<TResult> observer)
+                    {
+                        _observer = observer;
+                    }
 
-                var taskObservable = _subscribeAsync(observer, cancellable.Token).ToObservable();
+                    public void Dispose()
+                    {
+                        Interlocked.Exchange(ref _disposable, Stubs.Nop)?.Invoke();
+                    }
 
-                var taskCompletionObserver = new TaskDisposeCompletionObserver(observer);
+                    public void OnCompleted()
+                    {
+                        _observer.OnCompleted();
+                    }
 
-                //
-                // We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually.
-                // Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
-                //
-                taskObservable.Subscribe(taskCompletionObserver);
+                    public void OnError(Exception error)
+                    {
+                        _observer.OnError(error);
+                    }
 
-                return StableCompositeDisposable.Create(cancellable, taskCompletionObserver);
-            }
+                    public void OnNext(Action value)
+                    {
+                        if (Interlocked.CompareExchange(ref _disposable, value, null) != null)
+                        {
+                            value?.Invoke();
+                        }
+                    }
+                }
 
-            private sealed class TaskDisposeCompletionObserver : IObserver<Action>, IDisposable
-            {
-                private readonly IObserver<TResult> _observer;
-                private Action _disposable;
-                private static readonly Action DisposedAction = () => { };
+                private readonly TaskDisposeCompletionObserver _observer;
+                private readonly CancellationTokenSource _cts = new CancellationTokenSource();
 
-                public TaskDisposeCompletionObserver(IObserver<TResult> observer)
+                public Subscription(Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync, IObserver<TResult> observer)
                 {
-                    _observer = observer;
+                    //
+                    // We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually.
+                    // Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
+                    //
+                    subscribeAsync(observer, _cts.Token)
+                        .ToObservable()
+                        .Subscribe(_observer = new TaskDisposeCompletionObserver(observer));
                 }
 
                 public void Dispose()
                 {
-                    Interlocked.Exchange(ref _disposable, DisposedAction)?.Invoke();
+                    _cts.Cancel();
+                    _observer.Dispose();
                 }
+            }
 
-                public void OnCompleted()
-                {
-                    _observer.OnCompleted();
-                }
+            private readonly Func<IObserver<TResult>, CancellationToken, Task<Action>> _subscribeAsync;
 
-                public void OnError(Exception error)
-                {
-                    _observer.OnError(error);
-                }
+            public CreateWithTaskActionObservable(Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync)
+            {
+                _subscribeAsync = subscribeAsync;
+            }
 
-                public void OnNext(Action value)
-                {
-                    if (Interlocked.CompareExchange(ref _disposable, value, null) != null)
-                    {
-                        value?.Invoke();
-                    }
-                }
+            protected override IDisposable SubscribeCore(IObserver<TResult> observer)
+            {
+                return new Subscription(_subscribeAsync, observer);
             }
         }