1
0
Эх сурвалжийг харах

Initial implementation of C# 7.0 task-like support for observable sequences.

Bart De Smet 8 жил өмнө
parent
commit
91e26a619b

+ 55 - 0
Rx.NET/Source/src/System.Reactive/Observable.cs

@@ -0,0 +1,55 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Runtime.CompilerServices;
+
+namespace System.Reactive
+{
+    /// <summary>
+    /// Implementation of the IObservable&lt;T&gt; interface compatible with async method return types.
+    /// </summary>
+    /// <remarks>
+    /// This class implements a "task-like" type that can be used as the return type of an asynchronous
+    /// method in C# 7.0 and beyond. For example:
+    /// <code>
+    /// async Observable&lt;int&gt; RxAsync()
+    /// {
+    ///     var res = await Observable.Return(21).Delay(TimeSpan.FromSeconds(1));
+    ///     return res * 2;
+    /// }
+    /// </code>
+    /// </remarks>
+    /// <typeparam name="T">The type of the elements in the sequence.</typeparam>
+    [AsyncMethodBuilder(typeof(ObservableMethodBuilder<>))]
+    public sealed class Observable<T> : IObservable<T>
+    {
+        /// <summary>
+        /// The underlying observable sequence to subscribe to.
+        /// </summary>
+        private readonly IObservable<T> _inner;
+
+        /// <summary>
+        /// Creates a new task-like observable instance using the specified <paramref name="inner"/> observable sequence.
+        /// </summary>
+        /// <param name="inner">The underlying observable sequence to subscribe to.</param>
+        internal Observable(IObservable<T> inner)
+        {
+            _inner = inner;
+        }
+
+        /// <summary>
+        /// Subscribes the given observer to the observable sequence.
+        /// </summary>
+        /// <param name="observer">Observer that will receive notifications from the observable sequence.</param>
+        /// <returns>Disposable object representing an observer's subscription to the observable sequence.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
+        public IDisposable Subscribe(IObserver<T> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            return _inner.Subscribe(observer);
+        }
+    }
+}

+ 2 - 2
Rx.NET/Source/src/System.Reactive/ObservableBase.cs

@@ -8,7 +8,7 @@ using System.Reactive.Disposables;
 namespace System.Reactive
 {
     /// <summary>
-    /// Abstract base class for implementations of the IObservable&lt;T&gt; interface.
+    /// Abstract base class for implementations of the <see cref="IObservable{T}"/> interface.
     /// </summary>
     /// <remarks>
     /// If you don't need a named type to create an observable sequence (i.e. you rather need
@@ -30,7 +30,7 @@ namespace System.Reactive
                 throw new ArgumentNullException(nameof(observer));
 
             var autoDetachObserver = new AutoDetachObserver<T>(observer);
-            
+
             if (CurrentThreadScheduler.IsScheduleRequired)
             {
                 //

+ 20 - 0
Rx.NET/Source/src/System.Reactive/Runtime/CompilerServices/AsyncMethodBuilderAttribute.cs

@@ -0,0 +1,20 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+namespace System.Runtime.CompilerServices
+{
+    /// <summary>
+    /// Attribute to decorate a task-like type to specify a compatible asynchronous method builder.
+    /// </summary>
+    internal sealed class AsyncMethodBuilderAttribute : Attribute
+    {
+        /// <summary>
+        /// Creates a new instance of the attribute using the specified <paramref name="type"/>.
+        /// </summary>
+        /// <param name="type">The type implementing the asynchronous method builder.</param>
+        public AsyncMethodBuilderAttribute(Type type)
+        {
+        }
+    }
+}

+ 217 - 0
Rx.NET/Source/src/System.Reactive/Runtime/CompilerServices/ObservableMethodBuilder.cs

@@ -0,0 +1,217 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Linq;
+using System.Reactive.Subjects;
+using System.Runtime.ExceptionServices;
+using System.Security;
+
+namespace System.Runtime.CompilerServices
+{
+    /// <summary>
+    /// Represents a builder for asynchronous methods that return a task-like <see cref="Observable{T}"/>.
+    /// </summary>
+    /// <typeparam name="T">The type of the elements in the sequence.</typeparam>
+    public struct ObservableMethodBuilder<T>
+    {
+        /// <summary>
+        /// The compiler-generated asynchronous state machine representing the execution flow of the asynchronous
+        /// method whose return type is a task-like <see cref="Observable{T}"/>.
+        /// </summary>
+        private IAsyncStateMachine _stateMachine;
+
+        /// <summary>
+        /// The underlying observable sequence representing the result produced by the asynchronous method.
+        /// </summary>
+        private IObservable<T> _result;
+
+        /// <summary>
+        /// Creates an instance of the <see cref="ObservableMethodBuilder{T}"/> struct.
+        /// </summary>
+        /// <returns>A new instance of the struct.</returns>
+        public static ObservableMethodBuilder<T> Create() => default(ObservableMethodBuilder<T>);
+
+        /// <summary>
+        /// Begins running the builder with the associated state machine.
+        /// </summary>
+        /// <typeparam name="TStateMachine">The type of the state machine.</typeparam>
+        /// <param name="stateMachine">The state machine instance, passed by reference.</param>
+        /// <exception cref="ArgumentNullException"><paramref name="stateMachine"/> is <c>null</c>.</exception>
+        public void Start<TStateMachine>(ref TStateMachine stateMachine)
+            where TStateMachine : IAsyncStateMachine
+        {
+            if (stateMachine == null)
+                throw new ArgumentNullException(nameof(stateMachine));
+
+            stateMachine.MoveNext();
+        }
+
+        /// <summary>
+        /// Associates the builder with the specified state machine.
+        /// </summary>
+        /// <param name="stateMachine">The state machine instance to associate with the builder.</param>
+        /// <exception cref="ArgumentNullException"><paramref name="stateMachine"/> is <c>null</c>.</exception>
+        /// <exception cref="InvalidOperationException">The state machine was previously set.</exception>
+        public void SetStateMachine(IAsyncStateMachine stateMachine)
+        {
+            if (stateMachine == null)
+                throw new ArgumentNullException(nameof(stateMachine));
+
+            if (_stateMachine != null)
+                throw new InvalidOperationException();
+
+            _stateMachine = stateMachine;
+        }
+
+        /// <summary>
+        /// Marks the observable as successfully completed.
+        /// </summary>
+        /// <param name="result">The result to use to complete the observable sequence.</param>
+        /// <exception cref="InvalidOperationException">The observable has already completed.</exception>
+        public void SetResult(T result)
+        {
+            if (_result == null)
+            {
+                _result = Observable.Return<T>(result);
+            }
+            else
+            {
+                var subject = _result as AsyncSubject<T>;
+
+                // NB: The IsCompleted is not protected by the subject's lock, so we could get a dirty read.
+                //
+                //     We can live with this limitation and merely put in this check to catch invalid
+                //     manual usage for which behavior is undefined. In the compiler-generated code that
+                //     interacts with the asynchronous method builder, no concurrent calls to the Set*
+                //     methods should occur.
+
+                if (subject == null || subject.IsCompleted)
+                    throw new InvalidOperationException();
+
+                subject.OnNext(result);
+                subject.OnCompleted();
+            }
+        }
+
+        /// <summary>
+        /// Marks the observable as failed and binds the specified exception to the observable sequence.
+        /// </summary>
+        /// <param name="exception">The exception to bind to the observable sequence.</param>
+        /// <exception cref="ArgumentNullException"><paramref name="exception"/> is <c>null</c>.</exception>
+        /// <exception cref="InvalidOperationException">The observable has already completed.</exception>
+        public void SetException(Exception exception)
+        {
+            if (exception == null)
+                throw new ArgumentNullException(nameof(exception));
+
+            if (_result == null)
+            {
+                _result = Observable.Throw<T>(exception);
+            }
+            else
+            {
+                var subject = _result as AsyncSubject<T>;
+
+                // NB: The IsCompleted is not protected by the subject's lock, so we could get a dirty read.
+                //
+                //     We can live with this limitation and merely put in this check to catch invalid
+                //     manual usage for which behavior is undefined. In the compiler-generated code that
+                //     interacts with the asynchronous method builder, no concurrent calls to the Set*
+                //     methods should occur.
+
+                if (subject == null || subject.IsCompleted)
+                    throw new InvalidOperationException();
+
+                subject.OnError(exception);
+            }
+        }
+
+        /// <summary>
+        /// Gets the observable sequence for this builder.
+        /// </summary>
+        public Observable<T> Task => new Observable<T>(_result ?? (_result = new AsyncSubject<T>()));
+
+        /// <summary>
+        /// Schedules the state machine to proceed to the next action when the specified awaiter completes.
+        /// </summary>
+        /// <typeparam name="TAwaiter">The type of the awaiter.</typeparam>
+        /// <typeparam name="TStateMachine">The type of the state machine.</typeparam>
+        /// <param name="awaiter">The awaiter.</param>
+        /// <param name="stateMachine">The state machine.</param>
+        public void AwaitOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
+            where TAwaiter : INotifyCompletion
+            where TStateMachine : IAsyncStateMachine
+        {
+            try
+            {
+                if (_stateMachine == null)
+                {
+                    var ignored = Task; // NB: Ensure we have the observable backed by an async subject ready.
+
+                    _stateMachine = stateMachine;
+                    _stateMachine.SetStateMachine(_stateMachine);
+                }
+
+                // NB: Rx has historically not bothered with execution contexts, so we don't do it here either.
+
+                awaiter.OnCompleted(_stateMachine.MoveNext);
+            }
+            catch (Exception ex)
+            {
+                // NB: Prevent reentrancy into the async state machine when an exception would be observed
+                //     by the caller. This could cause concurrent execution of the async method. Instead,
+                //     rethrow the exception elsewhere.
+
+                Rethrow(ex);
+            }
+        }
+
+        /// <summary>
+        /// Schedules the state machine to proceed to the next action when the specified awaiter completes.
+        /// </summary>
+        /// <typeparam name="TAwaiter">The type of the awaiter.</typeparam>
+        /// <typeparam name="TStateMachine">The type of the state machine.</typeparam>
+        /// <param name="awaiter">The awaiter.</param>
+        /// <param name="stateMachine">The state machine.</param>
+        [SecuritySafeCritical]
+        public void AwaitUnsafeOnCompleted<TAwaiter, TStateMachine>(ref TAwaiter awaiter, ref TStateMachine stateMachine)
+            where TAwaiter : ICriticalNotifyCompletion
+            where TStateMachine : IAsyncStateMachine
+        {
+            try
+            {
+                if (_stateMachine == null)
+                {
+                    var ignored = Task; // NB: Ensure we have the observable backed by an async subject ready.
+
+                    _stateMachine = stateMachine;
+                    _stateMachine.SetStateMachine(_stateMachine);
+                }
+
+                // NB: Rx has historically not bothered with execution contexts, so we don't do it here either.
+
+                awaiter.UnsafeOnCompleted(_stateMachine.MoveNext);
+            }
+            catch (Exception ex)
+            {
+                // NB: Prevent reentrancy into the async state machine when an exception would be observed
+                //     by the caller. This could cause concurrent execution of the async method. Instead,
+                //     rethrow the exception elsewhere.
+
+                Rethrow(ex);
+            }
+        }
+
+        /// <summary>
+        /// Rethrows an exception that was thrown from an awaiter's OnCompleted methods.
+        /// </summary>
+        /// <param name="exception">The exception to rethrow.</param>
+        private static void Rethrow(Exception exception)
+        {
+            Scheduler.Default.Schedule(ExceptionDispatchInfo.Capture(exception), (state, recurse) => state.Throw());
+        }
+    }
+}

+ 75 - 0
Rx.NET/Source/tests/Tests.System.Reactive/Tests/TaskLikeSupportTest.cs

@@ -0,0 +1,75 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System;
+using System.Reactive;
+using System.Reactive.Linq;
+using System.Threading.Tasks;
+using Xunit;
+
+namespace Tests.System.Reactive.Tests
+{
+    public class TaskLikeSupportTest
+    {
+        [Fact]
+        public async Task Return()
+        {
+            Assert.Equal(42, await ManOrBoy_Return());
+        }
+
+#pragma warning disable 1998
+        private async Observable<int> ManOrBoy_Return()
+        {
+            return 42;
+        }
+#pragma warning restore 1998
+
+        [Fact]
+        public async Task Throw()
+        {
+            await Assert.ThrowsAsync<DivideByZeroException>(async () => await ManOrBoy_Throw(42, 0));
+        }
+
+#pragma warning disable 1998
+        private async Observable<int> ManOrBoy_Throw(int n, int d)
+        {
+            return n / d;
+        }
+#pragma warning restore 1998
+
+        [Fact]
+        public async Task Basics()
+        {
+            Assert.Equal(45, await ManOrBoy_Basics());
+        }
+
+#pragma warning disable 1998
+        private async Observable<int> ManOrBoy_Basics()
+        {
+            var res = 0;
+
+            for (var i = 0; i < 10; i++)
+            {
+                switch (i % 4)
+                {
+                    case 0:
+                        res += await Observable.Return(i);
+                        break;
+                    case 1:
+                        res += await Observable.Return(i).Delay(TimeSpan.FromMilliseconds(50));
+                        break;
+                    case 2:
+                        res += await Task.FromResult(i);
+                        break;
+                    case 3:
+                        res += await Task.Run(() => { Task.Delay(50).Wait(); return i; });
+                        break;
+                }
+            }
+
+            return res;
+        }
+#pragma warning restore 1998
+    }
+}