AsyncQueueLock.cs 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Threading;
  6. using System.Threading.Tasks;
  7. namespace System.Threading
  8. {
  9. public sealed class AsyncQueueLock : IAsyncDisposable
  10. {
  11. private readonly Queue<Func<ValueTask>> _queue = new();
  12. private readonly IAsyncGate _gate = AsyncGate.Create();
  13. private bool _isAcquired;
  14. private bool _hasFaulted;
  15. public async ValueTask WaitAsync(Func<ValueTask> action)
  16. {
  17. if (action == null)
  18. throw new ArgumentNullException(nameof(action));
  19. var shouldRun = false;
  20. using (await _gate.LockAsync().ConfigureAwait(false))
  21. {
  22. if (!_hasFaulted)
  23. {
  24. _queue.Enqueue(action);
  25. shouldRun = !_isAcquired;
  26. _isAcquired = true;
  27. }
  28. }
  29. if (shouldRun)
  30. {
  31. while (true)
  32. {
  33. var next = default(Func<ValueTask>);
  34. using (await _gate.LockAsync().ConfigureAwait(false))
  35. {
  36. if (_queue.Count == 0)
  37. {
  38. _isAcquired = false;
  39. break;
  40. }
  41. next = _queue.Dequeue();
  42. }
  43. try
  44. {
  45. await next().ConfigureAwait(false);
  46. }
  47. catch
  48. {
  49. using (await _gate.LockAsync().ConfigureAwait(false))
  50. {
  51. _queue.Clear();
  52. _hasFaulted = true;
  53. }
  54. throw;
  55. }
  56. }
  57. }
  58. }
  59. public async ValueTask DisposeAsync()
  60. {
  61. var queue = _queue;
  62. using (await _gate.LockAsync().ConfigureAwait(false))
  63. {
  64. queue.Clear();
  65. _hasFaulted = true;
  66. }
  67. }
  68. }
  69. }