SchedulerOperation.cs 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.ComponentModel;
  5. using System.Runtime.CompilerServices;
  6. using System.Threading;
  7. namespace System.Reactive.Concurrency
  8. {
  /// <summary>
  /// Awaitable handle for a scheduler operation. Awaiting an instance hands the continuation to the
  /// scheduling function it was created with, posting it back to the originating scheduler's work queue.
  /// </summary>
  public sealed class SchedulerOperation
  {
      // Function that queues an action and returns a handle for the queued work.
      private readonly Func<Action, IDisposable> _schedule;

      // Token forwarded to every awaiter created from this operation.
      private readonly CancellationToken _cancellationToken;

      // When true, awaiters re-post the continuation to the SynchronizationContext current at await time.
      private readonly bool _postBackToOriginalContext;

      /// <summary>
      /// Creates an operation that does not post continuations back to the captured synchronization context.
      /// </summary>
      /// <param name="schedule">Function used to schedule the continuation.</param>
      /// <param name="cancellationToken">Token observed by awaiters of this operation.</param>
      internal SchedulerOperation(Func<Action, IDisposable> schedule, CancellationToken cancellationToken)
          : this(schedule, cancellationToken, postBackToOriginalContext: false)
      {
      }

      /// <summary>
      /// Creates an operation with an explicit choice of synchronization context behavior.
      /// </summary>
      /// <param name="schedule">Function used to schedule the continuation.</param>
      /// <param name="cancellationToken">Token observed by awaiters of this operation.</param>
      /// <param name="postBackToOriginalContext">true to post the continuation to the synchronization context captured at await time; false otherwise.</param>
      internal SchedulerOperation(Func<Action, IDisposable> schedule, CancellationToken cancellationToken, bool postBackToOriginalContext)
      {
          _postBackToOriginalContext = postBackToOriginalContext;
          _cancellationToken = cancellationToken;
          _schedule = schedule;
      }

      /// <summary>
      /// Selects whether the continuation runs on the originating synchronization context (it does not by default).
      /// </summary>
      /// <param name="continueOnCapturedContext">true to run the continuation on the captured synchronization context; false otherwise (default).</param>
      /// <returns>A new scheduler operation object carrying the requested await behavior; this instance is left unchanged.</returns>
      public SchedulerOperation ConfigureAwait(bool continueOnCapturedContext) =>
          new SchedulerOperation(_schedule, _cancellationToken, continueOnCapturedContext);

      /// <summary>
      /// Creates the awaiter that posts the continuation back through the scheduling function.
      /// </summary>
      /// <returns>Awaiter for the scheduler operation.</returns>
      public SchedulerOperationAwaiter GetAwaiter() =>
          new SchedulerOperationAwaiter(_schedule, _cancellationToken, _postBackToOriginalContext);
  }
  /// <summary>
  /// (Infrastructure) Awaiter for scheduler operations, consumed by the code the compiler generates for C# await and Visual Basic Await expressions.
  /// </summary>
  [EditorBrowsable(EditorBrowsableState.Never)]
  public sealed class SchedulerOperationAwaiter : INotifyCompletion
  {
      private readonly Func<Action, IDisposable> _schedule;
      private readonly CancellationToken _cancellationToken;
      private readonly bool _postBackToOriginalContext;

      // Registration of Cancel on the token; stays at its default value when the token cannot be canceled.
      private readonly CancellationTokenRegistration _ctr;

      // Run-once wrapper around the registered continuation; null until OnCompleted has been called.
      private volatile Action _continuation;

      // Handle returned by the scheduling function for the queued continuation.
      private volatile IDisposable _work;

      /// <summary>
      /// Creates the awaiter and, when the token supports cancellation, registers <see cref="Cancel"/> on it.
      /// </summary>
      /// <param name="schedule">Function used to schedule the continuation.</param>
      /// <param name="cancellationToken">Token whose cancellation completes the await with an exception.</param>
      /// <param name="postBackToOriginalContext">true to post the continuation to the synchronization context current when the continuation is registered.</param>
      internal SchedulerOperationAwaiter(Func<Action, IDisposable> schedule, CancellationToken cancellationToken, bool postBackToOriginalContext)
      {
          _schedule = schedule;
          _cancellationToken = cancellationToken;
          _postBackToOriginalContext = postBackToOriginalContext;

          if (!cancellationToken.CanBeCanceled)
          {
              return;
          }

          _ctr = _cancellationToken.Register(Cancel);
      }

      /// <summary>
      /// Indicates whether the scheduler operation has completed. This is false unless cancellation has been requested.
      /// </summary>
      public bool IsCompleted
      {
          get { return _cancellationToken.IsCancellationRequested; }
      }

      /// <summary>
      /// Completes the scheduler operation; throws an OperationCanceledException when cancellation was requested.
      /// </summary>
      public void GetResult()
      {
          _cancellationToken.ThrowIfCancellationRequested();
      }

      /// <summary>
      /// Registers the continuation with the scheduler operation and schedules it.
      /// </summary>
      /// <param name="continuation">Continuation to be run on the originating scheduler.</param>
      /// <exception cref="ArgumentNullException"><paramref name="continuation"/> is null.</exception>
      /// <exception cref="InvalidOperationException">A continuation was already registered on this awaiter.</exception>
      public void OnCompleted(Action continuation)
      {
          if (continuation == null)
          {
              throw new ArgumentNullException(nameof(continuation));
          }

          if (_continuation != null)
          {
              throw new InvalidOperationException(Strings_Core.SCHEDULER_OPERATION_ALREADY_AWAITED);
          }

          var target = continuation;

          if (_postBackToOriginalContext)
          {
              var capturedContext = SynchronizationContext.Current;

              if (capturedContext != null)
              {
                  target = PostTo(capturedContext, continuation);
              }
          }

          var invoked = 0;

          // Both the scheduler and the cancellation callback may call this wrapper;
          // the interlocked flag lets the continuation through exactly once.
          _continuation = () =>
          {
              if (Interlocked.Exchange(ref invoked, 1) != 0)
              {
                  return;
              }

              _ctr.Dispose(); // no null-check needed (struct)
              target();
          };

          _work = _schedule(_continuation);
      }

      /// <summary>
      /// Wraps an action so that invoking the wrapper posts the action to the given synchronization context.
      /// </summary>
      private static Action PostTo(SynchronizationContext context, Action action)
      {
          return () =>
          {
              //
              // OperationStarted and OperationCompleted calls are not needed here:
              // this code is reached through await support, which can already
              // observe start/complete behavior, either through returned Task
              // objects or through the async method builder's interaction with
              // the SynchronizationContext object.
              //
              // In general though, Rx doesn't play nicely with synchronization
              // context objects at the scheduler level. It's possible to start
              // async operations by calling Schedule, without a way to observe
              // their completion. Not interacting with SynchronizationContext
              // is a conscious design decision, as the performance impact was
              // non-negligible and our schedulers abstract over more constructs.
              //
              context.Post(state => ((Action)state)(), action);
          };
      }

      /// <summary>
      /// Cancellation callback: disposes the scheduled work, if any, and invokes the registered
      /// continuation, if any, so that the awaiting code resumes.
      /// </summary>
      private void Cancel()
      {
          var work = _work;
          if (work != null)
          {
              work.Dispose();
          }

          var registered = _continuation;
          if (registered != null)
          {
              registered();
          }
      }
  }
  128. }