// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. using System.ComponentModel; using System.Runtime.CompilerServices; using System.Threading; namespace System.Reactive.Concurrency { /// /// Represents an awaitable scheduler operation. Awaiting the object causes the continuation to be posted back to the originating scheduler's work queue. /// public sealed class SchedulerOperation { private readonly Func _schedule; private readonly CancellationToken _cancellationToken; private readonly bool _postBackToOriginalContext; internal SchedulerOperation(Func schedule, CancellationToken cancellationToken) : this(schedule, cancellationToken, false) { } internal SchedulerOperation(Func schedule, CancellationToken cancellationToken, bool postBackToOriginalContext) { _schedule = schedule; _cancellationToken = cancellationToken; _postBackToOriginalContext = postBackToOriginalContext; } /// /// Controls whether the continuation is run on the originating synchronization context (false by default). /// /// true to run the continuation on the captured synchronization context; false otherwise (default). /// Scheduler operation object with configured await behavior. public SchedulerOperation ConfigureAwait(bool continueOnCapturedContext) { return new SchedulerOperation(_schedule, _cancellationToken, continueOnCapturedContext); } /// /// Gets an awaiter for the scheduler operation, used to post back the continuation. /// /// Awaiter for the scheduler operation. public SchedulerOperationAwaiter GetAwaiter() { return new SchedulerOperationAwaiter(_schedule, _cancellationToken, _postBackToOriginalContext); } } /// /// (Infrastructure) Scheduler operation awaiter type used by the code generated for C# await and Visual Basic Await expressions. /// [EditorBrowsable(EditorBrowsableState.Never)] public sealed class SchedulerOperationAwaiter : INotifyCompletion { private readonly Func _schedule; private readonly CancellationToken _cancellationToken; private readonly bool _postBackToOriginalContext; private readonly CancellationTokenRegistration _ctr; internal SchedulerOperationAwaiter(Func schedule, CancellationToken cancellationToken, bool postBackToOriginalContext) { _schedule = schedule; _cancellationToken = cancellationToken; _postBackToOriginalContext = postBackToOriginalContext; if (cancellationToken.CanBeCanceled) { _ctr = _cancellationToken.Register(Cancel); } } /// /// Indicates whether the scheduler operation has completed. Returns false unless cancellation was already requested. /// public bool IsCompleted { get { return _cancellationToken.IsCancellationRequested; } } /// /// Completes the scheduler operation, throwing an OperationCanceledException in case cancellation was requested. /// public void GetResult() { _cancellationToken.ThrowIfCancellationRequested(); } /// /// Registers the continuation with the scheduler operation. /// /// Continuation to be run on the originating scheduler. public void OnCompleted(Action continuation) { if (continuation == null) throw new ArgumentNullException(nameof(continuation)); if (_continuation != null) throw new InvalidOperationException(Strings_Core.SCHEDULER_OPERATION_ALREADY_AWAITED); if (_postBackToOriginalContext) { var ctx = SynchronizationContext.Current; if (ctx != null) { var original = continuation; continuation = () => { // // No need for OperationStarted and OperationCompleted calls here; // this code is invoked through await support and will have a way // to observe its start/complete behavior, either through returned // Task objects or the async method builder's interaction with the // SynchronizationContext object. // // In general though, Rx doesn't play nicely with synchronization // contexts objects at the scheduler level. It's possible to start // async operations by calling Schedule, without a way to observe // their completion. Not interacting with SynchronizationContext // is a concious design decision as the performance impact was non // negligable and our schedulers abstract over more constructs. // ctx.Post(a => ((Action)a)(), original); }; } } var ran = 0; _continuation = () => { if (Interlocked.Exchange(ref ran, 1) == 0) { _ctr.Dispose(); // no null-check needed (struct) continuation(); } }; _work = _schedule(_continuation); } private volatile Action _continuation; private volatile IDisposable _work; private void Cancel() { var w = _work; if (w != null) w.Dispose(); var c = _continuation; if (c != null) c(); } } }