// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
using System.Collections.Generic;
namespace System.Reactive.Concurrency
{
/// <summary>
/// Asynchronous lock.
/// </summary>
public sealed class AsyncLock : IDisposable
{
    // True while some caller owns the lock and is draining work; cleared by that
    // caller once it finds the queue empty.
    private bool _isAcquired;

    // True once an action has thrown or Dispose has been called; all later work is dropped.
    private bool _hasFaulted;

    // Protects _isAcquired, _hasFaulted and _queue.
    private readonly object _guard = new object();

    // Pending work items; created lazily on first contention and reset to null on fault or dispose.
    private Queue<(Action<Delegate, object> action, Delegate @delegate, object state)> _queue;

    /// <summary>
    /// Queues the action for execution. If the caller acquires the lock and becomes the owner,
    /// the queue is processed. If the lock is already owned, the action is queued and will get
    /// processed by the owner.
    /// </summary>
    /// <param name="action">Action to queue for execution.</param>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
    public void Wait(Action action)
    {
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }

        // The action itself travels as the state, so the static lambda captures nothing.
        Wait(action, static closureAction => closureAction());
    }

    /// <summary>
    /// Queues the action for execution. If the caller acquires the lock and becomes the owner,
    /// the queue is processed. If the lock is already owned, the action is queued and will get
    /// processed by the owner.
    /// </summary>
    /// <typeparam name="TState">Type of the state passed to the action.</typeparam>
    /// <param name="state">The state to pass to the action when it gets invoked under the lock.</param>
    /// <param name="action">Action to queue for execution.</param>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
    /// <remarks>
    /// In case TState is a value type, this operation will involve boxing of <paramref name="state"/>.
    /// However, this is often an improvement over the allocation of a closure object and a delegate.
    /// </remarks>
    internal void Wait<TState>(TState state, Action<TState> action)
    {
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }

        // Erase the types to (Delegate, object) so a single queue can hold work of any TState;
        // the static trampoline casts both back before invoking.
        Wait(state, action, static (actionObject, stateObject) => ((Action<TState>)actionObject)((TState)stateObject));
    }

    /// <summary>
    /// Core implementation: either enqueues the work item for the current owner, or becomes
    /// the owner, runs the item and then drains whatever was queued in the meantime.
    /// </summary>
    /// <param name="state">State handed to <paramref name="action"/> as its second argument.</param>
    /// <param name="delegate">Delegate handed to <paramref name="action"/> as its first argument.</param>
    /// <param name="action">Trampoline that invokes <paramref name="delegate"/> with <paramref name="state"/>.</param>
    /// <remarks>
    /// If a work item throws, the queue is discarded, the lock is marked as faulted and the
    /// exception is rethrown to the caller that currently owns the lock.
    /// </remarks>
    private void Wait(object state, Delegate @delegate, Action<Delegate, object> action)
    {
        // allow one thread to update the state
        lock (_guard)
        {
            // if a previous action crashed (or the lock was disposed), ignore any future actions
            if (_hasFaulted)
            {
                return;
            }

            // if the "lock" is busy, queue up the extra work;
            // otherwise there is no need to queue up "action"
            if (_isAcquired)
            {
                // create the queue if necessary, then enqueue the work
                _queue ??= new Queue<(Action<Delegate, object> action, Delegate @delegate, object state)>();
                _queue.Enqueue((action, @delegate, state));
                return;
            }

            // indicate there is processing going on
            _isAcquired = true;
        }

        // if we get here, we own the lock: execute the "action" first, then drain the queue
        for (; ; )
        {
            try
            {
                // run outside the monitor so other callers can enqueue while user code executes
                action(@delegate, state);
            }
            catch
            {
                // the execution failed, terminate this AsyncLock
                lock (_guard)
                {
                    // throw away the queue
                    _queue = null;

                    // report fault; _isAcquired is left set, later calls exit on _hasFaulted anyway
                    _hasFaulted = true;
                }

                throw;
            }

            // execution succeeded, let's see if more work has to be done
            lock (_guard)
            {
                var q = _queue;

                // either there is no queue (never created, faulted or disposed) or we ran out of work
                if (q == null || q.Count == 0)
                {
                    // release the lock
                    _isAcquired = false;
                    return;
                }

                // get the next work item
                (action, @delegate, state) = q.Dequeue();
            }

            // loop back and execute the action
        }
    }

    /// <summary>
    /// Clears the work items in the queue and drops further work being queued.
    /// </summary>
    public void Dispose()
    {
        lock (_guard)
        {
            _queue = null;
            _hasFaulted = true;
        }
    }
}
}