Просмотр исходного кода

Switching to an interface-based model.

Bart De Smet 8 лет назад
Родитель
Сommit
0837259afc

+ 0 - 55
Rx.NET/Source/src/System.Reactive/Observable.cs

@@ -1,55 +0,0 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the Apache 2.0 License.
-// See the LICENSE file in the project root for more information. 
-
-using System.Runtime.CompilerServices;
-
-namespace System.Reactive
-{
-    /// <summary>
-    /// Implementation of the IObservable&lt;T&gt; interface compatible with async method return types.
-    /// </summary>
-    /// <remarks>
-    /// This class implements a "task-like" type that can be used as the return type of an asynchronous
-    /// method in C# 7.0 and beyond. For example:
-    /// <code>
-    /// async Observable&lt;int&gt; RxAsync()
-    /// {
-    ///     var res = await Observable.Return(21).Delay(TimeSpan.FromSeconds(1));
-    ///     return res * 2;
-    /// }
-    /// </code>
-    /// </remarks>
-    /// <typeparam name="T">The type of the elements in the sequence.</typeparam>
-    [AsyncMethodBuilder(typeof(ObservableMethodBuilder<>))]
-    public sealed class Observable<T> : IObservable<T>
-    {
-        /// <summary>
-        /// The underlying observable sequence to subscribe to.
-        /// </summary>
-        private readonly IObservable<T> _inner;
-
-        /// <summary>
-        /// Creates a new task-like observable instance using the specified <paramref name="inner"/> observable sequence.
-        /// </summary>
-        /// <param name="inner">The underlying observable sequence to subscribe to.</param>
-        internal Observable(IObservable<T> inner)
-        {
-            _inner = inner;
-        }
-
-        /// <summary>
-        /// Subscribes the given observer to the observable sequence.
-        /// </summary>
-        /// <param name="observer">Observer that will receive notifications from the observable sequence.</param>
-        /// <returns>Disposable object representing an observer's subscription to the observable sequence.</returns>
-        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
-        public IDisposable Subscribe(IObserver<T> observer)
-        {
-            if (observer == null)
-                throw new ArgumentNullException(nameof(observer));
-
-            return _inner.Subscribe(observer);
-        }
-    }
-}

+ 172 - 40
Rx.NET/Source/src/System.Reactive/Runtime/CompilerServices/ObservableMethodBuilder.cs → Rx.NET/Source/src/System.Reactive/Runtime/CompilerServices/TaskObservableMethodBuilder.cs

@@ -4,35 +4,35 @@
 
 using System.Reactive;
 using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
 using System.Reactive.Linq;
 using System.Reactive.Subjects;
-using System.Runtime.ExceptionServices;
 using System.Security;
 
 namespace System.Runtime.CompilerServices
 {
     /// <summary>
-    /// Represents a builder for asynchronous methods that return a task-like <see cref="Observable{T}"/>.
+    /// Represents a builder for asynchronous methods that return a task-like <see cref="ITaskObservable{T}"/>.
     /// </summary>
     /// <typeparam name="T">The type of the elements in the sequence.</typeparam>
-    public struct ObservableMethodBuilder<T>
+    public struct TaskObservableMethodBuilder<T>
     {
         /// <summary>
         /// The compiler-generated asynchronous state machine representing the execution flow of the asynchronous
-        /// method whose return type is a task-like <see cref="Observable{T}"/>.
+        /// method whose return type is a task-like <see cref="ITaskObservable{T}"/>.
         /// </summary>
         private IAsyncStateMachine _stateMachine;
 
         /// <summary>
         /// The underlying observable sequence representing the result produced by the asynchronous method.
         /// </summary>
-        private IObservable<T> _result;
+        private TaskObservable _inner;
 
         /// <summary>
-        /// Creates an instance of the <see cref="ObservableMethodBuilder{T}"/> struct.
+        /// Creates an instance of the <see cref="TaskObservableMethodBuilder{T}"/> struct.
         /// </summary>
         /// <returns>A new instance of the struct.</returns>
-        public static ObservableMethodBuilder<T> Create() => default(ObservableMethodBuilder<T>);
+        public static TaskObservableMethodBuilder<T> Create() => default(TaskObservableMethodBuilder<T>);
 
         /// <summary>
         /// Begins running the builder with the associated state machine.
@@ -73,26 +73,13 @@ namespace System.Runtime.CompilerServices
         /// <exception cref="InvalidOperationException">The observable has already completed.</exception>
         public void SetResult(T result)
         {
-            if (_result == null)
+            if (_inner == null)
             {
-                _result = Observable.Return<T>(result);
+                _inner = new TaskObservable(result);
             }
             else
             {
-                var subject = _result as AsyncSubject<T>;
-
-                // NB: The IsCompleted is not protected by the subject's lock, so we could get a dirty read.
-                //
-                //     We can live with this limitation and merely put in this check to catch invalid
-                //     manual usage for which behavior is undefined. In the compiler-generated code that
-                //     interacts with the asynchronous method builder, no concurrent calls to the Set*
-                //     methods should occur.
-
-                if (subject == null || subject.IsCompleted)
-                    throw new InvalidOperationException();
-
-                subject.OnNext(result);
-                subject.OnCompleted();
+                _inner.SetResult(result);
             }
         }
 
@@ -107,32 +94,20 @@ namespace System.Runtime.CompilerServices
             if (exception == null)
                 throw new ArgumentNullException(nameof(exception));
 
-            if (_result == null)
+            if (_inner == null)
             {
-                _result = Observable.Throw<T>(exception);
+                _inner = new TaskObservable(exception);
             }
             else
             {
-                var subject = _result as AsyncSubject<T>;
-
-                // NB: The IsCompleted is not protected by the subject's lock, so we could get a dirty read.
-                //
-                //     We can live with this limitation and merely put in this check to catch invalid
-                //     manual usage for which behavior is undefined. In the compiler-generated code that
-                //     interacts with the asynchronous method builder, no concurrent calls to the Set*
-                //     methods should occur.
-
-                if (subject == null || subject.IsCompleted)
-                    throw new InvalidOperationException();
-
-                subject.OnError(exception);
+                _inner.SetException(exception);
             }
         }
 
         /// <summary>
         /// Gets the observable sequence for this builder.
         /// </summary>
-        public Observable<T> Task => new Observable<T>(_result ?? (_result = new AsyncSubject<T>()));
+        public ITaskObservable<T> Task => _inner ?? (_inner = new TaskObservable());
 
         /// <summary>
         /// Schedules the state machine to proceed to the next action when the specified awaiter completes.
@@ -211,7 +186,164 @@ namespace System.Runtime.CompilerServices
         /// <param name="exception">The exception to rethrow.</param>
         private static void Rethrow(Exception exception)
         {
-            Scheduler.Default.Schedule(ExceptionDispatchInfo.Capture(exception), (state, recurse) => state.Throw());
+            Scheduler.Default.Schedule(exception, (ex, recurse) => ex.Throw());
+        }
+
+        /// <summary>
+        /// Implementation of the IObservable&lt;T&gt; interface compatible with async method return types.
+        /// </summary>
+        /// <remarks>
+        /// This class implements a "task-like" type that can be used as the return type of an asynchronous
+        /// method in C# 7.0 and beyond. For example:
+        /// <code>
+        /// async Observable&lt;int&gt; RxAsync()
+        /// {
+        ///     var res = await Observable.Return(21).Delay(TimeSpan.FromSeconds(1));
+        ///     return res * 2;
+        /// }
+        /// </code>
+        /// </remarks>
+        /// <typeparam name="T">The type of the elements in the sequence.</typeparam>
+        internal sealed class TaskObservable : ITaskObservable<T>, ITaskObservableAwaiter<T>
+        {
+            /// <summary>
+            /// The underlying observable sequence to subscribe to in case the asynchronous method did not
+            /// finish synchronously.
+            /// </summary>
+            private readonly AsyncSubject<T> _subject;
+
+            /// <summary>
+            /// The result returned by the asynchronous method in case the method finished synchronously.
+            /// </summary>
+            private readonly T _result;
+
+            /// <summary>
+            /// The exception thrown by the asynchronous method in case the method finished synchronously.
+            /// </summary>
+            private readonly Exception _exception;
+
+            /// <summary>
+            /// Creates a new <see cref="TaskObservable"/> for an asynchronous method that has not finished yet.
+            /// </summary>
+            public TaskObservable()
+            {
+                _subject = new AsyncSubject<T>();
+            }
+
+            /// <summary>
+            /// Creates a new <see cref="TaskObservable"/> for an asynchronous method that synchronously returned
+            /// the specified <paramref name="result"/> value.
+            /// </summary>
+            /// <param name="result">The result returned by the asynchronous method.</param>
+            public TaskObservable(T result)
+            {
+                _result = result;
+            }
+
+            /// <summary>
+            /// Creates a new <see cref="TaskObservable"/> for an asynchronous method that synchronously threw
+            /// the specified <paramref name="exception"/>.
+            /// </summary>
+            /// <param name="exception">The exception thrown by the asynchronous method.</param>
+            public TaskObservable(Exception exception)
+            {
+                _exception = exception;
+            }
+
+            /// <summary>
+            /// Marks the observable as successfully completed.
+            /// </summary>
+            /// <param name="result">The result to use to complete the observable sequence.</param>
+            /// <exception cref="InvalidOperationException">The observable has already completed.</exception>
+            public void SetResult(T result)
+            {
+                if (IsCompleted)
+                    throw new InvalidOperationException();
+
+                _subject.OnNext(result);
+                _subject.OnCompleted();
+            }
+
+            /// <summary>
+            /// Marks the observable as failed and binds the specified exception to the observable sequence.
+            /// </summary>
+            /// <param name="exception">The exception to bind to the observable sequence.</param>
+            /// <exception cref="ArgumentNullException"><paramref name="exception"/> is <c>null</c>.</exception>
+            /// <exception cref="InvalidOperationException">The observable has already completed.</exception>
+            public void SetException(Exception exception)
+            {
+                if (IsCompleted)
+                    throw new InvalidOperationException();
+
+                _subject.OnError(exception);
+            }
+
+            /// <summary>
+            /// Subscribes the given observer to the observable sequence.
+            /// </summary>
+            /// <param name="observer">Observer that will receive notifications from the observable sequence.</param>
+            /// <returns>Disposable object representing an observer's subscription to the observable sequence.</returns>
+            /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
+            public IDisposable Subscribe(IObserver<T> observer)
+            {
+                if (_subject != null)
+                {
+                    return _subject.Subscribe(observer);
+                }
+                else if (_exception != null)
+                {
+                    observer.OnError(_exception);
+                    return Disposable.Empty;
+                }
+                else
+                {
+                    observer.OnNext(_result);
+                    return Disposable.Empty;
+                }
+            }
+
+            /// <summary>
+            /// Gets an awaiter that can be used to await the eventual completion of the observable sequence.
+            /// </summary>
+            /// <returns>An awaiter that can be used to await the eventual completion of the observable sequence.</returns>
+            public ITaskObservableAwaiter<T> GetAwaiter() => this;
+
+            /// <summary>
+            /// Gets a Boolean indicating whether the observable sequence has completed.
+            /// </summary>
+            public bool IsCompleted => _subject?.IsCompleted ?? true;
+
+            /// <summary>
+            /// Gets the result produced by the observable sequence.
+            /// </summary>
+            /// <returns>The result produced by the observable sequence.</returns>
+            public T GetResult()
+            {
+                if (_subject != null)
+                {
+                    return _subject.GetResult();
+                }
+
+                _exception.ThrowIfNotNull();
+
+                return _result;
+            }
+
+            /// <summary>
+            /// Attaches the specified <paramref name="continuation"/> to the observable sequence.
+            /// </summary>
+            /// <param name="continuation">The continuation to attach.</param>
+            public void OnCompleted(Action continuation)
+            {
+                if (_subject != null)
+                {
+                    _subject.OnCompleted(continuation);
+                }
+                else
+                {
+                    continuation();
+                }
+            }
         }
     }
 }

+ 1 - 1
Rx.NET/Source/src/System.Reactive/Subjects/AsyncSubject.cs

@@ -297,7 +297,7 @@ namespace System.Reactive.Subjects
             this.Subscribe/*Unsafe*/(new AwaitObserver(continuation, originalContext));
         }
 
-        class AwaitObserver : IObserver<T>
+        private sealed class AwaitObserver : IObserver<T>
         {
             private readonly SynchronizationContext _context;
             private readonly Action _callback;

+ 53 - 0
Rx.NET/Source/src/System.Reactive/TaskObservable.cs

@@ -0,0 +1,53 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Runtime.CompilerServices;
+
+namespace System.Reactive
+{
+    /// <summary>
+    /// Extension of the <see cref="IObservable{T}"/> interface compatible with async method return types.
+    /// </summary>
+    /// <remarks>
+    /// This class implements a "task-like" type that can be used as the return type of an asynchronous
+    /// method in C# 7.0 and beyond. For example:
+    /// <code>
+    /// async ITaskObservable&lt;int&gt; RxAsync()
+    /// {
+    ///     var res = await Observable.Return(21).Delay(TimeSpan.FromSeconds(1));
+    ///     return res * 2;
+    /// }
+    /// </code>
+    /// </remarks>
+    /// <typeparam name="T">The type of the elements in the sequence.</typeparam>
+    [AsyncMethodBuilder(typeof(TaskObservableMethodBuilder<>))]
+    public interface ITaskObservable<out T> : IObservable<T>
+    {
+        // NB: An interface type is preferred to enable the use of covariance.
+
+        /// <summary>
+        /// Gets an awaiter that can be used to await the eventual completion of the observable sequence.
+        /// </summary>
+        /// <returns>An awaiter that can be used to await the eventual completion of the observable sequence.</returns>
+        ITaskObservableAwaiter<T> GetAwaiter();
+    }
+
+    /// <summary>
+    /// Interface representing an awaiter for an <see cref="ITaskObservable{T}"/>.
+    /// </summary>
+    /// <typeparam name="T">The type of the elements in the sequence.</typeparam>
+    public interface ITaskObservableAwaiter<out T> : INotifyCompletion
+    {
+        /// <summary>
+        /// Gets a Boolean indicating whether the observable sequence has completed.
+        /// </summary>
+        bool IsCompleted { get; }
+
+        /// <summary>
+        /// Gets the result produced by the observable sequence.
+        /// </summary>
+        /// <returns>The result produced by the observable sequence.</returns>
+        T GetResult();
+    }
+}

+ 3 - 3
Rx.NET/Source/tests/Tests.System.Reactive/Tests/TaskLikeSupportTest.cs

@@ -19,7 +19,7 @@ namespace Tests.System.Reactive.Tests
         }
 
 #pragma warning disable 1998
-        private async Observable<int> ManOrBoy_Return()
+        private async ITaskObservable<int> ManOrBoy_Return()
         {
             return 42;
         }
@@ -32,7 +32,7 @@ namespace Tests.System.Reactive.Tests
         }
 
 #pragma warning disable 1998
-        private async Observable<int> ManOrBoy_Throw(int n, int d)
+        private async ITaskObservable<int> ManOrBoy_Throw(int n, int d)
         {
             return n / d;
         }
@@ -45,7 +45,7 @@ namespace Tests.System.Reactive.Tests
         }
 
 #pragma warning disable 1998
-        private async Observable<int> ManOrBoy_Basics()
+        private async ITaskObservable<int> ManOrBoy_Basics()
         {
             var res = 0;