AsyncQueueLock.cs 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Threading.Tasks;
  6. namespace System.Threading
  7. {
  /// <summary>
  /// Queues asynchronous actions and has a single caller of <see cref="Wait"/> run them one
  /// after another, in the order they were enqueued.
  /// </summary>
  /// <remarks>
  /// All reads and writes of the queue and the two state flags happen inside a
  /// <c>using (await _gate.LockAsync())</c> scope. NOTE(review): this presumes
  /// <c>AsyncLock</c> provides mutual exclusion; its implementation is not visible here.
  /// </remarks>
  public sealed class AsyncQueueLock : IAsyncDisposable
  {
      // Actions that have been accepted but not yet started.
      private readonly Queue<Func<Task>> _queue = new Queue<Func<Task>>();

      // Guards _queue, _isAcquired and _hasFaulted.
      private readonly AsyncLock _gate = new AsyncLock();

      // True while some Wait call is draining the queue. It is only reset when the
      // draining caller finds the queue empty; it is left set after a fault.
      private bool _isAcquired;

      // Set when a queued action throws or when DisposeAsync runs. Once set,
      // Wait no longer enqueues anything.
      private bool _hasFaulted;

      /// <summary>
      /// Enqueues <paramref name="action"/>. If no other caller is currently draining the
      /// queue, this call becomes the drainer and runs queued actions until the queue is
      /// empty; otherwise it returns as soon as the action has been enqueued, without
      /// waiting for it to run.
      /// </summary>
      /// <param name="action">The asynchronous action to enqueue.</param>
      /// <returns>
      /// A task that completes when the action has been enqueued (non-draining caller) or
      /// when the queue has been drained (draining caller). If this instance has faulted
      /// or been disposed, the action is dropped and the task completes without running it.
      /// </returns>
      /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
      /// <remarks>
      /// If an action throws while being run by the draining caller, all pending actions
      /// are discarded, the instance is marked as faulted, and the exception is rethrown
      /// to that draining caller.
      /// </remarks>
      public async Task Wait(Func<Task> action)
      {
          if (action == null)
          {
              throw new ArgumentNullException(nameof(action));
          }

          var isDrainer = false;

          using (await _gate.LockAsync().ConfigureAwait(false))
          {
              if (!_hasFaulted)
              {
                  _queue.Enqueue(action);

                  // Only the caller that flips _isAcquired from false to true drains.
                  isDrainer = !_isAcquired;
                  _isAcquired = true;
              }
          }

          if (!isDrainer)
          {
              return;
          }

          for (; ; )
          {
              Func<Task> work;

              using (await _gate.LockAsync().ConfigureAwait(false))
              {
                  if (_queue.Count == 0)
                  {
                      // Nothing left: give up the drainer role so a later caller can take it.
                      _isAcquired = false;
                      return;
                  }

                  work = _queue.Dequeue();
              }

              // The action itself runs outside the gate, so other callers can keep enqueueing.
              try
              {
                  await work().ConfigureAwait(false);
              }
              catch
              {
                  using (await _gate.LockAsync().ConfigureAwait(false))
                  {
                      _queue.Clear();
                      _hasFaulted = true;
                  }

                  throw;
              }
          }
      }

      /// <summary>
      /// Discards all pending actions and marks this instance as faulted, so that later
      /// calls to <see cref="Wait"/> drop their action instead of enqueueing it.
      /// </summary>
      /// <returns>A task that completes once the queue has been cleared.</returns>
      /// <remarks>An action that was already dequeued is not affected by this call.</remarks>
      public async Task DisposeAsync()
      {
          using (await _gate.LockAsync().ConfigureAwait(false))
          {
              _queue.Clear();
              _hasFaulted = true;
          }
      }
  }
  68. }