Selaa lähdekoodia

Enable application-defined Synchronize gate

Our AsyncGate does not support cancellation, and for now at least we don't want to add it. (For one thing, it opens the can of worms of whether we want to attempt to support cancellation across the board in AsyncRx.NET. But also, more or less everyone who tries to add cancellation support to this sort of primitive ends up creating subtle bugs.)

So this defines an IAsyncGate interface and Synchronize overloads that accept it, enabling them to work with application-supplied implementations.
Ian Griffiths 1 vuosi sitten
vanhempi
sitoutus
beb2a3f422

+ 3 - 2
AsyncRx.NET/System.Reactive.Async/Linq/Operators/Synchronize.cs

@@ -2,6 +2,7 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
+using System.Reactive.Threading;
 using System.Threading;
 
 namespace System.Reactive.Linq
@@ -16,7 +17,7 @@ namespace System.Reactive.Linq
             return Create(source, static (source, observer) => source.SubscribeSafeAsync(AsyncObserver.Synchronize(observer)));
         }
 
-        public static IAsyncObservable<TSource> Synchronize<TSource>(this IAsyncObservable<TSource> source, AsyncGate gate)
+        public static IAsyncObservable<TSource> Synchronize<TSource>(this IAsyncObservable<TSource> source, IAsyncGate gate)
         {
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
@@ -40,7 +41,7 @@ namespace System.Reactive.Linq
             return Synchronize(observer, new AsyncGate());
         }
 
-        public static IAsyncObserver<TSource> Synchronize<TSource>(IAsyncObserver<TSource> observer, AsyncGate gate)
+        public static IAsyncObserver<TSource> Synchronize<TSource>(IAsyncObserver<TSource> observer, IAsyncGate gate)
         {
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));

+ 6 - 14
AsyncRx.NET/System.Reactive.Async/Threading/AsyncGate.cs

@@ -3,17 +3,18 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Diagnostics;
+using System.Reactive.Threading;
 using System.Threading.Tasks;
 
 namespace System.Threading
 {
-    public sealed class AsyncGate
+    public sealed class AsyncGate : IAsyncGate
     {
         private readonly object _gate = new();
         private readonly SemaphoreSlim _semaphore = new(1, 1);
         private readonly AsyncLocal<int> _recursionCount = new();
 
-        public ValueTask<Releaser> LockAsync()
+        public ValueTask<AsyncGateReleaser> LockAsync()
         {
             var shouldAcquire = false;
 
@@ -32,13 +33,13 @@ namespace System.Threading
 
             if (shouldAcquire)
             {
-                return new ValueTask<Releaser>(_semaphore.WaitAsync().ContinueWith(_ => new Releaser(this)));
+                return new ValueTask<AsyncGateReleaser>(_semaphore.WaitAsync().ContinueWith(_ => new AsyncGateReleaser(this)));
             }
 
-            return new ValueTask<Releaser>(new Releaser(this));
+            return new ValueTask<AsyncGateReleaser>(new AsyncGateReleaser(this));
         }
 
-        private void Release()
+        void IAsyncGate.Release()
         {
             lock (_gate)
             {
@@ -50,14 +51,5 @@ namespace System.Threading
                 }
             }
         }
-
-        public readonly struct Releaser : IDisposable
-        {
-            private readonly AsyncGate _parent;
-
-            public Releaser(AsyncGate parent) => _parent = parent;
-
-            public void Dispose() => _parent.Release();
-        }
     }
 }

+ 15 - 0
AsyncRx.NET/System.Reactive.Async/Threading/AsyncGateReleaser.cs

@@ -0,0 +1,15 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT License.
+// See the LICENSE file in the project root for more information. 
+
+namespace System.Reactive.Threading
+{
+    public readonly struct AsyncGateReleaser : IDisposable
+    {
+        private readonly IAsyncGate _parent;
+
+        public AsyncGateReleaser(IAsyncGate parent) => _parent = parent;
+
+        public void Dispose() => _parent.Release();
+    }
+}

+ 73 - 0
AsyncRx.NET/System.Reactive.Async/Threading/IAsyncGate.cs

@@ -0,0 +1,73 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Reactive.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace System.Reactive.Threading
+{
+    /// <summary>
+    /// Synchronization primitive that provides <see cref="System.Threading.Monitor"/>-style
+    /// exclusive access semantics, but with an asynchronous API.
+    /// </summary>
+    /// <remarks>
+    /// <para>
+    /// This enables <see cref="AsyncObservable.Synchronize{TSource}(IAsyncObservable{TSource}, IAsyncGate)"/>
+    /// and <see cref="AsyncObserver.Synchronize{TSource}(IAsyncObserver{TSource}, IAsyncGate)"/>
+    /// to be used to synchronize access to an observer with a custom synchronization primitive.
+    /// </para>
+    /// <para>
+    /// These methods model the equivalents for <see cref="IObservable{T}"/> and <see cref="IObserver{T}"/>
+    /// in <c>System.Reactive</c>. Those offer overloads accepting a 'gate' parameter, and if you pass
+    /// the same object to multiple calls to these methods, they will all synchronize their operation
+    /// through that same gate object. The <c>gate</c> parameter in those methods is of type
+    /// <see cref="System.Object"/>, which works because all .NET objects have an associated monitor.
+    /// (It's created on demand when you first use <c>lock</c> or something equivalent.)
+    /// </para>
+    /// <para>
+    /// That approach is problematic in an async world, because this built-in monitor blocks the
+    /// calling thread when contention occurs. The basic idea of AsyncRx.NET is to avoid such
+    /// blocking. It can't always be avoided, and in cases where we can be certain that lock
+    /// acquisition times will be short, the conventional .NET monitor is still a good choice.
+    /// But since these <c>Synchronize</c> operators allow the caller to pass a gate which the
+    /// application code itself might lock, we have no control over how long the lock might be
+    /// held. So it would be inappropriate to use a monitor here.
+    /// </para>
+    /// <para>
+    /// Since the .NET runtime does not currently offer any asynchronous direct equivalent to
+    /// monitor, this interface defines the required API. The <see cref="AsyncGate"/> class
+    /// provide a basic implementation. If applications require additional features, (e.g.
+    /// if they want cancellation support when the application tries to acquire the lock)
+    /// they can provide their own implementation.
+    /// </para>
+    /// </remarks>
+    public interface IAsyncGate
+    {
+        /// <summary>
+        /// Acquires the lock.
+        /// </summary>
+        /// <returns>
+        /// A task that completes when the lock has been acquired, returning an <see cref="AsyncGateReleaser"/>
+        /// which can be disposed to release the lock.
+        /// </returns>
+        /// <remarks>
+        /// <para>
+        /// Applications release the lock by disposing the <see cref="AsyncGateReleaser"/> returned by this
+        /// method. Typically this is done with a <c>using</c> statement or declaration.
+        /// </para>
+        /// </remarks>
+        public ValueTask<AsyncGateReleaser> LockAsync();
+
+        /// <summary>
+        /// Releases the lock. Applications typically won't call this directly, and will use
+        /// the <see cref="AsyncGateReleaser"/> returned by <see cref="LockAsync"/> instead.
+        /// </summary>
+        /// <remarks>
+        /// This method needs to be publicly accessible so that a single <see cref="AsyncGateReleaser"/>
+        /// can be shared by all implementations of this interface.
+        /// </remarks>
+        public void Release();
+    }
+}