瀏覽代碼

Enhance AsyncLock to support passing state. (#554)

Daniel C. Weber 7 年之前
父節點
當前提交
f4f6a62380
共有 1 個文件被更改,包括 28 次插入6 次删除
  1. 28 6
      Rx.NET/Source/src/System.Reactive/Concurrency/AsyncLock.cs

+ 28 - 6
Rx.NET/Source/src/System.Reactive/Concurrency/AsyncLock.cs

@@ -11,10 +11,10 @@ namespace System.Reactive.Concurrency
     /// </summary>
     public sealed class AsyncLock : IDisposable
     {
-        private object guard = new object();
-        private Queue<Action> queue;
         private bool isAcquired = false;
         private bool hasFaulted = false;
+        private object guard = new object();
+        private Queue<(Action<Delegate, object> action, Delegate @delegate, object state)> queue;
 
         /// <summary>
         /// Queues the action for execution. If the caller acquires the lock and becomes the owner,
@@ -28,6 +28,28 @@ namespace System.Reactive.Concurrency
             if (action == null)
                 throw new ArgumentNullException(nameof(action));
 
+            Wait(action, closureAction => closureAction());
+        }
+
+        /// <summary>
+        /// Queues the action for execution. If the caller acquires the lock and becomes the owner,
+        /// the queue is processed. If the lock is already owned, the action is queued and will get
+        /// processed by the owner.
+        /// </summary>
+        /// <param name="action">Action to queue for execution.</param>
+        /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
+        /// <remarks>In case TState is a value type, this operation will involve boxing of <paramref name="state"/>.
+        /// However, this is often an improvement over the allocation of a closure object and a delegate.</remarks>
+        internal void Wait<TState>(TState state, Action<TState> action)
+        {
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            Wait(state, action, (actionObject, stateObject) => ((Action<TState>)actionObject)((TState)stateObject));
+        }
+
+        private void Wait(object state, Delegate @delegate, Action<Delegate, object> action)
+        { 
             // allow one thread to update the state
             lock (guard)
             {
@@ -45,11 +67,11 @@ namespace System.Reactive.Concurrency
                     var q = queue;
                     if (q == null)
                     {
-                        q = new Queue<Action>();
+                        q = new Queue<(Action<Delegate, object> action, Delegate @delegate, object state)>();
                         queue = q;
                     }
                     // enqueue the work
-                    q.Enqueue(action);
+                    q.Enqueue((action, @delegate, state));
                     return;
                 }
 
@@ -63,7 +85,7 @@ namespace System.Reactive.Concurrency
             {
                 try
                 {
-                    action();
+                    action(@delegate, state);
                 }
                 catch
                 {
@@ -91,7 +113,7 @@ namespace System.Reactive.Concurrency
                     }
 
                     // get the next work action
-                    action = q.Dequeue();
+                    (action, @delegate, state) = q.Dequeue();
                 }
                 // loop back and execute the action
             }