Browse Source

Adding Zip with tuples.

Bart De Smet 8 years ago
parent
commit
7dbe577140

File diff suppressed because it is too large
+ 408 - 131
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Zip.Generated.cs


+ 134 - 5
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Zip.Generated.tt

@@ -15,6 +15,8 @@ using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
 {
+    // TODO: Add overloads with tuples.
+
     partial class AsyncObservable
     {
 <#
@@ -24,7 +26,41 @@ for (var i = 2; i <= 15; i++)
     var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j));
     var args = string.Join(", ", Enumerable.Range(1, i).Select(j => "IAsyncObservable<T" + j + "> source" + j));
     var obs = "(" + string.Join(", ", Enumerable.Range(1, i).Select(j => "observer" + j)) + ")";
+    var tuple = "(" + genArgs + ")";
+#>
+        public static IAsyncObservable<<#=tuple#>> Zip<<#=genArgs#>>(this <#=args#>)
+        {
+<#
+for (var j = 1; j <= i; j++)
+{
+#>
+            if (source<#=j#> == null)
+                throw new ArgumentNullException(nameof(source<#=j#>));
+<#
+}
+#>
+
+            return Create<<#=tuple#>>(async observer =>
+            {
+                var d = new CompositeAsyncDisposable();
+
+                var <#=obs#> = AsyncObserver.Zip(observer);
+
+<#
+for (var j = 1; j <= i; j++)
+{
+#>
+                var sub<#=j#> = source<#=j#>.SubscribeSafeAsync(observer<#=j#>).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
+<#
+}
 #>
+
+                await Task.WhenAll(<#=string.Join(", ", Enumerable.Range(1, i).Select(j => "sub" + j))#>).ConfigureAwait(false);
+
+                return d;
+            });
+        }
+
         public static IAsyncObservable<TResult> Zip<<#=genPars#>>(this <#=args#>, Func<<#=genArgs#>, TResult> selector)
         {
 <#
@@ -108,7 +144,105 @@ for (var i = 2; i <= 15; i++)
     var res = "(" + string.Join(", ", Enumerable.Range(1, i).Select(j => "IAsyncObserver<T" + j + ">")) + ")";
     var genPars = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "TResult" }));
     var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j));
+    var tuple = "(" + genArgs + ")";
+
+    var all = string.Join(" && ", Enumerable.Range(1, i).Select(j => "values" + j + ".Count > 0"));
+    var vals = string.Join(", ", Enumerable.Range(1, i).Select(j => "values" + j + ".Dequeue()"));
+#>
+        public static <#=res#> Zip<<#=genArgs#>>(IAsyncObserver<<#=tuple#>> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            var gate = new AsyncLock();
+
+<#
+for (var j = 1; j <= i; j++)
+{
+#>
+            var values<#=j#> = new Queue<T<#=j#>>();
+<#
+}
+#>
+            var isDone = new bool[<#=i#>];
+
+            IAsyncObserver<T> CreateObserver<T>(int index, Queue<T> queue) =>
+                Create<T>(
+                    async x =>
+                    {
+                        using (await gate.LockAsync().ConfigureAwait(false))
+                        {
+                            queue.Enqueue(x);
+
+                            if (<#=all#>)
+                            {
+                                await observer.OnNextAsync((<#=vals#>)).ConfigureAwait(false);
+                            }
+                            else
+                            {
+                                var allDone = true;
+
+                                for (var i = 0; i < <#=i#>; i++)
+                                {
+                                    if (i != index && !isDone[i])
+                                    {
+                                        allDone = false;
+                                        break;
+                                    }
+                                }
+
+                                if (allDone)
+                                {
+                                    await observer.OnCompletedAsync().ConfigureAwait(false);
+                                }
+                            }
+                        }
+                    },
+                    async ex =>
+                    {
+                        using (await gate.LockAsync().ConfigureAwait(false))
+                        {
+                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                        }
+                    },
+                    async () =>
+                    {
+                        using (await gate.LockAsync().ConfigureAwait(false))
+                        {
+                            isDone[index] = true;
+
+                            var allDone = true;
+
+                            for (var i = 0; i < <#=i#>; i++)
+                            {
+                                if (!isDone[i])
+                                {
+                                    allDone = false;
+                                    break;
+                                }
+                            }
+
+                            if (allDone)
+                            {
+                                await observer.OnCompletedAsync().ConfigureAwait(false);
+                            }
+                        }
+                    }
+                );
+
+            return
+            (
+<#
+for (var j = 1; j <= i; j++)
+{
 #>
+                CreateObserver<T<#=j#>>(<#=j#>, values<#=j#>)<#=(j < i ? "," : "")#>
+<#
+}
+#>
+            );
+        }
+
         public static <#=res#> Zip<<#=genPars#>>(IAsyncObserver<TResult> observer, Func<<#=genArgs#>, TResult> selector)
         {
             if (observer == null)
@@ -137,11 +271,6 @@ for (var j = 1; j <= i; j++)
 }
 #>
             var isDone = new bool[<#=i#>];
-<#
-var all = string.Join(" && ", Enumerable.Range(1, i).Select(j => "values" + j + ".Count > 0"));
-var vals = string.Join(", ", Enumerable.Range(1, i).Select(j => "values" + j + ".Dequeue()"));
-var done = "";
-#>
 
             IAsyncObserver<T> CreateObserver<T>(int index, Queue<T> queue) =>
                 Create<T>(

Some files were not shown because too many files changed in this diff