| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 | 
							- // Licensed to the .NET Foundation under one or more agreements.
 
- // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 
- // See the LICENSE file in the project root for more information. 
 
- using System.Reactive.Disposables;
 
- using System.Threading;
 
- using System.Threading.Tasks;
 
- namespace System.Reactive.Linq
 
- {
 
-     partial class AsyncObservable
 
-     {
 
-         // TODO: Add Merge with max concurrency and IEnumerable<T>-based overloads.
 
-         public static IAsyncObservable<TSource> Merge<TSource>(this IAsyncObservable<IAsyncObservable<TSource>> source)
 
-         {
 
-             if (source == null)
 
-                 throw new ArgumentNullException(nameof(source));
 
-             return Create<TSource>(async observer =>
 
-             {
 
-                 var (sink, cancel) = AsyncObserver.Merge(observer);
 
-                 var subscription = await source.SubscribeAsync(sink);
 
-                 return StableCompositeAsyncDisposable.Create(subscription, cancel);
 
-             });
 
-         }
 
-     }
 
-     partial class AsyncObserver
 
-     {
 
-         public static (IAsyncObserver<IAsyncObservable<TSource>>, IAsyncDisposable) Merge<TSource>(IAsyncObserver<TSource> observer)
 
-         {
 
-             if (observer == null)
 
-                 throw new ArgumentNullException(nameof(observer));
 
-             var gate = new AsyncLock();
 
-             var count = 1;
 
-             var disposable = new CompositeAsyncDisposable();
 
-             async Task OnErrorAsync(Exception ex)
 
-             {
 
-                 using (await gate.LockAsync().ConfigureAwait(false))
 
-                 {
 
-                     await observer.OnErrorAsync(ex);
 
-                 }
 
-             };
 
-             async Task OnCompletedAsync()
 
-             {
 
-                 using (await gate.LockAsync().ConfigureAwait(false))
 
-                 {
 
-                     if (--count == 0)
 
-                     {
 
-                         await observer.OnCompletedAsync().ConfigureAwait(false);
 
-                     }
 
-                 }
 
-             };
 
-             return
 
-                 (
 
-                     Create<IAsyncObservable<TSource>>(
 
-                         async xs =>
 
-                         {
 
-                             using (await gate.LockAsync().ConfigureAwait(false))
 
-                             {
 
-                                 count++;
 
-                             }
 
-                             var inner = await xs.SubscribeAsync(
 
-                                 async x =>
 
-                                 {
 
-                                     using (await gate.LockAsync().ConfigureAwait(false))
 
-                                     {
 
-                                         await observer.OnNextAsync(x).ConfigureAwait(false);
 
-                                     }
 
-                                 },
 
-                                 OnErrorAsync,
 
-                                 OnCompletedAsync
 
-                             ).ConfigureAwait(false);
 
-                             await disposable.AddAsync(inner).ConfigureAwait(false);
 
-                         },
 
-                         OnErrorAsync,
 
-                         OnCompletedAsync
 
-                     ),
 
-                     disposable
 
-                 );
 
-         }
 
-     }
 
- }
 
 
  |