DoWhile.cs 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Disposables;
  5. using System.Threading.Tasks;
  6. namespace System.Reactive.Linq
  7. {
  public partial class AsyncObservable
  {
      // REVIEW: Use a tail-recursive sink.

      /// <summary>
      /// Subscribes to <paramref name="source"/> and, every time it signals completion, invokes
      /// <paramref name="condition"/> to decide what happens next: <c>true</c> subscribes to
      /// <paramref name="source"/> again, <c>false</c> completes the downstream observer.
      /// </summary>
      /// <typeparam name="TSource">Element type of the sequence.</typeparam>
      /// <param name="source">Sequence to subscribe to (at least once).</param>
      /// <param name="condition">Predicate evaluated after each completion of <paramref name="source"/>.</param>
      /// <returns>
      /// A sequence created through <c>Create</c>. Elements and errors from <paramref name="source"/> are
      /// forwarded unchanged; an exception thrown by <paramref name="condition"/> is forwarded as an error.
      /// </returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/> or <paramref name="condition"/> is <c>null</c>.
      /// </exception>
      public static IAsyncObservable<TSource> DoWhile<TSource>(IAsyncObservable<TSource> source, Func<bool> condition)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (condition is null)
          {
              throw new ArgumentNullException(nameof(condition));
          }

          return Create<TSource>(async observer =>
          {
              // Every (re)subscription is assigned into this disposable, which is what the callback returns.
              var serial = new SerialAsyncDisposable();

              // Declared first and assigned second: the local functions below capture this variable.
              var sink = default(IAsyncObserver<TSource>);

              sink = AsyncObserver.CreateUnsafe<TSource>(observer.OnNextAsync, observer.OnErrorAsync, OnSourceCompleted);

              async Task SubscribeToSource()
              {
                  // The placeholder goes into the serial slot before subscribing; the real
                  // subscription is attached to the placeholder once SubscribeSafeAsync returns.
                  var placeholder = new SingleAssignmentAsyncDisposable();
                  await serial.AssignAsync(placeholder).ConfigureAwait(false);

                  var inner = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
                  await placeholder.AssignAsync(inner).ConfigureAwait(false);
              }

              async ValueTask OnSourceCompleted()
              {
                  bool shouldRepeat;

                  try
                  {
                      shouldRepeat = condition();
                  }
                  catch (Exception error)
                  {
                      // A failing predicate ends the sequence with that error.
                      await observer.OnErrorAsync(error).ConfigureAwait(false);
                      return;
                  }

                  if (!shouldRepeat)
                  {
                      await observer.OnCompletedAsync().ConfigureAwait(false);
                      return;
                  }

                  await SubscribeToSource().ConfigureAwait(false);
              }

              // Initial subscription happens unconditionally; the predicate is only consulted afterwards.
              await SubscribeToSource().ConfigureAwait(false);

              return serial;
          });
      }

      /// <summary>
      /// Subscribes to <paramref name="source"/> and, every time it signals completion, awaits
      /// <paramref name="condition"/> to decide what happens next: <c>true</c> subscribes to
      /// <paramref name="source"/> again, <c>false</c> completes the downstream observer.
      /// </summary>
      /// <typeparam name="TSource">Element type of the sequence.</typeparam>
      /// <param name="source">Sequence to subscribe to (at least once).</param>
      /// <param name="condition">Asynchronous predicate awaited after each completion of <paramref name="source"/>.</param>
      /// <returns>
      /// A sequence created through <c>Create</c>. Elements and errors from <paramref name="source"/> are
      /// forwarded unchanged; an exception raised while invoking or awaiting <paramref name="condition"/>
      /// is forwarded as an error.
      /// </returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/> or <paramref name="condition"/> is <c>null</c>.
      /// </exception>
      public static IAsyncObservable<TSource> DoWhile<TSource>(IAsyncObservable<TSource> source, Func<ValueTask<bool>> condition)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (condition is null)
          {
              throw new ArgumentNullException(nameof(condition));
          }

          return Create<TSource>(async observer =>
          {
              // Every (re)subscription is assigned into this disposable, which is what the callback returns.
              var serial = new SerialAsyncDisposable();

              // Declared first and assigned second: the local functions below capture this variable.
              var sink = default(IAsyncObserver<TSource>);

              sink = AsyncObserver.CreateUnsafe<TSource>(observer.OnNextAsync, observer.OnErrorAsync, OnSourceCompleted);

              async Task SubscribeToSource()
              {
                  // The placeholder goes into the serial slot before subscribing; the real
                  // subscription is attached to the placeholder once SubscribeSafeAsync returns.
                  var placeholder = new SingleAssignmentAsyncDisposable();
                  await serial.AssignAsync(placeholder).ConfigureAwait(false);

                  var inner = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
                  await placeholder.AssignAsync(inner).ConfigureAwait(false);
              }

              async ValueTask OnSourceCompleted()
              {
                  bool shouldRepeat;

                  try
                  {
                      shouldRepeat = await condition().ConfigureAwait(false);
                  }
                  catch (Exception error)
                  {
                      // A failing predicate ends the sequence with that error.
                      await observer.OnErrorAsync(error).ConfigureAwait(false);
                      return;
                  }

                  if (!shouldRepeat)
                  {
                      await observer.OnCompletedAsync().ConfigureAwait(false);
                      return;
                  }

                  await SubscribeToSource().ConfigureAwait(false);
              }

              // Initial subscription happens unconditionally; the predicate is only consulted afterwards.
              await SubscribeToSource().ConfigureAwait(false);

              return serial;
          });
      }
  }
  106. }