// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;

namespace System.Reactive.Concurrency
{
    /// <summary>
    /// Asynchronous lock.
    /// </summary>
    /// <remarks>
    /// Callers never block waiting for the lock. The first caller to find the lock free becomes
    /// the owner and runs its own action followed by every action queued in the meantime; callers
    /// that find the lock owned only enqueue their action and return immediately.
    /// </remarks>
    public sealed class AsyncLock : IDisposable
    {
        // True while some caller is draining work; guarded by _guard.
        private bool _isAcquired;

        // Set when an action threw or Dispose was called; all later work is dropped. Guarded by _guard.
        private bool _hasFaulted;

        private readonly object _guard = new object();

        // Created lazily on first contention. Each entry carries a non-generic trampoline ("action")
        // plus the user delegate and its (possibly boxed) state, so one queue can hold work of any TState.
        private Queue<(Action<Delegate, object> action, Delegate @delegate, object state)> _queue;

        /// <summary>
        /// Queues the action for execution. If the caller acquires the lock and becomes the owner,
        /// the queue is processed. If the lock is already owned, the action is queued and will get
        /// processed by the owner.
        /// </summary>
        /// <param name="action">Action to queue for execution.</param>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
        public void Wait(Action action)
        {
            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            // Pass the action itself as the state so the static lambda needs no closure.
            Wait(action, static closureAction => closureAction());
        }

        /// <summary>
        /// Queues the action for execution. If the caller acquires the lock and becomes the owner,
        /// the queue is processed. If the lock is already owned, the action is queued and will get
        /// processed by the owner.
        /// </summary>
        /// <typeparam name="TState">The type of the state passed to <paramref name="action"/>.</typeparam>
        /// <param name="state">The state to pass to the action when it gets invoked under the lock.</param>
        /// <param name="action">Action to queue for execution.</param>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
        /// <remarks>
        /// In case TState is a value type, this operation will involve boxing of <paramref name="state"/>.
        /// However, this is often an improvement over the allocation of a closure object and a delegate.
        /// </remarks>
        internal void Wait<TState>(TState state, Action<TState> action)
        {
            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            // The static trampoline casts the type-erased delegate and state back to their real types.
            Wait(state, action, static (actionObject, stateObject) => ((Action<TState>)actionObject)((TState)stateObject));
        }

        /// <summary>
        /// Core implementation: either enqueues the work item (lock owned) or becomes the owner and
        /// runs this item and all subsequently queued items on the calling thread.
        /// </summary>
        /// <param name="state">State handed to <paramref name="action"/> as its second argument.</param>
        /// <param name="delegate">User delegate handed to <paramref name="action"/> as its first argument.</param>
        /// <param name="action">Trampoline that invokes <paramref name="delegate"/> with <paramref name="state"/>.</param>
        private void Wait(object state, Delegate @delegate, Action<Delegate, object> action)
        {
            // allow one thread to update the state
            lock (_guard)
            {
                // if a previous action crashed (or the lock was disposed), ignore any future actions
                if (_hasFaulted)
                {
                    return;
                }

                // if the "lock" is busy, queue up the extra work
                // otherwise there is no need to queue up "action"
                if (_isAcquired)
                {
                    // create the queue if necessary, then enqueue the work
                    _queue ??= new Queue<(Action<Delegate, object> action, Delegate @delegate, object state)>();
                    _queue.Enqueue((action, @delegate, state));
                    return;
                }

                // indicate there is processing going on
                _isAcquired = true;
            }

            // if we get here, execute the "action" first; user code runs outside of _guard
            while (true)
            {
                try
                {
                    action(@delegate, state);
                }
                catch
                {
                    // the execution failed, terminate this AsyncLock
                    lock (_guard)
                    {
                        // throw away the queue
                        _queue = null;

                        // report fault
                        _hasFaulted = true;
                    }

                    throw;
                }

                // execution succeeded, let's see if more work has to be done
                lock (_guard)
                {
                    var pending = _queue;

                    // either there is no queue yet or we ran out of work
                    if (pending == null || pending.Count == 0)
                    {
                        // release the lock
                        _isAcquired = false;
                        return;
                    }

                    // get the next work item
                    (action, @delegate, state) = pending.Dequeue();
                }

                // loop back and execute the action
            }
        }

        /// <summary>
        /// Clears the work items in the queue and drops further work being queued.
        /// </summary>
        /// <remarks>
        /// An action that is already executing is not interrupted; once it returns, its owner finds
        /// no queue and releases the lock.
        /// </remarks>
        public void Dispose()
        {
            lock (_guard)
            {
                _queue = null;
                _hasFaulted = true;
            }
        }
    }
}