1
0

Timeout.cs 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace System.Linq
  8. {
  9. public static partial class AsyncEnumerableEx
  10. {
  11. /// <summary>
  12. /// Applies a timeout policy for each element in the async-enumerable sequence.
  13. /// If the next element isn't received within the specified timeout duration starting from its predecessor, a TimeoutException is propagated to the observer.
  14. /// </summary>
  15. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  16. /// <param name="source">Source sequence to perform a timeout for.</param>
  17. /// <param name="timeout">Maximum duration between values before a timeout occurs.</param>
  18. /// <returns>The source sequence with a TimeoutException in case of a timeout.</returns>
  19. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  20. /// <exception cref="ArgumentOutOfRangeException"><paramref name="timeout"/> is less than TimeSpan.Zero.</exception>
  21. /// <exception cref="TimeoutException">(Asynchronous) If no element is produced within <paramref name="timeout"/> from the previous element.</exception>
  22. /// <remarks>
  23. /// <para>
  24. /// In case you only want to timeout on the first element, consider using the <see cref="Amb{TSource}(IAsyncEnumerable{TSource}, IAsyncEnumerable{TSource})"/>
  25. /// operator applied to the source sequence and a delayed <see cref="Throw{TResult}(Exception)"/> sequence.
  26. /// <!-- FIXME: Timeout with initial and per item timeout option not implemented yet.
  27. /// Alternatively, the general-purpose overload
  28. /// of Timeout, <see cref="Timeout{TSource, TTimeout}(IObservable{TSource}, IObservable{TTimeout}, Func{TSource, IObservable{TTimeout}})"/> can be used.
  29. /// -->
  30. /// </para>
  31. /// <para>
  32. /// Specifying a TimeSpan.Zero value for <paramref name="timeout"/> is not recommended but supported, causing timeout timers to be scheduled that are due
  33. /// immediately. However, this doesn't guarantee a timeout will occur, even for the first element. This is a side-effect of the asynchrony introduced by the
  34. /// scheduler, where the action to propagate a timeout may not execute immediately, despite the TimeSpan.Zero due time. In such cases, the next element may
  35. /// arrive before the scheduler gets a chance to run the timeout action.
  36. /// </para>
  37. /// </remarks>
  38. public static IAsyncEnumerable<TSource> Timeout<TSource>(this IAsyncEnumerable<TSource> source, TimeSpan timeout)
  39. {
  40. if (source == null)
  41. throw Error.ArgumentNull(nameof(source));
  42. var num = (long)timeout.TotalMilliseconds;
  43. if (num < -1L || num > int.MaxValue)
  44. throw Error.ArgumentOutOfRange(nameof(timeout));
  45. return new TimeoutAsyncIterator<TSource>(source, timeout);
  46. }
  47. private sealed class TimeoutAsyncIterator<TSource> : AsyncIterator<TSource>
  48. {
  49. private readonly IAsyncEnumerable<TSource> _source;
  50. private readonly TimeSpan _timeout;
  51. private IAsyncEnumerator<TSource>? _enumerator;
  52. private Task? _loserTask;
  53. private CancellationTokenSource? _sourceCTS;
  54. public TimeoutAsyncIterator(IAsyncEnumerable<TSource> source, TimeSpan timeout)
  55. {
  56. _source = source;
  57. _timeout = timeout;
  58. }
  59. public override AsyncIteratorBase<TSource> Clone()
  60. {
  61. return new TimeoutAsyncIterator<TSource>(_source, _timeout);
  62. }
  63. public override async ValueTask DisposeAsync()
  64. {
  65. if (_loserTask != null)
  66. {
  67. await _loserTask.ConfigureAwait(false);
  68. _loserTask = null;
  69. _enumerator = null;
  70. }
  71. else if (_enumerator != null)
  72. {
  73. await _enumerator.DisposeAsync().ConfigureAwait(false);
  74. _enumerator = null;
  75. }
  76. if (_sourceCTS != null)
  77. {
  78. _sourceCTS.Dispose();
  79. _sourceCTS = null;
  80. }
  81. await base.DisposeAsync().ConfigureAwait(false);
  82. }
  83. protected override async ValueTask<bool> MoveNextCore()
  84. {
  85. switch (_state)
  86. {
  87. case AsyncIteratorState.Allocated:
  88. _sourceCTS = CancellationTokenSource.CreateLinkedTokenSource(_cancellationToken);
  89. _enumerator = _source.GetAsyncEnumerator(_sourceCTS.Token);
  90. _state = AsyncIteratorState.Iterating;
  91. goto case AsyncIteratorState.Iterating;
  92. case AsyncIteratorState.Iterating:
  93. #if NET6_0_OR_GREATER
  94. #pragma warning disable CA2012 // Always await ValueTasks immediately; this is deliberate advanced usage
  95. #endif
  96. var moveNext = _enumerator!.MoveNextAsync();
  97. #if NET6_0_OR_GREATER
  98. #pragma warning restore CA2012
  99. #endif
  100. if (!moveNext.IsCompleted)
  101. {
  102. using var delayCts = CancellationTokenSource.CreateLinkedTokenSource(_cancellationToken);
  103. var delay = Task.Delay(_timeout, delayCts.Token);
  104. var next = moveNext.AsTask();
  105. var winner = await Task.WhenAny(next, delay).ConfigureAwait(false);
  106. if (winner == delay)
  107. {
  108. // NB: We still have to wait for the MoveNextAsync operation to complete before we can
  109. // dispose _enumerator. The resulting task will be used by DisposeAsync. Also note
  110. // that throwing an exception here causes a call to DisposeAsync, where we pick up
  111. // the task prepared below.
  112. // NB: Any exception reported by a timed out MoveNextAsync operation won't be reported
  113. // to the caller, but the task's exception is not marked as observed, so unhandled
  114. // exception handlers can still observe the exception.
  115. // REVIEW: Should exceptions reported by a timed out MoveNextAsync operation come out
  116. // when attempting to call DisposeAsync?
  117. _loserTask = next.ContinueWith((_, state) => ((IAsyncDisposable)state!).DisposeAsync().AsTask(), _enumerator);
  118. _sourceCTS!.Cancel();
  119. throw new TimeoutException();
  120. }
  121. delayCts.Cancel();
  122. }
  123. if (await moveNext.ConfigureAwait(false))
  124. {
  125. _current = _enumerator.Current;
  126. return true;
  127. }
  128. break;
  129. }
  130. await DisposeAsync().ConfigureAwait(false);
  131. return false;
  132. }
  133. }
  134. }
  135. }