AsyncGate.cs 2.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Diagnostics;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  namespace System.Reactive.Threading
  {
      /// <summary>
      /// Provides an implementation of <see cref="IAsyncGate"/>, enabling mutually exclusive locking
      /// in async code.
      /// </summary>
      /// <remarks>
      /// Exclusion is provided by a single-permit <see cref="SemaphoreSlim"/>. A per-asynchronous-flow
      /// counter held in an <see cref="AsyncLocal{T}"/> lets a flow that already owns the gate acquire
      /// it again without waiting; the semaphore is released only when that counter returns to zero.
      /// The same instance acts as its own <see cref="IAsyncGateReleaser"/>.
      /// </remarks>
      public sealed class AsyncGate : IAsyncGate, IAsyncGateReleaser
      {
          // Monitor held while the recursion count is inspected or modified and while the semaphore is released.
          private readonly object _gate = new();

          // Single-permit semaphore: at most one outermost acquisition can hold it at a time.
          private readonly SemaphoreSlim _semaphore = new(1, 1);

          // Number of outstanding acquisitions as seen by the current asynchronous control flow.
          private readonly AsyncLocal<int> _recursionCount = new();

          /// <summary>
          /// Creates an <see cref="AsyncGate"/>.
          /// </summary>
          /// <remarks>
          /// This is private because we hope that one day, the .NET runtime will provide a built-in
          /// asynchronous mutual exclusion primitive, and that we might be able to use that instead of
          /// our own implementation. Although that might be something we could do by modifying this
          /// class, it might prove useful to be able to provide the old implementation for backwards
          /// compatibility, so we don't want AsyncRx.NET consumers to depend on a specific concrete type
          /// as the <see cref="IAsyncGate"/> implementation.
          /// </remarks>
          private AsyncGate()
          {
          }

          /// <summary>
          /// Creates a new instance of an <see cref="IAsyncGate"/> implementation.
          /// </summary>
          /// <returns>A new, initially unheld gate, exposed only through <see cref="IAsyncGate"/>.</returns>
          public static IAsyncGate Create() => new AsyncGate();

          /// <summary>
          /// Acquires the gate, waiting asynchronously if it is currently held elsewhere.
          /// </summary>
          /// <returns>
          /// A <see cref="ValueTask{TResult}"/> that produces this instance as the
          /// <see cref="IAsyncGateReleaser"/> once the gate is held. It is already complete when the
          /// semaphore was free or when the current flow's recursion count was non-zero.
          /// </returns>
          ValueTask<IAsyncGateReleaser> IAsyncGate.AcquireAsync()
          {
              // NOTE(review): presumably this method is deliberately not 'async' so that the
              // AsyncLocal update below remains visible to the caller after it returns - confirm.
              var shouldAcquire = false;

              lock (_gate)
              {
                  if (_recursionCount.Value == 0)
                  {
                      // Outermost acquisition for this flow: the semaphore must be taken below.
                      shouldAcquire = true;
                      _recursionCount.Value = 1;
                  }
                  else
                  {
                      // Nested acquisition: just count it, no waiting required.
                      _recursionCount.Value++;
                  }
              }

              if (shouldAcquire)
              {
                  Task acquireTask = _semaphore.WaitAsync();

                  if (acquireTask.IsCompleted)
                  {
                      // Fast path: the semaphore was free, so avoid allocating a continuation.
                      return new ValueTask<IAsyncGateReleaser>(this);
                  }

                  // Pin the continuation to the default scheduler. Without an explicit scheduler,
                  // ContinueWith uses TaskScheduler.Current, so a caller running on a custom scheduler
                  // could have the hand-over of the releaser delayed or blocked by that scheduler.
                  return new ValueTask<IAsyncGateReleaser>(
                      acquireTask.ContinueWith<IAsyncGateReleaser>(_ => this, TaskScheduler.Default));
              }

              return new ValueTask<IAsyncGateReleaser>(this);
          }

          /// <summary>
          /// Undoes one acquisition; releases the underlying semaphore when the current flow's
          /// recursion count drops to zero.
          /// </summary>
          void IAsyncGateReleaser.Release()
          {
              lock (_gate)
              {
                  // Releasing without a matching acquisition is a caller bug (checked in debug builds only).
                  Debug.Assert(_recursionCount.Value > 0);

                  if (--_recursionCount.Value == 0)
                  {
                      _semaphore.Release();
                  }
              }
          }
      }
  }