Merge.cs 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Disposables;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace System.Reactive.Linq
  8. {
  partial class AsyncObservable
  {
      // TODO: Add Merge with max concurrency and IEnumerable<T>-based overloads.

      /// <summary>
      /// Flattens a sequence of inner sequences into one sequence by routing the notifications
      /// of every inner sequence to a single downstream observer.
      /// </summary>
      /// <typeparam name="TSource">Element type of the inner sequences and of the merged result.</typeparam>
      /// <param name="source">Outer sequence whose elements are the inner sequences to merge.</param>
      /// <returns>
      /// A sequence that, on subscription, subscribes a merging observer to <paramref name="source"/>.
      /// </returns>
      /// <exception cref="ArgumentNullException"><paramref name="source"/> is <c>null</c>.</exception>
      public static IAsyncObservable<TSource> Merge<TSource>(this IAsyncObservable<IAsyncObservable<TSource>> source)
      {
          if (source is null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          return Create<TSource>(async observer =>
          {
              // The observer half receives the inner sequences; the disposable half is the
              // second element returned by AsyncObserver.Merge for the same downstream observer.
              var (mergeSink, innerSubscriptions) = AsyncObserver.Merge(observer);

              var outerSubscription = await source.SubscribeSafeAsync(mergeSink).ConfigureAwait(false);

              // Hand both back as one disposable: the outer subscription and the inner ones.
              return StableCompositeAsyncDisposable.Create(outerSubscription, innerSubscriptions);
          });
      }
  }
  partial class AsyncObserver
  {
      /// <summary>
      /// Builds the observer that merges a sequence of inner sequences into
      /// <paramref name="observer"/>, together with the disposable that collects the
      /// subscriptions made to those inner sequences.
      /// </summary>
      /// <typeparam name="TSource">Element type delivered to <paramref name="observer"/>.</typeparam>
      /// <param name="observer">Downstream observer that receives the merged notifications.</param>
      /// <returns>
      /// A pair: the observer to subscribe to the outer sequence, and the composite disposable
      /// to which each inner subscription is added.
      /// </returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
      public static (IAsyncObserver<IAsyncObservable<TSource>>, IAsyncDisposable) Merge<TSource>(IAsyncObserver<TSource> observer)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          // Every call into the downstream observer is made while holding this lock.
          var sync = new AsyncLock();

          // Completions still outstanding. Starts at 1 for the outer sequence; each inner
          // sequence adds one on arrival and takes one away when it completes.
          var pending = 1;

          var innerSubscriptions = new CompositeAsyncDisposable();

          // Shared error path for the outer sequence and every inner sequence.
          async Task ForwardErrorAsync(Exception error)
          {
              using (await sync.LockAsync().ConfigureAwait(false))
              {
                  await observer.OnErrorAsync(error).ConfigureAwait(false);
              }
          }

          // Shared completion path: downstream completes only when the last outstanding
          // sequence (outer or inner) has completed.
          async Task SignalCompletionAsync()
          {
              using (await sync.LockAsync().ConfigureAwait(false))
              {
                  pending -= 1;

                  if (pending == 0)
                  {
                      await observer.OnCompletedAsync().ConfigureAwait(false);
                  }
              }
          }

          // Handles one inner sequence emitted by the outer sequence.
          async Task SubscribeInnerAsync(IAsyncObservable<TSource> innerSource)
          {
              using (await sync.LockAsync().ConfigureAwait(false))
              {
                  pending += 1;
              }

              // The slot is registered before subscribing, and filled in once the
              // subscription to the inner sequence is available.
              var slot = new SingleAssignmentAsyncDisposable();
              await innerSubscriptions.AddAsync(slot).ConfigureAwait(false);

              var innerSink = Create<TSource>(
                  async value =>
                  {
                      using (await sync.LockAsync().ConfigureAwait(false))
                      {
                          await observer.OnNextAsync(value).ConfigureAwait(false);
                      }
                  },
                  ForwardErrorAsync,
                  async () =>
                  {
                      await SignalCompletionAsync().ConfigureAwait(false);
                      await innerSubscriptions.RemoveAsync(slot).ConfigureAwait(false);
                  });

              var subscription = await innerSource.SubscribeSafeAsync(innerSink).ConfigureAwait(false);
              await slot.AssignAsync(subscription).ConfigureAwait(false);
          }

          var outerSink = Create<IAsyncObservable<TSource>>(
              SubscribeInnerAsync,
              ForwardErrorAsync,
              SignalCompletionAsync);

          return (outerSink, innerSubscriptions);
      }
  }
  86. }