Selaa lähdekoodia

Adding await support.

Bart De Smet 8 vuotta sitten
vanhempi
sitoutus
9a10aa34f3

+ 6 - 0
AsyncRx.NET/Playground/Program.cs

@@ -22,6 +22,7 @@ namespace Playground
 
         static async Task MainAsync()
         {
+            await AwaitAsync();
             await BufferTimeHoppingAsync();
             await BufferTimeSlidingAsync();
             await ConcatAsync();
@@ -34,6 +35,11 @@ namespace Playground
             await TimerAsync();
         }
 
+        static async Task AwaitAsync()
+        {
+            Console.WriteLine(await AsyncObservable.Range(0, 10));
+        }
+
         static async Task BufferTimeHoppingAsync()
         {
             await

+ 55 - 0
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/GetAwaiter.cs

@@ -0,0 +1,55 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Reactive.Subjects;
+
+namespace System.Reactive.Linq
+{
+    partial class AsyncObservable
+    {
+        public static AsyncAsyncSubject<TSource> GetAwaiter<TSource>(this IAsyncObservable<TSource> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            var subject = new SequentialAsyncAsyncSubject<TSource>();
+
+            var subscribeTask = source.SubscribeSafeAsync(subject);
+
+            subscribeTask.ContinueWith(t =>
+            {
+                if (t.Exception != null)
+                {
+                    subject.OnErrorAsync(t.Exception); // NB: Should not occur due to use of SubscribeSafeAsync.
+                }
+            });
+
+            return subject;
+        }
+
+        public static AsyncAsyncSubject<TSource> GetAwaiter<TSource>(this IConnectableAsyncObservable<TSource> source)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            var subject = new SequentialAsyncAsyncSubject<TSource>();
+
+            var subscribeTask = source.SubscribeSafeAsync(subject);
+
+            subscribeTask.ContinueWith(t =>
+            {
+                if (t.Exception != null)
+                {
+                    subject.OnErrorAsync(t.Exception); // NB: Should not occur due to use of SubscribeSafeAsync.
+                }
+                else
+                {
+                    source.ConnectAsync();
+                }
+            });
+
+            return subject;
+        }
+    }
+}

+ 89 - 1
AsyncRx.NET/System.Reactive.Async/System/Reactive/Subjects/AsyncAsyncSubject.cs

@@ -4,11 +4,14 @@
 
 using System.Collections.Generic;
 using System.Reactive.Disposables;
+using System.Runtime.CompilerServices;
+using System.Runtime.ExceptionServices;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Subjects
 {
-    public abstract class AsyncAsyncSubject<T> : IAsyncSubject<T>
+    public abstract class AsyncAsyncSubject<T> : IAsyncSubject<T>, INotifyCompletion
     {
         private readonly object _gate = new object();
         private readonly List<IAsyncObserver<T>> _observers = new List<IAsyncObserver<T>>();
@@ -146,5 +149,90 @@ namespace System.Reactive.Subjects
                 return Task.CompletedTask;
             });
         }
+
+        public AsyncAsyncSubject<T> GetAwaiter() => this;
+
+        public bool IsCompleted => _done || _error != null;
+
+        public T GetResult()
+        {
+            if (!IsCompleted)
+            {
+                var e = new ManualResetEventSlim(initialState: false);
+
+                OnCompleted(() => { e.Set(); }, originalContext: false);
+
+                e.Wait();
+            }
+
+            if (_error != null)
+            {
+                ExceptionDispatchInfo.Capture(_error).Throw();
+            }
+
+            if (!_hasValue)
+            {
+                throw new InvalidOperationException("The subject has no value.");
+            }
+
+            return _value;
+        }
+
+        public void OnCompleted(Action continuation)
+        {
+            if (continuation == null)
+                throw new ArgumentNullException(nameof(continuation));
+
+            OnCompleted(continuation, originalContext: true);
+        }
+
+        private void OnCompleted(Action continuation, bool originalContext)
+        {
+            var subscribeTask = SubscribeAsync(new AwaitObserver(continuation, originalContext));
+
+            subscribeTask.ContinueWith(t =>
+            {
+                if (t.Exception != null)
+                {
+                    // TODO: Trace?
+                }
+            });
+        }
+
+        private sealed class AwaitObserver : IAsyncObserver<T>
+        {
+            private readonly Action _continuation;
+            private readonly SynchronizationContext _context;
+
+            public AwaitObserver(Action continuation, bool originalContext)
+            {
+                _continuation = continuation;
+
+                if (originalContext)
+                {
+                    _context = SynchronizationContext.Current;
+                }
+            }
+
+            public Task OnCompletedAsync() => InvokeAsync();
+
+            public Task OnErrorAsync(Exception error) => InvokeAsync();
+
+            public Task OnNextAsync(T value) => Task.CompletedTask;
+
+            private Task InvokeAsync()
+            {
+                if (_context != null)
+                {
+                    _context.Post(c => ((Action)c)(), _continuation);
+                }
+                else
+                {
+                    _continuation();
+                }
+
+                return Task.CompletedTask;
+            }
+        }
     }
 }