For.cs 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Reactive.Disposables;
  6. using System.Threading.Tasks;
  7. namespace System.Reactive.Linq
  8. {
  9. public partial class AsyncObservable
  10. {
  11. // REVIEW: Use a tail-recursive sink.
  12. // TODO: Add IAsyncEnumerable<T> based overload.
  /// <summary>
  /// Builds an async observable by invoking <paramref name="resultSelector"/> for each element of
  /// <paramref name="source"/>. This overload accepts a synchronous selector, wraps each selected
  /// sequence in an already-completed <see cref="ValueTask{TResult}"/>, and delegates to the
  /// overload that takes a <see cref="ValueTask{TResult}"/>-returning selector.
  /// </summary>
  /// <typeparam name="TSource">Element type of the enumerable source.</typeparam>
  /// <typeparam name="TResult">Element type of the sequences produced by the selector.</typeparam>
  /// <param name="source">Elements to run the selector for.</param>
  /// <param name="resultSelector">Function producing an async observable for a source element.</param>
  /// <returns>The async observable produced by the <see cref="ValueTask{TResult}"/>-based overload.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> or <paramref name="resultSelector"/> is <c>null</c>.
  /// </exception>
  public static IAsyncObservable<TResult> For<TSource, TResult>(IEnumerable<TSource> source, Func<TSource, IAsyncObservable<TResult>> resultSelector)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (resultSelector == null)
      {
          throw new ArgumentNullException(nameof(resultSelector));
      }

      // Explicitly typed so the call below binds to the ValueTask-based overload.
      Func<TSource, ValueTask<IAsyncObservable<TResult>>> asyncSelector =
          element => new ValueTask<IAsyncObservable<TResult>>(resultSelector(element));

      return For(source, asyncSelector);
  }
  /// <summary>
  /// Builds an async observable that walks <paramref name="source"/> one element at a time: for each
  /// element it awaits <paramref name="resultSelector"/> to obtain an inner sequence and subscribes to
  /// it. The next element is only pulled when the inner observer's completion handler runs; when the
  /// enumerator is exhausted the downstream observer is completed.
  /// </summary>
  /// <typeparam name="TSource">Element type of the enumerable source.</typeparam>
  /// <typeparam name="TResult">Element type of the inner sequences and of the result.</typeparam>
  /// <param name="source">Elements to run the selector for.</param>
  /// <param name="resultSelector">Asynchronous function producing an async observable for a source element.</param>
  /// <returns>An async observable created via <c>Create</c> that drives the enumeration on subscription.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> or <paramref name="resultSelector"/> is <c>null</c>.
  /// </exception>
  /// <remarks>
  /// Exceptions thrown while advancing the enumerator, reading its current element, or running the
  /// selector are reported through the observer's <c>OnErrorAsync</c>. The enumerator is disposed when
  /// the returned subscription is disposed, or immediately if the initial subscription attempt throws.
  /// </remarks>
  public static IAsyncObservable<TResult> For<TSource, TResult>(IEnumerable<TSource> source, Func<TSource, ValueTask<IAsyncObservable<TResult>>> resultSelector)
  {
      if (source == null)
          throw new ArgumentNullException(nameof(source));
      if (resultSelector == null)
          throw new ArgumentNullException(nameof(resultSelector));

      return Create<TResult>(async observer =>
      {
          // Holds the subscription to whichever inner sequence is currently active.
          var subscription = new SerialAsyncDisposable();

          var enumerator = source.GetEnumerator();

          // Declared before it is assigned because MoveNext captures it, and MoveNext itself is
          // passed to the observer being created (mutual reference).
          var o = default(IAsyncObserver<TResult>);

          // Elements and errors go straight to the downstream observer; the third handler
          // (presumably the completion callback) advances to the next source element instead.
          o = AsyncObserver.CreateUnsafe<TResult>(
              observer.OnNextAsync,
              observer.OnErrorAsync,
              MoveNext
          );

          async ValueTask MoveNext()
          {
              var b = default(bool);
              var next = default(IAsyncObservable<TResult>);

              try
              {
                  b = enumerator.MoveNext();

                  if (b)
                  {
                      next = await resultSelector(enumerator.Current).ConfigureAwait(false);
                  }
              }
              catch (Exception ex)
              {
                  // Enumeration or selector failure terminates the sequence with an error.
                  await observer.OnErrorAsync(ex).ConfigureAwait(false);
                  return;
              }

              if (b)
              {
                  // Register the placeholder first so the serial disposable already tracks this
                  // inner subscription while SubscribeSafeAsync is still in flight.
                  var sad = new SingleAssignmentAsyncDisposable();
                  await subscription.AssignAsync(sad).ConfigureAwait(false);

                  var d = await next.SubscribeSafeAsync(o).ConfigureAwait(false);
                  await sad.AssignAsync(d).ConfigureAwait(false);
              }
              else
              {
                  await observer.OnCompletedAsync().ConfigureAwait(false);
              }
          }

          try
          {
              await MoveNext().ConfigureAwait(false);
          }
          catch
          {
              // No disposable is handed back to the caller when subscription fails, so the
              // enumerator would otherwise never be disposed.
              enumerator.Dispose();
              throw;
          }

          var disposeEnumerator = AsyncDisposable.Create(() =>
          {
              enumerator.Dispose();
              return default;
          });

          return StableCompositeAsyncDisposable.Create(disposeEnumerator, subscription);
      });
  }
  75. }
  76. }