Browse Source

Adding CombineLatest with tuples.

Bart De Smet 8 years ago
parent
commit
e97b9bfa52

File diff suppressed because it is too large
+ 6582 - 202
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/CombineLatest.Generated.cs


+ 110 - 0
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/CombineLatest.Generated.tt

@@ -23,7 +23,41 @@ for (var i = 2; i <= 15; i++)
     var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j));
     var args = string.Join(", ", Enumerable.Range(1, i).Select(j => "IAsyncObservable<T" + j + "> source" + j));
     var obs = "(" + string.Join(", ", Enumerable.Range(1, i).Select(j => "observer" + j)) + ")";
+    var tuple = "(" + genArgs + ")";
 #>
+        public static IAsyncObservable<<#=tuple#>> CombineLatest<<#=genPars#>>(this <#=args#>)
+        {
+<#
+for (var j = 1; j <= i; j++)
+{
+#>
+            if (source<#=j#> == null)
+                throw new ArgumentNullException(nameof(source<#=j#>));
+<#
+}
+#>
+
+            return Create<<#=tuple#>>(async observer =>
+            {
+                var d = new CompositeAsyncDisposable();
+
+                var <#=obs#> = AsyncObserver.CombineLatest(observer);
+
+<#
+for (var j = 1; j <= i; j++)
+{
+#>
+                var sub<#=j#> = source<#=j#>.SubscribeSafeAsync(observer<#=j#>).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
+<#
+}
+#>
+
+                await Task.WhenAll(<#=string.Join(", ", Enumerable.Range(1, i).Select(j => "sub" + j))#>).ConfigureAwait(false);
+
+                return d;
+            });
+        }
+
         public static IAsyncObservable<TResult> CombineLatest<<#=genPars#>>(this <#=args#>, Func<<#=genArgs#>, TResult> selector)
         {
 <#
@@ -107,7 +141,83 @@ for (var i = 2; i <= 15; i++)
     var res = "(" + string.Join(", ", Enumerable.Range(1, i).Select(j => "IAsyncObserver<T" + j + ">")) + ")";
     var genPars = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "TResult" }));
     var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j));
+    var tuple = "(" + genArgs + ")";
+#>
+        public static <#=res#> CombineLatest<<#=genArgs#>>(IAsyncObserver<<#=tuple#>> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            bool allHasValue = false;
+
+<#
+for (var j = 1; j <= i; j++)
+{
+#>
+            bool hasValue<#=j#> = false;
+            bool isDone<#=j#> = false;
+            T<#=j#> latestValue<#=j#> = default(T<#=j#>);
+<#
+}
 #>
+
+            var gate = new AsyncLock();
+
+            return
+            (
+<#
+for (var j = 1; j <= i; j++)
+{
+#>
+                Create<T<#=j#>>(
+                    async x =>
+                    {
+                        using (await gate.LockAsync().ConfigureAwait(false))
+                        {
+                            if (!hasValue<#=j#>)
+                            {
+                                hasValue<#=j#> = true;
+                                allHasValue = <#=string.Join(" && ", Enumerable.Range(1, i).Select(k => "hasValue" + k))#>;
+                            }
+
+                            latestValue<#=j#> = x;
+
+                            if (allHasValue)
+                            {
+                                await observer.OnNextAsync((<#=string.Join(", ", Enumerable.Range(1, i).Select(k => "latestValue" + k))#>)).ConfigureAwait(false);
+                            }
+                            else if (<#=string.Join(" && ", Enumerable.Range(1, i).Where(k => k != j).Select(k => "isDone" + k))#>)
+                            {
+                                await observer.OnCompletedAsync().ConfigureAwait(false);
+                            }
+                        }
+                    },
+                    async ex =>
+                    {
+                        using (await gate.LockAsync().ConfigureAwait(false))
+                        {
+                            await observer.OnErrorAsync(ex).ConfigureAwait(false);
+                        }
+                    },
+                    async () =>
+                    {
+                        using (await gate.LockAsync().ConfigureAwait(false))
+                        {
+                            isDone<#=j#> = true;
+
+                            if (<#=string.Join(" && ", Enumerable.Range(1, i).Select(k => "isDone" + k))#>)
+                            {
+                                await observer.OnCompletedAsync().ConfigureAwait(false);
+                            }
+                        }
+                    }
+                )<#=(j < i ? "," : "")#>
+<#
+}
+#>
+            );
+        }
+
         public static <#=res#> CombineLatest<<#=genPars#>>(IAsyncObserver<TResult> observer, Func<<#=genArgs#>, TResult> selector)
         {
             if (observer == null)

Some files were not shown because too many files changed in this diff