Browse Source

Adding some more ReplayAsyncSubject constructor overloads.

Bart De Smet 8 years ago
parent
commit
9cac1379c4

+ 9 - 125
AsyncRx.NET/System.Reactive.Async.Subjects/System/Reactive/FastImmediateAsyncObserver.cs

@@ -2,143 +2,27 @@
 // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 // See the LICENSE file in the project root for more information. 
 
-using System.Collections.Generic;
-using System.Threading;
+using System.Reactive.Disposables;
+using System.Runtime.CompilerServices;
 using System.Threading.Tasks;
 
 namespace System.Reactive
 {
-    internal sealed class FastImmediateAsyncObserver<T> : IScheduledAsyncObserver<T>
+    internal sealed class FastImmediateAsyncObserver<T> : ScheduledAsyncObserverBase<T>
     {
-        private readonly IAsyncObserver<T> _observer;
-
-        private readonly AsyncLock _lock = new AsyncLock();
-        private readonly Queue<T> _queue = new Queue<T>();
-
-        private bool _hasFaulted = false;
-        private bool _busy = false;
-        private bool _done;
-        private Exception _error;
+        private readonly CancellationAsyncDisposable _disposable = new CancellationAsyncDisposable();
 
         public FastImmediateAsyncObserver(IAsyncObserver<T> observer)
+            : base(observer)
         {
-            _observer = observer;
         }
 
-        public Task EnsureActive() => EnsureActive(1);
-
-        public async Task EnsureActive(int count)
-        {
-            var shouldRun = false;
-
-            using (await _lock.LockAsync().ConfigureAwait(false))
-            {
-                if (!_hasFaulted && !_busy)
-                {
-                    _busy = true;
-                    shouldRun = true;
-                }
-            }
-
-            if (shouldRun)
-            {
-                while (true)
-                {
-                    var values = default(T[]);
-                    var error = default(Exception);
-                    var done = false;
-
-                    using (await _lock.LockAsync().ConfigureAwait(false))
-                    {
-                        if (_queue.Count > 0)
-                        {
-                            values = _queue.ToArray();
-                            _queue.Clear();
-                        }
-
-                        if (_done)
-                        {
-                            done = _done;
-                            error = _error;
-                        }
-
-                        if (values == null && !done)
-                        {
-                            _busy = false;
-                            break;
-                        }
-                    }
-
-                    try
-                    {
-                        if (values != null)
-                        {
-                            foreach (var value in values)
-                            {
-                                await _observer.OnNextAsync(value).ConfigureAwait(false);
-                            }
-                        }
+        public override Task DisposeAsync() => _disposable.DisposeAsync();
 
-                        if (done)
-                        {
-                            if (error != null)
-                            {
-                                await _observer.OnErrorAsync(error).ConfigureAwait(false);
-                            }
-                            else
-                            {
-                                await _observer.OnCompletedAsync().ConfigureAwait(false);
-                            }
+        protected override ConfiguredTaskAwaitable RendezVous(Task task) => task.ConfigureAwait(false);
 
-                            break;
-                        }
-                    }
-                    catch
-                    {
-                        using (await _lock.LockAsync().ConfigureAwait(false))
-                        {
-                            _hasFaulted = true;
-                            _queue.Clear();
-                        }
+        protected override ConfiguredTaskAwaitable<R> RendezVous<R>(Task<R> task) => task.ConfigureAwait(false);
 
-                        throw;
-                    }
-                }
-            }
-        }
-
-        public async Task OnCompletedAsync()
-        {
-            using (await _lock.LockAsync().ConfigureAwait(false))
-            {
-                if (!_hasFaulted)
-                {
-                    _done = true;
-                }
-            }
-        }
-
-        public async Task OnErrorAsync(Exception error)
-        {
-            using (await _lock.LockAsync().ConfigureAwait(false))
-            {
-                if (!_hasFaulted)
-                {
-                    _done = true;
-                    _error = error;
-                }
-            }
-        }
-
-        public async Task OnNextAsync(T value)
-        {
-            using (await _lock.LockAsync().ConfigureAwait(false))
-            {
-                if (!_hasFaulted)
-                {
-                    _queue.Enqueue(value);
-                }
-            }
-        }
+        protected override Task ScheduleAsync() => RunAsync(_disposable.Token);
     }
 }

+ 1 - 1
AsyncRx.NET/System.Reactive.Async.Subjects/System/Reactive/ISchedulerAsyncObserver.cs

@@ -6,7 +6,7 @@ using System.Threading.Tasks;
 
 namespace System.Reactive
 {
-    internal interface IScheduledAsyncObserver<T> : IAsyncObserver<T> // TODO: implement IAsyncDisposable
+    internal interface IScheduledAsyncObserver<T> : IAsyncObserver<T>, IAsyncDisposable
     {
         Task EnsureActive();
 

+ 39 - 0
AsyncRx.NET/System.Reactive.Async.Subjects/System/Reactive/ScheduledAsyncObserver.cs

@@ -0,0 +1,39 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
+using System.Runtime.CompilerServices;
+using System.Threading.Tasks;
+
+namespace System.Reactive
+{
+    internal sealed class ScheduledAsyncObserver<T> : ScheduledAsyncObserverBase<T>
+    {
+        private readonly IAsyncObserver<T> _observer;
+        private readonly IAsyncScheduler _scheduler;
+
+        private readonly SerialAsyncDisposable _disposable = new SerialAsyncDisposable();
+
+        public ScheduledAsyncObserver(IAsyncObserver<T> observer, IAsyncScheduler scheduler)
+            : base(observer)
+        {
+            _scheduler = scheduler;
+        }
+
+        public override Task DisposeAsync() => _disposable.DisposeAsync();
+
+        // TODO: Implement proper RendezVous semantics.
+
+        protected override ConfiguredTaskAwaitable RendezVous(Task task) => task.ConfigureAwait(false);
+
+        protected override ConfiguredTaskAwaitable<R> RendezVous<R>(Task<R> task) => task.ConfigureAwait(false);
+
+        protected override async Task ScheduleAsync()
+        {
+            var d = await _scheduler.ScheduleAsync(RunAsync).ConfigureAwait(false);
+            await _disposable.AssignAsync(d).ConfigureAwait(false);
+        }
+    }
+}

+ 158 - 0
AsyncRx.NET/System.Reactive.Async.Subjects/System/Reactive/ScheduledAsyncObserverBase.cs

@@ -0,0 +1,158 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Collections.Generic;
+using System.Runtime.CompilerServices;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Reactive
+{
+    internal abstract class ScheduledAsyncObserverBase<T> : IScheduledAsyncObserver<T>
+    {
+        private readonly IAsyncObserver<T> _observer;
+
+        private readonly AsyncLock _lock = new AsyncLock();
+        private readonly Queue<T> _queue = new Queue<T>();
+
+        private bool _hasFaulted = false;
+        private bool _busy = false;
+        private bool _done;
+        private Exception _error;
+
+        public ScheduledAsyncObserverBase(IAsyncObserver<T> observer)
+        {
+            _observer = observer;
+        }
+
+        public Task EnsureActive() => EnsureActive(1);
+
+        public async Task EnsureActive(int count)
+        {
+            var shouldRun = false;
+
+            using (await _lock.LockAsync().ConfigureAwait(false))
+            {
+                if (!_hasFaulted && !_busy)
+                {
+                    _busy = true;
+                    shouldRun = true;
+                }
+            }
+
+            if (shouldRun)
+            {
+                await ScheduleAsync().ConfigureAwait(false);
+            }
+        }
+
+        protected abstract Task ScheduleAsync();
+
+        protected async Task RunAsync(CancellationToken token)
+        {
+            while (!token.IsCancellationRequested)
+            {
+                var values = default(T[]);
+                var error = default(Exception);
+                var done = false;
+
+                using (await RendezVous(_lock.LockAsync()))
+                {
+                    if (_queue.Count > 0)
+                    {
+                        values = _queue.ToArray();
+                        _queue.Clear();
+                    }
+
+                    if (_done)
+                    {
+                        done = _done;
+                        error = _error;
+                    }
+
+                    if (values == null && !done)
+                    {
+                        _busy = false;
+                        break;
+                    }
+                }
+
+                try
+                {
+                    if (values != null)
+                    {
+                        foreach (var value in values)
+                        {
+                            await RendezVous(_observer.OnNextAsync(value));
+                        }
+                    }
+
+                    if (done)
+                    {
+                        if (error != null)
+                        {
+                            await RendezVous(_observer.OnErrorAsync(error));
+                        }
+                        else
+                        {
+                            await RendezVous(_observer.OnCompletedAsync());
+                        }
+
+                        break;
+                    }
+                }
+                catch
+                {
+                    using (await RendezVous(_lock.LockAsync()))
+                    {
+                        _hasFaulted = true;
+                        _queue.Clear();
+                    }
+
+                    throw;
+                }
+            }
+        }
+
+        public async Task OnCompletedAsync()
+        {
+            using (await _lock.LockAsync().ConfigureAwait(false))
+            {
+                if (!_hasFaulted)
+                {
+                    _done = true;
+                }
+            }
+        }
+
+        public async Task OnErrorAsync(Exception error)
+        {
+            using (await _lock.LockAsync().ConfigureAwait(false))
+            {
+                if (!_hasFaulted)
+                {
+                    _done = true;
+                    _error = error;
+                }
+            }
+        }
+
+        public async Task OnNextAsync(T value)
+        {
+            using (await _lock.LockAsync().ConfigureAwait(false))
+            {
+                if (!_hasFaulted)
+                {
+                    _queue.Enqueue(value);
+                }
+            }
+        }
+
+        protected abstract ConfiguredTaskAwaitable RendezVous(Task task);
+
+        protected abstract ConfiguredTaskAwaitable<R> RendezVous<R>(Task<R> task);
+
+        public abstract Task DisposeAsync();
+    }
+}

+ 67 - 13
AsyncRx.NET/System.Reactive.Async.Subjects/System/Reactive/Subjects/ReplayAsyncSubject.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Linq;
+using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
 using System.Threading;
 using System.Threading.Tasks;
@@ -21,6 +22,16 @@ namespace System.Reactive.Subjects
             : base(false, bufferSize)
         {
         }
+
+        public SequentialReplayAsyncSubject(IAsyncScheduler scheduler)
+            : base(false, scheduler)
+        {
+        }
+
+        public SequentialReplayAsyncSubject(int bufferSize, IAsyncScheduler scheduler)
+            : base(false, bufferSize, scheduler)
+        {
+        }
     }
 
     public sealed class ConcurrentReplayAsyncSubject<T> : ReplayAsyncSubject<T>
@@ -34,6 +45,16 @@ namespace System.Reactive.Subjects
             : base(true, bufferSize)
         {
         }
+
+        public ConcurrentReplayAsyncSubject(IAsyncScheduler scheduler)
+            : base(true, scheduler)
+        {
+        }
+
+        public ConcurrentReplayAsyncSubject(int bufferSize, IAsyncScheduler scheduler)
+            : base(true, bufferSize, scheduler)
+        {
+        }
     }
 
     public abstract class ReplayAsyncSubject<T> : IAsyncSubject<T>
@@ -52,18 +73,48 @@ namespace System.Reactive.Subjects
 
             if (bufferSize == 1)
             {
-                _impl = new ReplayOne(concurrent);
+                _impl = new ReplayOne(concurrent, CreateImmediateObserver);
             }
             else if (bufferSize == int.MaxValue)
             {
-                _impl = new ReplayAll(concurrent);
+                _impl = new ReplayAll(concurrent, CreateImmediateObserver);
             }
             else
             {
-                _impl = new ReplayMany(concurrent, bufferSize);
+                _impl = new ReplayMany(concurrent, CreateImmediateObserver, bufferSize);
             }
         }
 
+        public ReplayAsyncSubject(bool concurrent, IAsyncScheduler scheduler)
+            : this(concurrent, int.MaxValue, scheduler)
+        {
+        }
+
+        public ReplayAsyncSubject(bool concurrent, int bufferSize, IAsyncScheduler scheduler)
+        {
+            if (bufferSize < 0)
+                throw new ArgumentNullException(nameof(bufferSize));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            if (bufferSize == 1)
+            {
+                _impl = new ReplayOne(concurrent, o => CreateScheduledObserver(o, scheduler));
+            }
+            else if (bufferSize == int.MaxValue)
+            {
+                _impl = new ReplayAll(concurrent, o => CreateScheduledObserver(o, scheduler));
+            }
+            else
+            {
+                _impl = new ReplayMany(concurrent, o => CreateScheduledObserver(o, scheduler), bufferSize);
+            }
+        }
+
+        private static IScheduledAsyncObserver<T> CreateImmediateObserver(IAsyncObserver<T> observer) => new FastImmediateAsyncObserver<T>(observer);
+
+        private static IScheduledAsyncObserver<T> CreateScheduledObserver(IAsyncObserver<T> observer, IAsyncScheduler scheduler) => new ScheduledAsyncObserver<T>(observer, scheduler);
+
         public Task OnCompletedAsync() => _impl.OnCompletedAsync();
 
         public Task OnErrorAsync(Exception error) => _impl.OnErrorAsync(error ?? throw new ArgumentNullException(nameof(error)));
@@ -271,12 +322,15 @@ namespace System.Reactive.Subjects
 
         private abstract class ReplayBufferBase : ReplayBase
         {
-            public ReplayBufferBase(bool concurrent)
+            private readonly Func<IAsyncObserver<T>, IScheduledAsyncObserver<T>> _createObserver;
+
+            public ReplayBufferBase(bool concurrent, Func<IAsyncObserver<T>, IScheduledAsyncObserver<T>> createObserver)
                 : base(concurrent)
             {
+                _createObserver = createObserver;
             }
 
-            protected override IScheduledAsyncObserver<T> CreateScheduledObserver(IAsyncObserver<T> observer) => new FastImmediateAsyncObserver<T>(observer);
+            protected override IScheduledAsyncObserver<T> CreateScheduledObserver(IAsyncObserver<T> observer) => _createObserver(observer);
         }
 
         private sealed class ReplayOne : ReplayBufferBase
@@ -284,8 +338,8 @@ namespace System.Reactive.Subjects
             private bool _hasValue;
             private T _value;
 
-            public ReplayOne(bool concurrent)
-                : base(concurrent)
+            public ReplayOne(bool concurrent, Func<IAsyncObserver<T>, IScheduledAsyncObserver<T>> createObserver)
+                : base(concurrent, createObserver)
             {
             }
 
@@ -315,8 +369,8 @@ namespace System.Reactive.Subjects
         {
             protected readonly Queue<T> _values = new Queue<T>();
 
-            public ReplayManyBase(bool concurrent)
-                : base(concurrent)
+            public ReplayManyBase(bool concurrent, Func<IAsyncObserver<T>, IScheduledAsyncObserver<T>> createObserver)
+                : base(concurrent, createObserver)
             {
             }
 
@@ -344,8 +398,8 @@ namespace System.Reactive.Subjects
         {
             private readonly int _bufferSize;
 
-            public ReplayMany(bool concurrent, int bufferSize)
-                : base(concurrent)
+            public ReplayMany(bool concurrent, Func<IAsyncObserver<T>, IScheduledAsyncObserver<T>> createObserver, int bufferSize)
+                : base(concurrent, createObserver)
             {
                 _bufferSize = bufferSize;
             }
@@ -363,8 +417,8 @@ namespace System.Reactive.Subjects
 
         private sealed class ReplayAll : ReplayManyBase
         {
-            public ReplayAll(bool concurrent)
-                : base(concurrent)
+            public ReplayAll(bool concurrent, Func<IAsyncObserver<T>, IScheduledAsyncObserver<T>> createObserver)
+                : base(concurrent, createObserver)
             {
             }