浏览代码

Adding AsyncQueueLock.

Bart De Smet 8 年之前
父节点
当前提交
fe90baf46c

+ 4 - 0
AsyncRx.NET/System.Reactive.Bcl/System.Reactive.Bcl.csproj

@@ -4,4 +4,8 @@
     <TargetFramework>netstandard2.0</TargetFramework>
   </PropertyGroup>
 
+  <ItemGroup>
+    <ProjectReference Include="..\System.Reactive.Async.Interfaces\System.Reactive.Async.Interfaces.csproj" />
+  </ItemGroup>
+
 </Project>

+ 81 - 0
AsyncRx.NET/System.Reactive.Bcl/System/Threading/AsyncQueueLock.cs

@@ -0,0 +1,81 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Collections.Generic;
+using System.Threading.Tasks;
+
+namespace System.Threading
+{
+    public sealed class AsyncQueueLock : IAsyncDisposable
+    {
+        private readonly Queue<Func<Task>> _queue = new Queue<Func<Task>>();
+        private readonly AsyncLock _gate = new AsyncLock();
+
+        private bool _isAcquired;
+        private bool _hasFaulted;
+
+        public async Task Wait(Func<Task> action)
+        {
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            var shouldRun = false;
+
+            using (await _gate.LockAsync().ConfigureAwait(false))
+            {
+                if (!_hasFaulted)
+                {
+                    _queue.Enqueue(action);
+                    shouldRun = !_isAcquired;
+                    _isAcquired = true;
+                }
+            }
+
+            if (shouldRun)
+            {
+                while (true)
+                {
+                    var next = default(Func<Task>);
+
+                    using (await _gate.LockAsync().ConfigureAwait(false))
+                    {
+                        if (_queue.Count == 0)
+                        {
+                            _isAcquired = false;
+                            break;
+                        }
+
+                        next = _queue.Dequeue();
+                    }
+
+                    try
+                    {
+                        await next().ConfigureAwait(false);
+                    }
+                    catch
+                    {
+                        using (await _gate.LockAsync().ConfigureAwait(false))
+                        {
+                            _queue.Clear();
+                            _hasFaulted = true;
+                        }
+
+                        throw;
+                    }
+                }
+            }
+        }
+
+        public async Task DisposeAsync()
+        {
+            var queue = _queue;
+
+            using (await _gate.LockAsync().ConfigureAwait(false))
+            {
+                queue.Clear();
+                _hasFaulted = true;
+            }
+        }
+    }
+}