|
|
@@ -11,7 +11,8 @@ namespace System.Reactive.Concurrency
|
|
|
/// </summary>
|
|
|
public sealed class AsyncLock : IDisposable
|
|
|
{
|
|
|
- private readonly Queue<Action> queue = new Queue<Action>();
|
|
|
+ private object guard = new object();
|
|
|
+ private Queue<Action> queue;
|
|
|
private bool isAcquired = false;
|
|
|
private bool hasFaulted = false;
|
|
|
|
|
|
@@ -27,50 +28,72 @@ namespace System.Reactive.Concurrency
|
|
|
if (action == null)
|
|
|
throw new ArgumentNullException(nameof(action));
|
|
|
|
|
|
- var isOwner = false;
|
|
|
- lock (queue)
|
|
|
+ // allow one thread to update the state
|
|
|
+ lock (guard)
|
|
|
{
|
|
|
- if (!hasFaulted)
|
|
|
+ // if a previous action crashed, ignore any future actions
|
|
|
+ if (hasFaulted)
|
|
|
{
|
|
|
- queue.Enqueue(action);
|
|
|
- isOwner = !isAcquired;
|
|
|
- isAcquired = true;
|
|
|
+ return;
|
|
|
}
|
|
|
+
|
|
|
+ // if the "lock" is busy, queue up the extra work
|
|
|
+ // otherwise there is no need to queue up "action"
|
|
|
+ if (isAcquired)
|
|
|
+ {
|
|
|
+ // create the queue if necessary
|
|
|
+ var q = queue;
|
|
|
+ if (q == null)
|
|
|
+ {
|
|
|
+ q = new Queue<Action>();
|
|
|
+ queue = q;
|
|
|
+ }
|
|
|
+ // enqueue the work
|
|
|
+ q.Enqueue(action);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // indicate there is processing going on
|
|
|
+ isAcquired = true;
|
|
|
}
|
|
|
|
|
|
- if (isOwner)
|
|
|
+ // if we get here, execute the "action" first
|
|
|
+
|
|
|
+ for (; ; )
|
|
|
{
|
|
|
- while (true)
|
|
|
+ try
|
|
|
{
|
|
|
- var work = default(Action);
|
|
|
- lock (queue)
|
|
|
+ action();
|
|
|
+ }
|
|
|
+ catch
|
|
|
+ {
|
|
|
+ // the execution failed, terminate this AsyncLock
|
|
|
+ lock (guard)
|
|
|
{
|
|
|
- if (queue.Count > 0)
|
|
|
- {
|
|
|
- work = queue.Dequeue();
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- isAcquired = false;
|
|
|
- break;
|
|
|
- }
|
|
|
+ // throw away the queue
|
|
|
+ queue = null;
|
|
|
+ // report fault
|
|
|
+ hasFaulted = true;
|
|
|
}
|
|
|
+ throw;
|
|
|
+ }
|
|
|
|
|
|
- try
|
|
|
+ // execution succeeded, let's see if more work has to be done
|
|
|
+ lock (guard)
|
|
|
+ {
|
|
|
+ var q = queue;
|
|
|
+ // either there is no queue yet or we run out of work
|
|
|
+ if (q == null || q.Count == 0)
|
|
|
{
|
|
|
- work();
|
|
|
+ // release the lock
|
|
|
+ isAcquired = false;
|
|
|
+ return;
|
|
|
}
|
|
|
- catch
|
|
|
- {
|
|
|
- lock (queue)
|
|
|
- {
|
|
|
- queue.Clear();
|
|
|
- hasFaulted = true;
|
|
|
- }
|
|
|
|
|
|
- throw;
|
|
|
- }
|
|
|
+ // get the next work action
|
|
|
+ action = q.Dequeue();
|
|
|
}
|
|
|
+ // loop back and execute the action
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -79,9 +102,9 @@ namespace System.Reactive.Concurrency
|
|
|
/// </summary>
|
|
|
public void Dispose()
|
|
|
{
|
|
|
- lock (queue)
|
|
|
+ lock (guard)
|
|
|
{
|
|
|
- queue.Clear();
|
|
|
+ queue = null;
|
|
|
hasFaulted = true;
|
|
|
}
|
|
|
}
|