Browse Source

Add IAsyncGateReleaser

Also removed public AsyncGate ctor, and reworked code using AsyncGate directly to work in terms of IAsyncGate.
Ian Griffiths 5 months ago
parent
commit
d00ad42db7
44 changed files with 270 additions and 212 deletions
  1. 2 2
      AsyncRx.NET/System.Reactive.Async/Disposables/CompositeAsyncDisposable.cs
  2. 2 1
      AsyncRx.NET/System.Reactive.Async/Disposables/RefCountAsyncDisposable.cs
  3. 2 2
      AsyncRx.NET/System.Reactive.Async/Disposables/SerialAsyncDisposable.cs
  4. 2 1
      AsyncRx.NET/System.Reactive.Async/Internal/ScheduledAsyncObserverBase.cs
  5. 3 3
      AsyncRx.NET/System.Reactive.Async/Joins/AsyncJoinObserver.cs
  6. 2 2
      AsyncRx.NET/System.Reactive.Async/Joins/IAsyncJoinObserver.cs
  7. 3 3
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/Amb.cs
  8. 6 5
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/Buffer.cs
  9. 29 28
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/CombineLatest.Generated.cs
  10. 3 2
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/CombineLatest.Generated.tt
  11. 2 1
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/Delay.cs
  12. 2 2
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs
  13. 2 2
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupJoin.cs
  14. 2 2
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/Join.cs
  15. 2 2
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/Merge.cs
  16. 2 1
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/ObserveOn.cs
  17. 2 2
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/RefCount.cs
  18. 2 2
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/Sample.cs
  19. 2 2
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/SelectMany.cs
  20. 2 2
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/SequenceEqual.cs
  21. 2 2
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/Skip.cs
  22. 3 3
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/SkipUntil.cs
  23. 2 2
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/Switch.cs
  24. 1 2
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/Synchronize.cs
  25. 2 2
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/Take.cs
  26. 3 3
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/TakeUntil.cs
  27. 3 3
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/Throttle.cs
  28. 2 2
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/Timeout.cs
  29. 2 2
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/When.cs
  30. 6 5
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/Window.cs
  31. 3 3
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/WithLatestFrom.cs
  32. 29 28
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/Zip.Generated.cs
  33. 3 2
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/Zip.Generated.tt
  34. 2 2
      AsyncRx.NET/System.Reactive.Async/Linq/Operators/Zip.cs
  35. 2 2
      AsyncRx.NET/System.Reactive.Async/Subjects/BehaviorAsyncSubject.cs
  36. 2 2
      AsyncRx.NET/System.Reactive.Async/Subjects/ConnectableAsyncObservable.cs
  37. 2 2
      AsyncRx.NET/System.Reactive.Async/Subjects/ReplayAsyncSubject.cs
  38. 37 7
      AsyncRx.NET/System.Reactive.Async/Threading/AsyncGate.cs
  39. 43 0
      AsyncRx.NET/System.Reactive.Async/Threading/AsyncGateExtensions.cs
  40. 0 54
      AsyncRx.NET/System.Reactive.Async/Threading/AsyncGateReleaser.cs
  41. 2 1
      AsyncRx.NET/System.Reactive.Async/Threading/AsyncQueueLock.cs
  42. 15 0
      AsyncRx.NET/System.Reactive.Async/Threading/DisposableGateReleaser.cs
  43. 6 16
      AsyncRx.NET/System.Reactive.Async/Threading/IAsyncGate.cs
  44. 24 0
      AsyncRx.NET/System.Reactive.Async/Threading/IAsyncGateReleaser.cs

+ 2 - 2
AsyncRx.NET/System.Reactive.Async/Disposables/CompositeAsyncDisposable.cs

@@ -4,14 +4,14 @@
 
 using System.Collections.Generic;
 using System.Linq;
-using System.Threading;
+using System.Reactive.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Disposables
 {
     public sealed class CompositeAsyncDisposable : IAsyncDisposable
     {
-        private readonly AsyncGate _gate = new();
+        private readonly IAsyncGate _gate = AsyncGate.Create();
         private readonly List<IAsyncDisposable> _disposables;
         private bool _disposed;
 

+ 2 - 1
AsyncRx.NET/System.Reactive.Async/Disposables/RefCountAsyncDisposable.cs

@@ -2,6 +2,7 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
+using System.Reactive.Threading;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -9,7 +10,7 @@ namespace System.Reactive.Disposables
 {
     public sealed class RefCountAsyncDisposable : IAsyncDisposable
     {
-        private readonly AsyncGate _gate = new();
+        private readonly IAsyncGate _gate = AsyncGate.Create();
         private IAsyncDisposable _disposable;
         private bool _primaryDisposed;
         private int _count;

+ 2 - 2
AsyncRx.NET/System.Reactive.Async/Disposables/SerialAsyncDisposable.cs

@@ -2,14 +2,14 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-using System.Threading;
+using System.Reactive.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Disposables
 {
     public sealed class SerialAsyncDisposable : IAsyncDisposable
     {
-        private readonly AsyncGate _gate = new();
+        private readonly IAsyncGate _gate = AsyncGate.Create();
 
         private IAsyncDisposable _disposable;
         private bool _disposed;

+ 2 - 1
AsyncRx.NET/System.Reactive.Async/Internal/ScheduledAsyncObserverBase.cs

@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Collections.Generic;
+using System.Reactive.Threading;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -12,7 +13,7 @@ namespace System.Reactive
     {
         private readonly IAsyncObserver<T> _observer;
 
-        private readonly AsyncGate _lock = new();
+        private readonly IAsyncGate _lock = AsyncGate.Create();
         private readonly Queue<T> _queue = new();
 
         private bool _hasFaulted = false;

+ 3 - 3
AsyncRx.NET/System.Reactive.Async/Joins/AsyncJoinObserver.cs

@@ -5,7 +5,7 @@
 using System.Collections.Generic;
 using System.Reactive.Disposables;
 using System.Reactive.Linq;
-using System.Threading;
+using System.Reactive.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Joins
@@ -18,7 +18,7 @@ namespace System.Reactive.Joins
         private readonly List<ActiveAsyncPlan> _activePlans = new();
         private readonly SingleAssignmentAsyncDisposable _subscription = new();
 
-        private AsyncGate _gate;
+        private IAsyncGate _gate;
         private bool _isDisposed;
 
         public AsyncJoinObserver(IAsyncObservable<T> source, Func<Exception, ValueTask> onError)
@@ -56,7 +56,7 @@ namespace System.Reactive.Joins
             }
         }
 
-        public async Task SubscribeAsync(AsyncGate gate)
+        public async Task SubscribeAsync(IAsyncGate gate)
         {
             _gate = gate;
 

+ 2 - 2
AsyncRx.NET/System.Reactive.Async/Joins/IAsyncJoinObserver.cs

@@ -2,14 +2,14 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-using System.Threading;
+using System.Reactive.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Joins
 {
     internal interface IAsyncJoinObserver : IAsyncDisposable
     {
-        Task SubscribeAsync(AsyncGate gate);
+        Task SubscribeAsync(IAsyncGate gate);
 
         void Dequeue();
     }

+ 3 - 3
AsyncRx.NET/System.Reactive.Async/Linq/Operators/Amb.cs

@@ -5,7 +5,7 @@
 using System.Collections.Generic;
 using System.Linq;
 using System.Reactive.Disposables;
-using System.Threading;
+using System.Reactive.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
@@ -83,7 +83,7 @@ namespace System.Reactive.Linq
             if (second == null)
                 throw new ArgumentNullException(nameof(second));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var state = AmbState.None;
 
@@ -199,7 +199,7 @@ namespace System.Reactive.Linq
             if (subscriptions == null)
                 throw new ArgumentNullException(nameof(subscriptions));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var winner = default(int?);
 

+ 6 - 5
AsyncRx.NET/System.Reactive.Async/Linq/Operators/Buffer.cs

@@ -5,6 +5,7 @@
 using System.Collections.Generic;
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
+using System.Reactive.Threading;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -306,7 +307,7 @@ namespace System.Reactive.Linq
 
             async Task<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
             {
-                var gate = new AsyncGate();
+                var gate = AsyncGate.Create();
 
                 var buffer = new List<TSource>();
 
@@ -378,7 +379,7 @@ namespace System.Reactive.Linq
 
             async Task<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
             {
-                var gate = new AsyncGate();
+                var gate = AsyncGate.Create();
 
                 var queue = new Queue<List<TSource>>();
 
@@ -509,7 +510,7 @@ namespace System.Reactive.Linq
 
             async Task<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
             {
-                var gate = new AsyncGate();
+                var gate = AsyncGate.Create();
 
                 var timer = new SerialAsyncDisposable();
 
@@ -586,7 +587,7 @@ namespace System.Reactive.Linq
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var buffer = new List<TSource>();
 
@@ -660,7 +661,7 @@ namespace System.Reactive.Linq
             {
                 var closeSubscription = new SerialAsyncDisposable();
 
-                var gate = new AsyncGate();
+                var gate = AsyncGate.Create();
                 var queueLock = new AsyncQueueLock();
 
                 var buffer = new List<TSource>();

+ 29 - 28
AsyncRx.NET/System.Reactive.Async/Linq/Operators/CombineLatest.Generated.cs

@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Reactive.Disposables;
+using System.Reactive.Threading;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -1827,7 +1828,7 @@ namespace System.Reactive.Linq
             bool isDone2 = false;
             T2 latestValue2 = default(T2);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -1946,7 +1947,7 @@ namespace System.Reactive.Linq
             bool isDone2 = false;
             T2 latestValue2 = default(T2);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -2078,7 +2079,7 @@ namespace System.Reactive.Linq
             bool isDone3 = false;
             T3 latestValue3 = default(T3);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -2243,7 +2244,7 @@ namespace System.Reactive.Linq
             bool isDone3 = false;
             T3 latestValue3 = default(T3);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -2432,7 +2433,7 @@ namespace System.Reactive.Linq
             bool isDone4 = false;
             T4 latestValue4 = default(T4);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -2643,7 +2644,7 @@ namespace System.Reactive.Linq
             bool isDone4 = false;
             T4 latestValue4 = default(T4);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -2889,7 +2890,7 @@ namespace System.Reactive.Linq
             bool isDone5 = false;
             T5 latestValue5 = default(T5);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -3146,7 +3147,7 @@ namespace System.Reactive.Linq
             bool isDone5 = false;
             T5 latestValue5 = default(T5);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -3449,7 +3450,7 @@ namespace System.Reactive.Linq
             bool isDone6 = false;
             T6 latestValue6 = default(T6);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -3752,7 +3753,7 @@ namespace System.Reactive.Linq
             bool isDone6 = false;
             T6 latestValue6 = default(T6);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -4112,7 +4113,7 @@ namespace System.Reactive.Linq
             bool isDone7 = false;
             T7 latestValue7 = default(T7);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -4461,7 +4462,7 @@ namespace System.Reactive.Linq
             bool isDone7 = false;
             T7 latestValue7 = default(T7);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -4878,7 +4879,7 @@ namespace System.Reactive.Linq
             bool isDone8 = false;
             T8 latestValue8 = default(T8);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -5273,7 +5274,7 @@ namespace System.Reactive.Linq
             bool isDone8 = false;
             T8 latestValue8 = default(T8);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -5747,7 +5748,7 @@ namespace System.Reactive.Linq
             bool isDone9 = false;
             T9 latestValue9 = default(T9);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -6188,7 +6189,7 @@ namespace System.Reactive.Linq
             bool isDone9 = false;
             T9 latestValue9 = default(T9);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -6719,7 +6720,7 @@ namespace System.Reactive.Linq
             bool isDone10 = false;
             T10 latestValue10 = default(T10);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -7206,7 +7207,7 @@ namespace System.Reactive.Linq
             bool isDone10 = false;
             T10 latestValue10 = default(T10);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -7794,7 +7795,7 @@ namespace System.Reactive.Linq
             bool isDone11 = false;
             T11 latestValue11 = default(T11);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -8327,7 +8328,7 @@ namespace System.Reactive.Linq
             bool isDone11 = false;
             T11 latestValue11 = default(T11);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -8972,7 +8973,7 @@ namespace System.Reactive.Linq
             bool isDone12 = false;
             T12 latestValue12 = default(T12);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -9551,7 +9552,7 @@ namespace System.Reactive.Linq
             bool isDone12 = false;
             T12 latestValue12 = default(T12);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -10253,7 +10254,7 @@ namespace System.Reactive.Linq
             bool isDone13 = false;
             T13 latestValue13 = default(T13);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -10878,7 +10879,7 @@ namespace System.Reactive.Linq
             bool isDone13 = false;
             T13 latestValue13 = default(T13);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -11637,7 +11638,7 @@ namespace System.Reactive.Linq
             bool isDone14 = false;
             T14 latestValue14 = default(T14);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -12308,7 +12309,7 @@ namespace System.Reactive.Linq
             bool isDone14 = false;
             T14 latestValue14 = default(T14);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -13124,7 +13125,7 @@ namespace System.Reactive.Linq
             bool isDone15 = false;
             T15 latestValue15 = default(T15);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -13841,7 +13842,7 @@ namespace System.Reactive.Linq
             bool isDone15 = false;
             T15 latestValue15 = default(T15);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (

+ 3 - 2
AsyncRx.NET/System.Reactive.Async/Linq/Operators/CombineLatest.Generated.tt

@@ -9,6 +9,7 @@
 <#@ import namespace="System.Collections.Generic" #>
 <#@ output extension=".cs" #>
 using System.Reactive.Disposables;
+using System.Reactive.Threading;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -161,7 +162,7 @@ for (var j = 1; j <= i; j++)
 }
 #>
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (
@@ -248,7 +249,7 @@ for (var j = 1; j <= i; j++)
 }
 #>
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
             (

+ 2 - 1
AsyncRx.NET/System.Reactive.Async/Linq/Operators/Delay.cs

@@ -5,6 +5,7 @@
 using System.Collections.Generic;
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
+using System.Reactive.Threading;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -76,7 +77,7 @@ namespace System.Reactive.Linq
 
             var semaphore = new SemaphoreSlim(0);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var queue = new Queue<TimeInterval<TSource>>();
             var isDone = false;

+ 2 - 2
AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupByUntil.cs

@@ -6,7 +6,7 @@ using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Reactive.Disposables;
 using System.Reactive.Subjects;
-using System.Threading;
+using System.Reactive.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
@@ -609,7 +609,7 @@ namespace System.Reactive.Linq
                     groups = new ConcurrentDictionary<TKey, IAsyncSubject<TElement>>(Environment.ProcessorCount * 4, capacity, comparer);
                 }
 
-                var gate = new AsyncGate();
+                var gate = AsyncGate.Create();
 
                 var nullGate = new object();
                 var nullGroup = default(IAsyncSubject<TElement>);

+ 2 - 2
AsyncRx.NET/System.Reactive.Async/Linq/Operators/GroupJoin.cs

@@ -5,7 +5,7 @@
 using System.Collections.Generic;
 using System.Reactive.Disposables;
 using System.Reactive.Subjects;
-using System.Threading;
+using System.Reactive.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
@@ -57,7 +57,7 @@ namespace System.Reactive.Linq
             if (resultSelector == null)
                 throw new ArgumentNullException(nameof(resultSelector));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var group = new CompositeAsyncDisposable(subscriptions);
             var refCount = new RefCountAsyncDisposable(group);

+ 2 - 2
AsyncRx.NET/System.Reactive.Async/Linq/Operators/Join.cs

@@ -4,7 +4,7 @@
 
 using System.Collections.Generic;
 using System.Reactive.Disposables;
-using System.Threading;
+using System.Reactive.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
@@ -59,7 +59,7 @@ namespace System.Reactive.Linq
             if (resultSelector == null)
                 throw new ArgumentNullException(nameof(resultSelector));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var group = new CompositeAsyncDisposable(subscriptions);
 

+ 2 - 2
AsyncRx.NET/System.Reactive.Async/Linq/Operators/Merge.cs

@@ -3,7 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Reactive.Disposables;
-using System.Threading;
+using System.Reactive.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
@@ -35,7 +35,7 @@ namespace System.Reactive.Linq
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var count = 1;
 

+ 2 - 1
AsyncRx.NET/System.Reactive.Async/Linq/Operators/ObserveOn.cs

@@ -5,6 +5,7 @@
 using System.Collections.Generic;
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
+using System.Reactive.Threading;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -44,7 +45,7 @@ namespace System.Reactive.Linq
 
             var semaphore = new SemaphoreSlim(0);
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var queue = new Queue<TSource>();
             var error = default(Exception);

+ 2 - 2
AsyncRx.NET/System.Reactive.Async/Linq/Operators/RefCount.cs

@@ -4,7 +4,7 @@
 
 using System.Reactive.Disposables;
 using System.Reactive.Subjects;
-using System.Threading;
+using System.Reactive.Threading;
 
 namespace System.Reactive.Linq
 {
@@ -15,7 +15,7 @@ namespace System.Reactive.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
             var count = 0;
             var connectable = default(IAsyncDisposable);
 

+ 2 - 2
AsyncRx.NET/System.Reactive.Async/Linq/Operators/Sample.cs

@@ -4,7 +4,7 @@
 
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
-using System.Threading;
+using System.Reactive.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
@@ -78,7 +78,7 @@ namespace System.Reactive.Linq
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var hasValue = false;
             var value = default(TSource);

+ 2 - 2
AsyncRx.NET/System.Reactive.Async/Linq/Operators/SelectMany.cs

@@ -3,7 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Reactive.Disposables;
-using System.Threading;
+using System.Reactive.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
@@ -222,7 +222,7 @@ namespace System.Reactive.Linq
             if (resultSelector == null)
                 throw new ArgumentNullException(nameof(resultSelector));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var count = 1;
 

+ 2 - 2
AsyncRx.NET/System.Reactive.Async/Linq/Operators/SequenceEqual.cs

@@ -4,7 +4,7 @@
 
 using System.Collections.Generic;
 using System.Reactive.Disposables;
-using System.Threading;
+using System.Reactive.Threading;
 
 namespace System.Reactive.Linq
 {
@@ -84,7 +84,7 @@ namespace System.Reactive.Linq
             if (comparer == null)
                 throw new ArgumentNullException(nameof(comparer));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var queueLeft = new Queue<TSource>();
             var queueRight = new Queue<TSource>();

+ 2 - 2
AsyncRx.NET/System.Reactive.Async/Linq/Operators/Skip.cs

@@ -4,7 +4,7 @@
 
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
-using System.Threading;
+using System.Reactive.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
@@ -130,7 +130,7 @@ namespace System.Reactive.Linq
                 // REVIEW: May be easier to just use SkipUntil with a Timer parameter. Do we want Skip on the observer?
                 // DESIGN: It seems that if an observer would be an IAsyncDisposable, this could get a bit easier ("inject" the inner disposable).
 
-                var gate = new AsyncGate();
+                var gate = AsyncGate.Create();
                 var open = false;
 
                 return

+ 3 - 3
AsyncRx.NET/System.Reactive.Async/Linq/Operators/SkipUntil.cs

@@ -4,7 +4,7 @@
 
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
-using System.Threading;
+using System.Reactive.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
@@ -87,7 +87,7 @@ namespace System.Reactive.Linq
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
             var open = false;
 
             return
@@ -154,7 +154,7 @@ namespace System.Reactive.Linq
                 // REVIEW: May be easier to just use SkipUntil with a Timer parameter. Do we want SkipUntil on the observer?
                 // DESIGN: It seems that if an observer would be an IAsyncDisposable, this could get a bit easier ("inject" the inner disposable).
 
-                var gate = new AsyncGate();
+                var gate = AsyncGate.Create();
                 var open = false;
 
                 return

+ 2 - 2
AsyncRx.NET/System.Reactive.Async/Linq/Operators/Switch.cs

@@ -3,7 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Reactive.Disposables;
-using System.Threading;
+using System.Reactive.Threading;
 
 namespace System.Reactive.Linq
 {
@@ -34,7 +34,7 @@ namespace System.Reactive.Linq
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var isStopped = false;
             var hasLatest = false;

+ 1 - 2
AsyncRx.NET/System.Reactive.Async/Linq/Operators/Synchronize.cs

@@ -3,7 +3,6 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Reactive.Threading;
-using System.Threading;
 
 namespace System.Reactive.Linq
 {
@@ -38,7 +37,7 @@ namespace System.Reactive.Linq
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
 
-            return Synchronize(observer, new AsyncGate());
+            return Synchronize(observer, AsyncGate.Create());
         }
 
         public static IAsyncObserver<TSource> Synchronize<TSource>(IAsyncObserver<TSource> observer, IAsyncGate gate)

+ 2 - 2
AsyncRx.NET/System.Reactive.Async/Linq/Operators/Take.cs

@@ -4,7 +4,7 @@
 
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
-using System.Threading;
+using System.Reactive.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
@@ -130,7 +130,7 @@ namespace System.Reactive.Linq
                 // REVIEW: May be easier to just use TakeUntil with a Timer parameter. Do we want TakeUntil on the observer?
                 // DESIGN: It seems that if an observer would be an IAsyncDisposable, this could get a bit easier ("inject" the inner disposable).
 
-                var gate = new AsyncGate();
+                var gate = AsyncGate.Create();
 
                 return
                     (

+ 3 - 3
AsyncRx.NET/System.Reactive.Async/Linq/Operators/TakeUntil.cs

@@ -4,7 +4,7 @@
 
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
-using System.Threading;
+using System.Reactive.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
@@ -87,7 +87,7 @@ namespace System.Reactive.Linq
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             return
                 (
@@ -150,7 +150,7 @@ namespace System.Reactive.Linq
                 // REVIEW: May be easier to just use TakeUntil with a Timer parameter. Do we want TakeUntil on the observer?
                 // DESIGN: It seems that if an observer would be an IAsyncDisposable, this could get a bit easier ("inject" the inner disposable).
 
-                var gate = new AsyncGate();
+                var gate = AsyncGate.Create();
 
                 return
                     (

+ 3 - 3
AsyncRx.NET/System.Reactive.Async/Linq/Operators/Throttle.cs

@@ -4,7 +4,7 @@
 
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
-using System.Threading;
+using System.Reactive.Threading;
 
 namespace System.Reactive.Linq
 {
@@ -104,7 +104,7 @@ namespace System.Reactive.Linq
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var timer = new SerialAsyncDisposable();
 
@@ -187,7 +187,7 @@ namespace System.Reactive.Linq
             if (throttleSelector == null)
                 throw new ArgumentNullException(nameof(throttleSelector));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var throttler = new SerialAsyncDisposable();
 

+ 2 - 2
AsyncRx.NET/System.Reactive.Async/Linq/Operators/Timeout.cs

@@ -4,7 +4,7 @@
 
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
-using System.Threading;
+using System.Reactive.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
@@ -159,7 +159,7 @@ namespace System.Reactive.Linq
 
             async Task<(IAsyncObserver<TSource>, IAsyncDisposable)> CoreAsync()
             {
-                var gate = new AsyncGate();
+                var gate = AsyncGate.Create();
 
                 var switched = false;
                 var id = 0UL;

+ 2 - 2
AsyncRx.NET/System.Reactive.Async/Linq/Operators/When.cs

@@ -5,7 +5,7 @@
 using System.Collections.Generic;
 using System.Reactive.Disposables;
 using System.Reactive.Joins;
-using System.Threading;
+using System.Reactive.Threading;
 
 namespace System.Reactive.Linq
 {
@@ -19,7 +19,7 @@ namespace System.Reactive.Linq
             return Create<TResult>(async observer =>
             {
                 var externalSubscriptions = new Dictionary<object, IAsyncJoinObserver>();
-                var gate = new AsyncGate();
+                var gate = AsyncGate.Create();
                 var activePlans = new List<ActiveAsyncPlan>();
 
                 var outputObserver = AsyncObserver.Create<TResult>(

+ 6 - 5
AsyncRx.NET/System.Reactive.Async/Linq/Operators/Window.cs

@@ -6,6 +6,7 @@ using System.Collections.Generic;
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
 using System.Reactive.Subjects;
+using System.Reactive.Threading;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -296,7 +297,7 @@ namespace System.Reactive.Linq
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var window = default(IAsyncSubject<TSource>);
             var d = new CompositeAsyncDisposable();
@@ -382,7 +383,7 @@ namespace System.Reactive.Linq
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var d = new CompositeAsyncDisposable();
             var timer = new SerialAsyncDisposable();
@@ -538,7 +539,7 @@ namespace System.Reactive.Linq
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var n = 0;
             var window = default(IAsyncSubject<TSource>);
@@ -649,7 +650,7 @@ namespace System.Reactive.Linq
             if (subscription == null)
                 throw new ArgumentNullException(nameof(subscription));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var refCount = new RefCountAsyncDisposable(subscription);
             var window = default(IAsyncSubject<TSource>);
@@ -736,7 +737,7 @@ namespace System.Reactive.Linq
 
             var closeSubscription = new SerialAsyncDisposable();
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
             var queueLock = new AsyncQueueLock();
 
             var refCount = new RefCountAsyncDisposable(subscription);

+ 3 - 3
AsyncRx.NET/System.Reactive.Async/Linq/Operators/WithLatestFrom.cs

@@ -3,7 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Reactive.Disposables;
-using System.Threading;
+using System.Reactive.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
@@ -103,7 +103,7 @@ namespace System.Reactive.Linq
             if (resultSelector == null)
                 throw new ArgumentNullException(nameof(resultSelector));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             async ValueTask OnErrorAsync(Exception ex)
             {
@@ -170,7 +170,7 @@ namespace System.Reactive.Linq
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             async ValueTask OnErrorAsync(Exception ex)
             {

+ 29 - 28
AsyncRx.NET/System.Reactive.Async/Linq/Operators/Zip.Generated.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Reactive.Disposables;
+using System.Reactive.Threading;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -1819,7 +1820,7 @@ namespace System.Reactive.Linq
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();
@@ -1913,7 +1914,7 @@ namespace System.Reactive.Linq
             if (selector == null)
                 throw new ArgumentNullException(nameof(selector));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();
@@ -2007,7 +2008,7 @@ namespace System.Reactive.Linq
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();
@@ -2103,7 +2104,7 @@ namespace System.Reactive.Linq
             if (selector == null)
                 throw new ArgumentNullException(nameof(selector));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();
@@ -2199,7 +2200,7 @@ namespace System.Reactive.Linq
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();
@@ -2297,7 +2298,7 @@ namespace System.Reactive.Linq
             if (selector == null)
                 throw new ArgumentNullException(nameof(selector));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();
@@ -2395,7 +2396,7 @@ namespace System.Reactive.Linq
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();
@@ -2495,7 +2496,7 @@ namespace System.Reactive.Linq
             if (selector == null)
                 throw new ArgumentNullException(nameof(selector));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();
@@ -2595,7 +2596,7 @@ namespace System.Reactive.Linq
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();
@@ -2697,7 +2698,7 @@ namespace System.Reactive.Linq
             if (selector == null)
                 throw new ArgumentNullException(nameof(selector));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();
@@ -2799,7 +2800,7 @@ namespace System.Reactive.Linq
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();
@@ -2903,7 +2904,7 @@ namespace System.Reactive.Linq
             if (selector == null)
                 throw new ArgumentNullException(nameof(selector));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();
@@ -3007,7 +3008,7 @@ namespace System.Reactive.Linq
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();
@@ -3113,7 +3114,7 @@ namespace System.Reactive.Linq
             if (selector == null)
                 throw new ArgumentNullException(nameof(selector));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();
@@ -3219,7 +3220,7 @@ namespace System.Reactive.Linq
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();
@@ -3327,7 +3328,7 @@ namespace System.Reactive.Linq
             if (selector == null)
                 throw new ArgumentNullException(nameof(selector));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();
@@ -3435,7 +3436,7 @@ namespace System.Reactive.Linq
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();
@@ -3545,7 +3546,7 @@ namespace System.Reactive.Linq
             if (selector == null)
                 throw new ArgumentNullException(nameof(selector));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();
@@ -3655,7 +3656,7 @@ namespace System.Reactive.Linq
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();
@@ -3767,7 +3768,7 @@ namespace System.Reactive.Linq
             if (selector == null)
                 throw new ArgumentNullException(nameof(selector));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();
@@ -3879,7 +3880,7 @@ namespace System.Reactive.Linq
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();
@@ -3993,7 +3994,7 @@ namespace System.Reactive.Linq
             if (selector == null)
                 throw new ArgumentNullException(nameof(selector));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();
@@ -4107,7 +4108,7 @@ namespace System.Reactive.Linq
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();
@@ -4223,7 +4224,7 @@ namespace System.Reactive.Linq
             if (selector == null)
                 throw new ArgumentNullException(nameof(selector));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();
@@ -4339,7 +4340,7 @@ namespace System.Reactive.Linq
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();
@@ -4457,7 +4458,7 @@ namespace System.Reactive.Linq
             if (selector == null)
                 throw new ArgumentNullException(nameof(selector));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();
@@ -4575,7 +4576,7 @@ namespace System.Reactive.Linq
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();
@@ -4695,7 +4696,7 @@ namespace System.Reactive.Linq
             if (selector == null)
                 throw new ArgumentNullException(nameof(selector));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var values1 = new Queue<T1>();
             var values2 = new Queue<T2>();

+ 3 - 2
AsyncRx.NET/System.Reactive.Async/Linq/Operators/Zip.Generated.tt

@@ -10,6 +10,7 @@
 <#@ output extension=".cs" #>
 using System.Collections.Generic;
 using System.Reactive.Disposables;
+using System.Reactive.Threading;
 using System.Threading;
 using System.Threading.Tasks;
 
@@ -152,7 +153,7 @@ for (var i = 2; i <= 15; i++)
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
 <#
 for (var j = 1; j <= i; j++)
@@ -258,7 +259,7 @@ for (var j = 1; j <= i; j++)
             if (selector == null)
                 throw new ArgumentNullException(nameof(selector));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
 <#
 for (var j = 1; j <= i; j++)

+ 2 - 2
AsyncRx.NET/System.Reactive.Async/Linq/Operators/Zip.cs

@@ -5,7 +5,7 @@
 using System.Collections.Generic;
 using System.Linq;
 using System.Reactive.Disposables;
-using System.Threading;
+using System.Reactive.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
@@ -50,7 +50,7 @@ namespace System.Reactive.Linq
             if (count < 0)
                 throw new ArgumentOutOfRangeException(nameof(count));
 
-            var gate = new AsyncGate();
+            var gate = AsyncGate.Create();
 
             var queues = new Queue<TSource>[count];
             var isDone = new bool[count];

+ 2 - 2
AsyncRx.NET/System.Reactive.Async/Subjects/BehaviorAsyncSubject.cs

@@ -4,14 +4,14 @@
 
 using System.Collections.Generic;
 using System.Reactive.Disposables;
-using System.Threading;
+using System.Reactive.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Subjects
 {
     public abstract class BehaviorAsyncSubject<T> : IAsyncSubject<T>
     {
-        private readonly AsyncGate _gate = new();
+        private readonly IAsyncGate _gate = AsyncGate.Create();
         private readonly List<IAsyncObserver<T>> _observers = new();
         private T _value;
         private bool _done;

+ 2 - 2
AsyncRx.NET/System.Reactive.Async/Subjects/ConnectableAsyncObservable.cs

@@ -3,7 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Reactive.Linq;
-using System.Threading;
+using System.Reactive.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Subjects
@@ -12,7 +12,7 @@ namespace System.Reactive.Subjects
     {
         private readonly IAsyncSubject<TSource, TResult> _subject;
         private readonly IAsyncObservable<TSource> _source;
-        private readonly AsyncGate _gate = new();
+        private readonly IAsyncGate _gate = AsyncGate.Create();
 
         private Connection _connection;
 

+ 2 - 2
AsyncRx.NET/System.Reactive.Async/Subjects/ReplayAsyncSubject.cs

@@ -6,7 +6,7 @@ using System.Collections.Generic;
 using System.Linq;
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
-using System.Threading;
+using System.Reactive.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Subjects
@@ -120,7 +120,7 @@ namespace System.Reactive.Subjects
         private abstract class ReplayBase : IAsyncSubject<T>
         {
             private readonly bool _concurrent;
-            private readonly AsyncGate _lock = new();
+            private readonly IAsyncGate _lock = AsyncGate.Create();
             private readonly List<IScheduledAsyncObserver<T>> _observers = new(); // TODO: immutable array
             private bool _done;
             private Exception _error;

+ 37 - 7
AsyncRx.NET/System.Reactive.Async/Threading/AsyncGate.cs

@@ -3,18 +3,43 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Diagnostics;
-using System.Reactive.Threading;
+using System.Threading;
 using System.Threading.Tasks;
 
-namespace System.Threading
+namespace System.Reactive.Threading
 {
-    public sealed class AsyncGate : IAsyncGate
+    /// <summary>
+    /// Provides an implementation of <see cref="IAsyncGate"/>, enabling mutually exclusive locking
+    /// in async code.
+    /// </summary>
+    public sealed class AsyncGate : IAsyncGate, IAsyncGateReleaser
     {
         private readonly object _gate = new();
         private readonly SemaphoreSlim _semaphore = new(1, 1);
         private readonly AsyncLocal<int> _recursionCount = new();
 
-        public ValueTask<AsyncGateReleaser> LockAsync()
+        /// <summary>
+        /// Creates an <see cref="AsyncGate"/>.
+        /// </summary>
+        /// <remarks>
+        /// This is private because we hope that one day, the .NET runtime will provide a built-in
+        /// asynchronous mutual exclusion primitive, and that we might be able to use that instead of
+        /// our own implementation. Although that might be something we could do by modifying this
+        /// class, it might prove useful to be able to provide the old implementation for backwards
+        /// compatibility, so we don't want AsyncRx.NET consumers to depend on a specific concrete type
+        /// as the <see cref="IAsyncGate"/> implementation.
+        /// </remarks>
+        private AsyncGate()
+        {
+        }
+
+        /// <summary>
+        /// Creates a new instance of an <see cref="IAsyncGate"/> implementation.
+        /// </summary>
+        /// <returns></returns>
+        public static IAsyncGate Create() => new AsyncGate();
+
+        ValueTask<IAsyncGateReleaser> IAsyncGate.AcquireAsync()
         {
             var shouldAcquire = false;
 
@@ -33,13 +58,18 @@ namespace System.Threading
 
             if (shouldAcquire)
             {
-                return new ValueTask<AsyncGateReleaser>(_semaphore.WaitAsync().ContinueWith(_ => new AsyncGateReleaser(this)));
+                Task acquireTask = _semaphore.WaitAsync();
+                if (acquireTask.IsCompleted)
+                {
+                    return new ValueTask<IAsyncGateReleaser>(this);
+                }
+                return new ValueTask<IAsyncGateReleaser>(acquireTask.ContinueWith<IAsyncGateReleaser>(_ => this));
             }
 
-            return new ValueTask<AsyncGateReleaser>(new AsyncGateReleaser(this));
+            return new ValueTask<IAsyncGateReleaser>(this);
         }
 
-        void IAsyncGate.Release()
+        void IAsyncGateReleaser.Release()
         {
             lock (_gate)
             {

+ 43 - 0
AsyncRx.NET/System.Reactive.Async/Threading/AsyncGateExtensions.cs

@@ -0,0 +1,43 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Threading.Tasks;
+
+namespace System.Reactive.Threading;
+
+/// <summary>
+/// Extension methods for <see cref="IAsyncGate"/>.
+/// </summary>
+public static class AsyncGateExtensions
+{
+    /// <summary>
+    /// Acquires an <see cref="IAsyncGate"/> in a way enables the gate to be released with a <see langword="using" />
+    /// statement or declaration.
+    /// </summary>
+    /// <param name="gate">The gate to lock.</param>
+    /// <returns>
+    /// A <see cref="ValueTask{TResult}"/> that produces a <see cref="DisposableGateReleaser"/> that will call
+    /// <see cref="IAsyncGateReleaser.Release"/> when disposed.
+    /// </returns>
+    public static ValueTask<DisposableGateReleaser> LockAsync(this IAsyncGate gate)
+    {
+        // Note, we are avoiding async/await here because we MUST NOT create a new child ExecutionContext
+        // (The AsyncGate.LockAsync method does not use async/await either, and for the same reason.)
+        //
+        // IAsyncGate implementations are allowed to require that their LockAsync method is called from the same
+        // execution context as Release will be called. For example, AsyncGate uses an AsyncLocal<int> to track
+        // the recursion count, and when you update an AsyncLocal<T>'s value, that modified value is visible only
+        // in the current ExecutionContext and its descendants. An async method effectively introduces a new child
+        // context, so any AsyncLocal<T> value changes are lost when an async method returns, but we need the
+        // recursion count to live in our caller's context, which is why we must make sure we don't introduce a
+        // new child context here. That's why this needs to be old-school manual task management, and not async/await.
+        ValueTask<IAsyncGateReleaser> releaserValueTask = gate.AcquireAsync();
+        if (releaserValueTask.IsCompleted)
+        {
+            return new ValueTask<DisposableGateReleaser>(new DisposableGateReleaser(releaserValueTask.Result));
+        }
+
+        return new ValueTask<DisposableGateReleaser>(releaserValueTask.AsTask().ContinueWith(t => new DisposableGateReleaser(t.Result)));
+    }
+}

+ 0 - 54
AsyncRx.NET/System.Reactive.Async/Threading/AsyncGateReleaser.cs

@@ -1,54 +0,0 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT License.
-// See the LICENSE file in the project root for more information. 
-
-namespace System.Reactive.Threading
-{
-    /// <summary>
-    /// Returned by <see cref="IAsyncGate.LockAsync"/>, enabling the caller to release the lock.
-    /// </summary>
-    public struct AsyncGateReleaser : IDisposable
-    {
-        // Holds either an IAsyncGate or an IDisposable.
-        // In the case where this is an IAsyncGate, it's important that we try to avoid
-        // calling Release more than once, because this releaser is associated with just one
-        // call to LockAsync. IDisposable implementations are expected to be idempotent,
-        // so we need to remember when we've already made our one call to Release. (This
-        // can't be perfect because this is a struct, so callers might end up copying
-        // this value and then disposing each copy. But for normal using usage that won't
-        // be a problem, and this provides a reasonable best-effort approach. It's why
-        // this can't be a readonly struct though.)
-        private object _parentOrDisposable;
-
-        /// <summary>
-        /// Creates an <see cref="AsyncGateReleaser"/> that calls <see cref="IAsyncGate.Release"/>
-        /// on its parent when disposed.
-        /// </summary>
-        /// <param name="parent"></param>
-        public AsyncGateReleaser(IAsyncGate parent) => _parentOrDisposable = parent;
-
-        /// <summary>
-        /// Creates an <see cref="AsyncGateReleaser"/> that calls another disposable when disposed.
-        /// </summary>
-        /// <param name="disposable">
-        /// The <see cref="IDisposable"/> implementation to which to defer.
-        /// </param>
-        /// <remarks>
-        /// This can be convenient for custom <see cref="IAsyncGate"/> implementations in that wrap
-        /// some underlying lock implementation that returns an <see cref="IDisposable"/> as the means
-        /// by which the lock is released.
-        /// </remarks>
-        public AsyncGateReleaser(IDisposable disposable) => _parentOrDisposable = disposable;
-
-        public void Dispose()
-        {
-            switch (_parentOrDisposable)
-            {
-                case IDisposable d: d.Dispose(); break;
-                case IAsyncGate g: g.Release(); break;
-            }
-
-            _parentOrDisposable = null;
-        }
-    }
-}

+ 2 - 1
AsyncRx.NET/System.Reactive.Async/Threading/AsyncQueueLock.cs

@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Collections.Generic;
+using System.Reactive.Threading;
 using System.Threading.Tasks;
 
 namespace System.Threading
@@ -10,7 +11,7 @@ namespace System.Threading
     public sealed class AsyncQueueLock : IAsyncDisposable
     {
         private readonly Queue<Func<ValueTask>> _queue = new();
-        private readonly AsyncGate _gate = new();
+        private readonly IAsyncGate _gate = AsyncGate.Create();
 
         private bool _isAcquired;
         private bool _hasFaulted;

+ 15 - 0
AsyncRx.NET/System.Reactive.Async/Threading/DisposableGateReleaser.cs

@@ -0,0 +1,15 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT License.
+// See the LICENSE file in the project root for more information. 
+
+namespace System.Reactive.Threading;
+
+/// <summary>
+/// Enables a <see langword="using" /> statement or declaration to be used to release an
+/// <see cref="IAsyncGate"/>. Typically obtained through <see cref="AsyncGateExtensions.LockAsync(IAsyncGate)"/>
+/// </summary>
+/// <param name="gateReleaser"></param>
+public struct DisposableGateReleaser(IAsyncGateReleaser gateReleaser) : IDisposable
+{
+    public void Dispose() => gateReleaser.Release();
+}

+ 6 - 16
AsyncRx.NET/System.Reactive.Async/Threading/IAsyncGate.cs

@@ -3,7 +3,6 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Reactive.Linq;
-using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Threading
@@ -49,25 +48,16 @@ namespace System.Reactive.Threading
         /// Acquires the lock.
         /// </summary>
         /// <returns>
-        /// A task that completes when the lock has been acquired, returning an <see cref="AsyncGateReleaser"/>
-        /// which can be disposed to release the lock.
+        /// A task that completes when the lock has been acquired, returning an <see cref="IAsyncGateReleaser"/>
+        /// with which to release the lock.
         /// </returns>
         /// <remarks>
         /// <para>
-        /// Applications release the lock by disposing the <see cref="AsyncGateReleaser"/> returned by this
-        /// method. Typically this is done with a <c>using</c> statement or declaration.
+        /// Applications release the lock by calling <see cref="IAsyncGateReleaser.Release"/> on the object
+        /// returned by this method. Typically this is done with a <c>using</c> statement or declaration by
+        /// using the <see cref="AsyncGateExtensions.LockAsync(IAsyncGate)"/> extension method.
         /// </para>
         /// </remarks>
-        public ValueTask<AsyncGateReleaser> LockAsync();
-
-        /// <summary>
-        /// Releases the lock. Applications typically won't call this directly, and will use
-        /// the <see cref="AsyncGateReleaser"/> returned by <see cref="LockAsync"/> instead.
-        /// </summary>
-        /// <remarks>
-        /// This method needs to be publicly accessible so that a single <see cref="AsyncGateReleaser"/>
-        /// can be shared by all implementations of this interface.
-        /// </remarks>
-        public void Release();
+        public ValueTask<IAsyncGateReleaser> AcquireAsync();
     }
 }

+ 24 - 0
AsyncRx.NET/System.Reactive.Async/Threading/IAsyncGateReleaser.cs

@@ -0,0 +1,24 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT License.
+// See the LICENSE file in the project root for more information. 
+
+namespace System.Reactive.Threading;
+
+/// <summary>
+/// Releases a lock acquired from <see cref="IAsyncGate.AcquireAsync"/>.
+/// </summary>
+/// <remarks>
+/// <para>
+/// Note that implementations of <see cref="IAsyncGate"/> may return a reference to themselves
+/// as the <see cref="IAsyncGateReleaser"/>, so callers should not depend on each lock
+/// acquisition returning a distinct <see cref="IAsyncGateReleaser"/>. (This enables gate
+/// implementations to avoid unnecessary allocation during lock acquisition.)
+/// </para>
+/// </remarks>
+public interface IAsyncGateReleaser
+{
+    /// <summary>
+    /// Releases a lock acquired from <see cref="IAsyncGate.AcquireAsync"/>.
+    /// </summary>
+    void Release();
+}