1
0

DoWhile.cs 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Disposables;
  5. using System.Threading.Tasks;
  6. namespace System.Reactive.Linq
  7. {
  8. public partial class AsyncObservable
  9. {
  // REVIEW: Use a tail-recursive sink.

  /// <summary>
  /// Subscribes to <paramref name="source"/> once and then, every time it completes, evaluates
  /// <paramref name="condition"/> to decide whether to subscribe again or to complete the observer.
  /// </summary>
  /// <typeparam name="TSource">Type of the elements in the source sequence.</typeparam>
  /// <param name="source">Sequence to repeat. The first subscription happens before the condition is ever evaluated.</param>
  /// <param name="condition">
  /// Invoked each time <paramref name="source"/> completes: <c>true</c> triggers another subscription,
  /// <c>false</c> completes the observer. An exception thrown by it is forwarded to the observer's error channel.
  /// </param>
  /// <returns>The sequence built by <c>Create</c> from the repeat logic below.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="condition"/> is <c>null</c>.</exception>
  public static IAsyncObservable<TSource> DoWhile<TSource>(IAsyncObservable<TSource> source, Func<bool> condition)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (condition == null)
      {
          throw new ArgumentNullException(nameof(condition));
      }

      return Create(
          source,
          condition,
          static async (src, shouldRepeat, downstream) =>
          {
              // Holds the subscription of the current round; this is what the subscriber gets back.
              var current = new SerialAsyncDisposable();

              // Declared first and assigned second: SubscribeRoundAsync captures 'sink', and the
              // completion handler (which calls SubscribeRoundAsync) is part of the sink itself.
              IAsyncObserver<TSource> sink = default;

              sink = AsyncObserver.CreateUnsafe<TSource>(
                  downstream.OnNextAsync,
                  downstream.OnErrorAsync,
                  OnRoundCompletedAsync
              );

              // First round is unconditional; later rounds are started from OnRoundCompletedAsync.
              await SubscribeRoundAsync().ConfigureAwait(false);

              return current;

              // Starts one round: publishes a placeholder first, then fills it with the real subscription.
              async Task SubscribeRoundAsync()
              {
                  var placeholder = new SingleAssignmentAsyncDisposable();

                  await current.AssignAsync(placeholder).ConfigureAwait(false);

                  var inner = await src.SubscribeSafeAsync(sink).ConfigureAwait(false);

                  await placeholder.AssignAsync(inner).ConfigureAwait(false);
              }

              // Runs when the source completes: decide between another round and final completion.
              async ValueTask OnRoundCompletedAsync()
              {
                  bool again;

                  try
                  {
                      again = shouldRepeat();
                  }
                  catch (Exception error)
                  {
                      await downstream.OnErrorAsync(error).ConfigureAwait(false);
                      return;
                  }

                  if (!again)
                  {
                      await downstream.OnCompletedAsync().ConfigureAwait(false);
                      return;
                  }

                  await SubscribeRoundAsync().ConfigureAwait(false);
              }
          });
  }
  /// <summary>
  /// Subscribes to <paramref name="source"/> once and then, every time it completes, awaits
  /// <paramref name="condition"/> to decide whether to subscribe again or to complete the observer.
  /// </summary>
  /// <typeparam name="TSource">Type of the elements in the source sequence.</typeparam>
  /// <param name="source">Sequence to repeat. The first subscription happens before the condition is ever evaluated.</param>
  /// <param name="condition">
  /// Asynchronous condition awaited each time <paramref name="source"/> completes: <c>true</c> triggers another
  /// subscription, <c>false</c> completes the observer. An exception raised while invoking or awaiting it is
  /// forwarded to the observer's error channel.
  /// </param>
  /// <returns>The sequence built by <c>Create</c> from the repeat logic below.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="condition"/> is <c>null</c>.</exception>
  public static IAsyncObservable<TSource> DoWhile<TSource>(IAsyncObservable<TSource> source, Func<ValueTask<bool>> condition)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (condition == null)
      {
          throw new ArgumentNullException(nameof(condition));
      }

      return Create(
          source,
          condition,
          static async (src, shouldRepeatAsync, downstream) =>
          {
              // Holds the subscription of the current round; this is what the subscriber gets back.
              var current = new SerialAsyncDisposable();

              // Declared first and assigned second: SubscribeRoundAsync captures 'sink', and the
              // completion handler (which calls SubscribeRoundAsync) is part of the sink itself.
              IAsyncObserver<TSource> sink = default;

              sink = AsyncObserver.CreateUnsafe<TSource>(
                  downstream.OnNextAsync,
                  downstream.OnErrorAsync,
                  OnRoundCompletedAsync
              );

              // First round is unconditional; later rounds are started from OnRoundCompletedAsync.
              await SubscribeRoundAsync().ConfigureAwait(false);

              return current;

              // Starts one round: publishes a placeholder first, then fills it with the real subscription.
              async Task SubscribeRoundAsync()
              {
                  var placeholder = new SingleAssignmentAsyncDisposable();

                  await current.AssignAsync(placeholder).ConfigureAwait(false);

                  var inner = await src.SubscribeSafeAsync(sink).ConfigureAwait(false);

                  await placeholder.AssignAsync(inner).ConfigureAwait(false);
              }

              // Runs when the source completes: decide between another round and final completion.
              async ValueTask OnRoundCompletedAsync()
              {
                  bool again;

                  try
                  {
                      again = await shouldRepeatAsync().ConfigureAwait(false);
                  }
                  catch (Exception error)
                  {
                      await downstream.OnErrorAsync(error).ConfigureAwait(false);
                      return;
                  }

                  if (!again)
                  {
                      await downstream.OnCompletedAsync().ConfigureAwait(false);
                      return;
                  }

                  await SubscribeRoundAsync().ConfigureAwait(false);
              }
          });
  }
  111. }
  112. }