|
@@ -0,0 +1,292 @@
|
|
|
+#if !BCL_HAS_CONFIGUREAWAIT
|
|
|
+// Licensed to the .NET Foundation under one or more agreements.
|
|
|
+// The .NET Foundation licenses this file to you under the MIT license.
|
|
|
+// See the LICENSE file in the project root for more information.
|
|
|
+
|
|
|
+using System.Diagnostics;
|
|
|
+using System.Runtime.ExceptionServices;
|
|
|
+using System.Runtime.InteropServices;
|
|
|
+
|
|
|
+namespace System.Threading.Tasks.Sources
|
|
|
+{
|
|
|
+ /// <summary>Provides the core logic for implementing a manual-reset <see cref="IValueTaskSource"/> or <see cref="IValueTaskSource{TResult}"/>.</summary>
|
|
|
+ /// <typeparam name="TResult"></typeparam>
|
|
|
+ [StructLayout(LayoutKind.Auto)]
|
|
|
+ public struct ManualResetValueTaskSourceCore<TResult>
|
|
|
+ {
|
|
|
+ /// <summary>
|
|
|
+ /// The callback to invoke when the operation completes if <see cref="OnCompleted"/> was called before the operation completed,
|
|
|
+ /// or <see cref="ManualResetValueTaskSourceCoreShared.s_sentinel"/> if the operation completed before a callback was supplied,
|
|
|
+ /// or null if a callback hasn't yet been provided and the operation hasn't yet completed.
|
|
|
+ /// </summary>
|
|
|
+ private Action<object> _continuation;
|
|
|
+ /// <summary>State to pass to <see cref="_continuation"/>.</summary>
|
|
|
+ private object _continuationState;
|
|
|
+ /// <summary><see cref="ExecutionContext"/> to flow to the callback, or null if no flowing is required.</summary>
|
|
|
+ private ExecutionContext _executionContext;
|
|
|
+ /// <summary>
|
|
|
+ /// A "captured" <see cref="SynchronizationContext"/> or <see cref="TaskScheduler"/> with which to invoke the callback,
|
|
|
+ /// or null if no special context is required.
|
|
|
+ /// </summary>
|
|
|
+ private object _capturedContext;
|
|
|
+ /// <summary>Whether the current operation has completed.</summary>
|
|
|
+ private bool _completed;
|
|
|
+ /// <summary>The result with which the operation succeeded, or the default value if it hasn't yet completed or failed.</summary>
|
|
|
+ private TResult _result;
|
|
|
+ /// <summary>The exception with which the operation failed, or null if it hasn't yet completed or completed successfully.</summary>
|
|
|
+ private ExceptionDispatchInfo _error;
|
|
|
+ /// <summary>The current version of this value, used to help prevent misuse.</summary>
|
|
|
+ private short _version;
|
|
|
+
|
|
|
+ /// <summary>Gets or sets whether to force continuations to run asynchronously.</summary>
|
|
|
+ /// <remarks>Continuations may run asynchronously if this is false, but they'll never run synchronously if this is true.</remarks>
|
|
|
+ public bool RunContinuationsAsynchronously { get; set; }
|
|
|
+
|
|
|
+ /// <summary>Resets to prepare for the next operation.</summary>
|
|
|
+ public void Reset()
|
|
|
+ {
|
|
|
+ // Reset/update state for the next use/await of this instance.
|
|
|
+ _version++;
|
|
|
+ _completed = false;
|
|
|
+ _result = default;
|
|
|
+ _error = null;
|
|
|
+ _executionContext = null;
|
|
|
+ _capturedContext = null;
|
|
|
+ _continuation = null;
|
|
|
+ _continuationState = null;
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>Completes with a successful result.</summary>
|
|
|
+ /// <param name="result">The result.</param>
|
|
|
+ public void SetResult(TResult result)
|
|
|
+ {
|
|
|
+ _result = result;
|
|
|
+ SignalCompletion();
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>Complets with an error.</summary>
|
|
|
+ /// <param name="error"></param>
|
|
|
+ public void SetException(Exception error)
|
|
|
+ {
|
|
|
+ _error = ExceptionDispatchInfo.Capture(error);
|
|
|
+ SignalCompletion();
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>Gets the operation version.</summary>
|
|
|
+ public short Version => _version;
|
|
|
+
|
|
|
+ /// <summary>Gets the status of the operation.</summary>
|
|
|
+ /// <param name="token">Opaque value that was provided to the <see cref="ValueTask"/>'s constructor.</param>
|
|
|
+ public ValueTaskSourceStatus GetStatus(short token)
|
|
|
+ {
|
|
|
+ ValidateToken(token);
|
|
|
+ return
|
|
|
+ !_completed ? ValueTaskSourceStatus.Pending :
|
|
|
+ _error == null ? ValueTaskSourceStatus.Succeeded :
|
|
|
+ _error.SourceException is OperationCanceledException ? ValueTaskSourceStatus.Canceled :
|
|
|
+ ValueTaskSourceStatus.Faulted;
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>Gets the result of the operation.</summary>
|
|
|
+ /// <param name="token">Opaque value that was provided to the <see cref="ValueTask"/>'s constructor.</param>
|
|
|
+ [StackTraceHidden]
|
|
|
+ public TResult GetResult(short token)
|
|
|
+ {
|
|
|
+ ValidateToken(token);
|
|
|
+ if (!_completed)
|
|
|
+ {
|
|
|
+ ManualResetValueTaskSourceCoreShared.ThrowInvalidOperationException();
|
|
|
+ }
|
|
|
+
|
|
|
+ _error?.Throw();
|
|
|
+ return _result;
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>Schedules the continuation action for this operation.</summary>
|
|
|
+ /// <param name="continuation">The continuation to invoke when the operation has completed.</param>
|
|
|
+ /// <param name="state">The state object to pass to <paramref name="continuation"/> when it's invoked.</param>
|
|
|
+ /// <param name="token">Opaque value that was provided to the <see cref="ValueTask"/>'s constructor.</param>
|
|
|
+ /// <param name="flags">The flags describing the behavior of the continuation.</param>
|
|
|
+ public void OnCompleted(Action<object> continuation, object state, short token, ValueTaskSourceOnCompletedFlags flags)
|
|
|
+ {
|
|
|
+ if (continuation == null)
|
|
|
+ {
|
|
|
+ throw new ArgumentNullException(nameof(continuation));
|
|
|
+ }
|
|
|
+ ValidateToken(token);
|
|
|
+
|
|
|
+ if ((flags & ValueTaskSourceOnCompletedFlags.FlowExecutionContext) != 0)
|
|
|
+ {
|
|
|
+ _executionContext = ExecutionContext.Capture();
|
|
|
+ }
|
|
|
+
|
|
|
+ if ((flags & ValueTaskSourceOnCompletedFlags.UseSchedulingContext) != 0)
|
|
|
+ {
|
|
|
+ SynchronizationContext sc = SynchronizationContext.Current;
|
|
|
+ if (sc != null && sc.GetType() != typeof(SynchronizationContext))
|
|
|
+ {
|
|
|
+ _capturedContext = sc;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ TaskScheduler ts = TaskScheduler.Current;
|
|
|
+ if (ts != TaskScheduler.Default)
|
|
|
+ {
|
|
|
+ _capturedContext = ts;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // We need to set the continuation state before we swap in the delegate, so that
|
|
|
+ // if there's a race between this and SetResult/Exception and SetResult/Exception
|
|
|
+ // sees the _continuation as non-null, it'll be able to invoke it with the state
|
|
|
+ // stored here. However, this also means that if this is used incorrectly (e.g.
|
|
|
+ // awaited twice concurrently), _continuationState might get erroneously overwritten.
|
|
|
+ // To minimize the chances of that, we check preemptively whether _continuation
|
|
|
+ // is already set to something other than the completion sentinel.
|
|
|
+
|
|
|
+ object oldContinuation = _continuation;
|
|
|
+ if (oldContinuation == null)
|
|
|
+ {
|
|
|
+ _continuationState = state;
|
|
|
+ oldContinuation = Interlocked.CompareExchange(ref _continuation, continuation, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (oldContinuation != null)
|
|
|
+ {
|
|
|
+ // Operation already completed, so we need to queue the supplied callback.
|
|
|
+ if (!ReferenceEquals(oldContinuation, ManualResetValueTaskSourceCoreShared.s_sentinel))
|
|
|
+ {
|
|
|
+ ManualResetValueTaskSourceCoreShared.ThrowInvalidOperationException();
|
|
|
+ }
|
|
|
+
|
|
|
+ switch (_capturedContext)
|
|
|
+ {
|
|
|
+ case null:
|
|
|
+ if (_executionContext != null)
|
|
|
+ {
|
|
|
+ ThreadPool.QueueUserWorkItem(s => continuation(s), state);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ ThreadPool.UnsafeQueueUserWorkItem(s => continuation(s), state);
|
|
|
+ }
|
|
|
+ break;
|
|
|
+
|
|
|
+ case SynchronizationContext sc:
|
|
|
+ sc.Post(s =>
|
|
|
+ {
|
|
|
+ var tuple = (Tuple<Action<object>, object>)s;
|
|
|
+ tuple.Item1(tuple.Item2);
|
|
|
+ }, Tuple.Create(continuation, state));
|
|
|
+ break;
|
|
|
+
|
|
|
+ case TaskScheduler ts:
|
|
|
+ Task.Factory.StartNew(continuation, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>Ensures that the specified token matches the current version.</summary>
|
|
|
+ /// <param name="token">The token supplied by <see cref="ValueTask"/>.</param>
|
|
|
+ private void ValidateToken(short token)
|
|
|
+ {
|
|
|
+ if (token != _version)
|
|
|
+ {
|
|
|
+ ManualResetValueTaskSourceCoreShared.ThrowInvalidOperationException();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>Signals that that the operation has completed. Invoked after the result or error has been set.</summary>
|
|
|
+ private void SignalCompletion()
|
|
|
+ {
|
|
|
+ if (_completed)
|
|
|
+ {
|
|
|
+ ManualResetValueTaskSourceCoreShared.ThrowInvalidOperationException();
|
|
|
+ }
|
|
|
+ _completed = true;
|
|
|
+
|
|
|
+ if (_continuation != null || Interlocked.CompareExchange(ref _continuation, ManualResetValueTaskSourceCoreShared.s_sentinel, null) != null)
|
|
|
+ {
|
|
|
+ if (_executionContext != null)
|
|
|
+ {
|
|
|
+ ExecutionContext.Run(_executionContext,
|
|
|
+ s => ((ManualResetValueTaskSourceCore<TResult>)s).InvokeContinuation(),
|
|
|
+ this);
|
|
|
+
|
|
|
+ //ExecutionContext.RunInternal(
|
|
|
+ // _executionContext,
|
|
|
+ // (ref ManualResetValueTaskSourceCore<TResult> s) => s.InvokeContinuation(),
|
|
|
+ // ref this);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ InvokeContinuation();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /// <summary>
|
|
|
+ /// Invokes the continuation with the appropriate captured context / scheduler.
|
|
|
+ /// This assumes that if <see cref="_executionContext"/> is not null we're already
|
|
|
+ /// running within that <see cref="ExecutionContext"/>.
|
|
|
+ /// </summary>
|
|
|
+ private void InvokeContinuation()
|
|
|
+ {
|
|
|
+ switch (_capturedContext)
|
|
|
+ {
|
|
|
+ case null:
|
|
|
+ if (RunContinuationsAsynchronously)
|
|
|
+ {
|
|
|
+ var c = _continuation;
|
|
|
+ if (_executionContext != null)
|
|
|
+ {
|
|
|
+ ThreadPool.QueueUserWorkItem(s => c(s), _continuationState);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ ThreadPool.UnsafeQueueUserWorkItem(s => c(s), _continuationState);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ _continuation(_continuationState);
|
|
|
+ }
|
|
|
+ break;
|
|
|
+
|
|
|
+ case SynchronizationContext sc:
|
|
|
+ sc.Post(s =>
|
|
|
+ {
|
|
|
+ var state = (Tuple<Action<object>, object>)s;
|
|
|
+ state.Item1(state.Item2);
|
|
|
+ }, Tuple.Create(_continuation, _continuationState));
|
|
|
+ break;
|
|
|
+
|
|
|
+ case TaskScheduler ts:
|
|
|
+ Task.Factory.StartNew(_continuation, _continuationState, CancellationToken.None, TaskCreationOptions.DenyChildAttach, ts);
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ internal static class ManualResetValueTaskSourceCoreShared // separated out of generic to avoid unnecessary duplication
|
|
|
+ {
|
|
|
+ [StackTraceHidden]
|
|
|
+ internal static void ThrowInvalidOperationException() => throw new InvalidOperationException();
|
|
|
+
|
|
|
+ internal static readonly Action<object> s_sentinel = CompletionSentinel;
|
|
|
+ private static void CompletionSentinel(object _) // named method to aid debugging
|
|
|
+ {
|
|
|
+ Debug.Fail("The sentinel delegate should never be invoked.");
|
|
|
+ ThrowInvalidOperationException();
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+#else
|
|
|
+
|
|
|
+using System.Runtime.CompilerServices;
|
|
|
+
|
|
|
+[assembly: TypeForwardedTo(typeof(System.Threading.Tasks.Sources.ManualResetValueTaskSourceCore<>))]
|
|
|
+#endif
|