Merge.cs 3.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Disposables;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace System.Reactive.Linq
  8. {
  partial class AsyncObservable
  {
      // TODO: Add Merge with max concurrency and IEnumerable<T>-based overloads.

      /// <summary>
      /// Merges the elements of every inner observable sequence produced by
      /// <paramref name="source"/> into a single observable sequence.
      /// </summary>
      /// <typeparam name="TSource">Element type of the inner sequences and of the result.</typeparam>
      /// <param name="source">Observable sequence whose elements are themselves observable sequences.</param>
      /// <returns>
      /// An observable sequence built with <c>Create</c>; subscribing to it subscribes to
      /// <paramref name="source"/> using the sink returned by <c>AsyncObserver.Merge</c>.
      /// </returns>
      /// <exception cref="ArgumentNullException"><paramref name="source"/> is <c>null</c>.</exception>
      public static IAsyncObservable<TSource> Merge<TSource>(this IAsyncObservable<IAsyncObservable<TSource>> source)
      {
          if (source == null)
              throw new ArgumentNullException(nameof(source));

          return Create<TSource>(async observer =>
          {
              // sink receives the inner sequences; cancel is the disposable that
              // AsyncObserver.Merge adds every inner subscription to.
              var (sink, cancel) = AsyncObserver.Merge(observer);

              // Library code: do not capture the caller's synchronization context,
              // consistent with the awaits inside AsyncObserver.Merge.
              var subscription = await source.SubscribeAsync(sink).ConfigureAwait(false);

              // The returned disposable combines the outer subscription with the inner ones.
              return StableCompositeAsyncDisposable.Create(subscription, cancel);
          });
      }
  }
  partial class AsyncObserver
  {
      /// <summary>
      /// Creates an observer for a sequence of inner observable sequences that subscribes to each
      /// inner sequence it receives and forwards its notifications to <paramref name="observer"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the inner sequences.</typeparam>
      /// <param name="observer">Observer that receives the merged notifications.</param>
      /// <returns>
      /// A tuple of (1) the observer to subscribe to the outer sequence with, and (2) the
      /// composite disposable to which every inner subscription is added.
      /// </returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
      public static (IAsyncObserver<IAsyncObservable<TSource>>, IAsyncDisposable) Merge<TSource>(IAsyncObserver<TSource> observer)
      {
          if (observer == null)
              throw new ArgumentNullException(nameof(observer));

          // gate serializes every call into the downstream observer and guards count.
          var gate = new AsyncLock();

          // Number of sequences that have not completed yet; starts at 1 for the outer sequence.
          var count = 1;

          var disposable = new CompositeAsyncDisposable();

          // Shared by the outer and every inner sequence: forwards the error under the gate.
          // NOTE(review): nothing here stops later OnNext/OnCompleted calls from other sequences
          // after an error; presumably the surrounding Create/subscription machinery handles
          // that - confirm.
          async Task OnErrorAsync(Exception ex)
          {
              using (await gate.LockAsync().ConfigureAwait(false))
              {
                  await observer.OnErrorAsync(ex).ConfigureAwait(false);
              }
          }

          // Shared by the outer and every inner sequence: signals completion downstream only
          // once the last outstanding sequence has completed.
          async Task OnCompletedAsync()
          {
              using (await gate.LockAsync().ConfigureAwait(false))
              {
                  if (--count == 0)
                  {
                      await observer.OnCompletedAsync().ConfigureAwait(false);
                  }
              }
          }

          return
          (
              Create<IAsyncObservable<TSource>>(
                  async xs =>
                  {
                      // Register the inner sequence before subscribing so that its completion
                      // is always matched by a prior increment.
                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          count++;
                      }

                      var inner = await xs.SubscribeAsync(
                          async x =>
                          {
                              using (await gate.LockAsync().ConfigureAwait(false))
                              {
                                  await observer.OnNextAsync(x).ConfigureAwait(false);
                              }
                          },
                          OnErrorAsync,
                          OnCompletedAsync
                      ).ConfigureAwait(false);

                      // Track the inner subscription so the caller can dispose it via the
                      // returned composite.
                      await disposable.AddAsync(inner).ConfigureAwait(false);
                  },
                  OnErrorAsync,
                  OnCompletedAsync
              ),
              disposable
          );
      }
  }
  79. }