Преглед изворни кода

Async support on join patterns.

Bart De Smet пре 8 година
родитељ
комит
8c0a8a4e81

+ 130 - 0
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Joins/AsyncPattern.Generated.cs

@@ -2,6 +2,8 @@
 // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 // See the LICENSE file in the project root for more information. 
 // See the LICENSE file in the project root for more information. 
 
 
+using System.Threading.Tasks;
+
 namespace System.Reactive.Joins
 namespace System.Reactive.Joins
 {
 {
     public class AsyncPattern<TSource1> : AsyncPattern
     public class AsyncPattern<TSource1> : AsyncPattern
@@ -20,6 +22,14 @@ namespace System.Reactive.Joins
 
 
             return new AsyncPlan<TSource1, TResult>(this, selector);
             return new AsyncPlan<TSource1, TResult>(this, selector);
         }
         }
+
+        public AsyncPlan<TResult> Then<TResult>(Func<TSource1, Task<TResult>> selector)
+        {
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return new AsyncPlanWithTask<TSource1, TResult>(this, selector);
+        }
     }
     }
 
 
     public class AsyncPattern<TSource1, TSource2> : AsyncPattern
     public class AsyncPattern<TSource1, TSource2> : AsyncPattern
@@ -40,6 +50,14 @@ namespace System.Reactive.Joins
 
 
             return new AsyncPlan<TSource1, TSource2, TResult>(this, selector);
             return new AsyncPlan<TSource1, TSource2, TResult>(this, selector);
         }
         }
+
+        public AsyncPlan<TResult> Then<TResult>(Func<TSource1, TSource2, Task<TResult>> selector)
+        {
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return new AsyncPlanWithTask<TSource1, TSource2, TResult>(this, selector);
+        }
     }
     }
 
 
     public class AsyncPattern<TSource1, TSource2, TSource3> : AsyncPattern
     public class AsyncPattern<TSource1, TSource2, TSource3> : AsyncPattern
@@ -62,6 +80,14 @@ namespace System.Reactive.Joins
 
 
             return new AsyncPlan<TSource1, TSource2, TSource3, TResult>(this, selector);
             return new AsyncPlan<TSource1, TSource2, TSource3, TResult>(this, selector);
         }
         }
+
+        public AsyncPlan<TResult> Then<TResult>(Func<TSource1, TSource2, TSource3, Task<TResult>> selector)
+        {
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return new AsyncPlanWithTask<TSource1, TSource2, TSource3, TResult>(this, selector);
+        }
     }
     }
 
 
     public class AsyncPattern<TSource1, TSource2, TSource3, TSource4> : AsyncPattern
     public class AsyncPattern<TSource1, TSource2, TSource3, TSource4> : AsyncPattern
@@ -86,6 +112,14 @@ namespace System.Reactive.Joins
 
 
             return new AsyncPlan<TSource1, TSource2, TSource3, TSource4, TResult>(this, selector);
             return new AsyncPlan<TSource1, TSource2, TSource3, TSource4, TResult>(this, selector);
         }
         }
+
+        public AsyncPlan<TResult> Then<TResult>(Func<TSource1, TSource2, TSource3, TSource4, Task<TResult>> selector)
+        {
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return new AsyncPlanWithTask<TSource1, TSource2, TSource3, TSource4, TResult>(this, selector);
+        }
     }
     }
 
 
     public class AsyncPattern<TSource1, TSource2, TSource3, TSource4, TSource5> : AsyncPattern
     public class AsyncPattern<TSource1, TSource2, TSource3, TSource4, TSource5> : AsyncPattern
@@ -112,6 +146,14 @@ namespace System.Reactive.Joins
 
 
             return new AsyncPlan<TSource1, TSource2, TSource3, TSource4, TSource5, TResult>(this, selector);
             return new AsyncPlan<TSource1, TSource2, TSource3, TSource4, TSource5, TResult>(this, selector);
         }
         }
+
+        public AsyncPlan<TResult> Then<TResult>(Func<TSource1, TSource2, TSource3, TSource4, TSource5, Task<TResult>> selector)
+        {
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return new AsyncPlanWithTask<TSource1, TSource2, TSource3, TSource4, TSource5, TResult>(this, selector);
+        }
     }
     }
 
 
     public class AsyncPattern<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6> : AsyncPattern
     public class AsyncPattern<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6> : AsyncPattern
@@ -140,6 +182,14 @@ namespace System.Reactive.Joins
 
 
             return new AsyncPlan<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TResult>(this, selector);
             return new AsyncPlan<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TResult>(this, selector);
         }
         }
+
+        public AsyncPlan<TResult> Then<TResult>(Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, Task<TResult>> selector)
+        {
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return new AsyncPlanWithTask<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TResult>(this, selector);
+        }
     }
     }
 
 
     public class AsyncPattern<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7> : AsyncPattern
     public class AsyncPattern<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7> : AsyncPattern
@@ -170,6 +220,14 @@ namespace System.Reactive.Joins
 
 
             return new AsyncPlan<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TResult>(this, selector);
             return new AsyncPlan<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TResult>(this, selector);
         }
         }
+
+        public AsyncPlan<TResult> Then<TResult>(Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, Task<TResult>> selector)
+        {
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return new AsyncPlanWithTask<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TResult>(this, selector);
+        }
     }
     }
 
 
     public class AsyncPattern<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8> : AsyncPattern
     public class AsyncPattern<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8> : AsyncPattern
@@ -202,6 +260,14 @@ namespace System.Reactive.Joins
 
 
             return new AsyncPlan<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult>(this, selector);
             return new AsyncPlan<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult>(this, selector);
         }
         }
+
+        public AsyncPlan<TResult> Then<TResult>(Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, Task<TResult>> selector)
+        {
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return new AsyncPlanWithTask<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult>(this, selector);
+        }
     }
     }
 
 
     public class AsyncPattern<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9> : AsyncPattern
     public class AsyncPattern<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9> : AsyncPattern
@@ -236,6 +302,14 @@ namespace System.Reactive.Joins
 
 
             return new AsyncPlan<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TResult>(this, selector);
             return new AsyncPlan<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TResult>(this, selector);
         }
         }
+
+        public AsyncPlan<TResult> Then<TResult>(Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, Task<TResult>> selector)
+        {
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return new AsyncPlanWithTask<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TResult>(this, selector);
+        }
     }
     }
 
 
     public class AsyncPattern<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10> : AsyncPattern
     public class AsyncPattern<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10> : AsyncPattern
@@ -272,6 +346,14 @@ namespace System.Reactive.Joins
 
 
             return new AsyncPlan<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TResult>(this, selector);
             return new AsyncPlan<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TResult>(this, selector);
         }
         }
+
+        public AsyncPlan<TResult> Then<TResult>(Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, Task<TResult>> selector)
+        {
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return new AsyncPlanWithTask<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TResult>(this, selector);
+        }
     }
     }
 
 
     public class AsyncPattern<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11> : AsyncPattern
     public class AsyncPattern<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11> : AsyncPattern
@@ -310,6 +392,14 @@ namespace System.Reactive.Joins
 
 
             return new AsyncPlan<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TResult>(this, selector);
             return new AsyncPlan<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TResult>(this, selector);
         }
         }
+
+        public AsyncPlan<TResult> Then<TResult>(Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, Task<TResult>> selector)
+        {
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return new AsyncPlanWithTask<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TResult>(this, selector);
+        }
     }
     }
 
 
     public class AsyncPattern<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12> : AsyncPattern
     public class AsyncPattern<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12> : AsyncPattern
@@ -350,6 +440,14 @@ namespace System.Reactive.Joins
 
 
             return new AsyncPlan<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TResult>(this, selector);
             return new AsyncPlan<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TResult>(this, selector);
         }
         }
+
+        public AsyncPlan<TResult> Then<TResult>(Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, Task<TResult>> selector)
+        {
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return new AsyncPlanWithTask<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TResult>(this, selector);
+        }
     }
     }
 
 
     public class AsyncPattern<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13> : AsyncPattern
     public class AsyncPattern<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13> : AsyncPattern
@@ -392,6 +490,14 @@ namespace System.Reactive.Joins
 
 
             return new AsyncPlan<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TResult>(this, selector);
             return new AsyncPlan<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TResult>(this, selector);
         }
         }
+
+        public AsyncPlan<TResult> Then<TResult>(Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, Task<TResult>> selector)
+        {
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return new AsyncPlanWithTask<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TResult>(this, selector);
+        }
     }
     }
 
 
     public class AsyncPattern<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14> : AsyncPattern
     public class AsyncPattern<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14> : AsyncPattern
@@ -436,6 +542,14 @@ namespace System.Reactive.Joins
 
 
             return new AsyncPlan<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TResult>(this, selector);
             return new AsyncPlan<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TResult>(this, selector);
         }
         }
+
+        public AsyncPlan<TResult> Then<TResult>(Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, Task<TResult>> selector)
+        {
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return new AsyncPlanWithTask<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TResult>(this, selector);
+        }
     }
     }
 
 
     public class AsyncPattern<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15> : AsyncPattern
     public class AsyncPattern<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15> : AsyncPattern
@@ -482,6 +596,14 @@ namespace System.Reactive.Joins
 
 
             return new AsyncPlan<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TResult>(this, selector);
             return new AsyncPlan<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TResult>(this, selector);
         }
         }
+
+        public AsyncPlan<TResult> Then<TResult>(Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, Task<TResult>> selector)
+        {
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return new AsyncPlanWithTask<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TResult>(this, selector);
+        }
     }
     }
 
 
     public class AsyncPattern<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16> : AsyncPattern
     public class AsyncPattern<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16> : AsyncPattern
@@ -530,6 +652,14 @@ namespace System.Reactive.Joins
 
 
             return new AsyncPlan<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16, TResult>(this, selector);
             return new AsyncPlan<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16, TResult>(this, selector);
         }
         }
+
+        public AsyncPlan<TResult> Then<TResult>(Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16, Task<TResult>> selector)
+        {
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return new AsyncPlanWithTask<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16, TResult>(this, selector);
+        }
     }
     }
 
 
 }
 }

+ 10 - 0
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Joins/AsyncPattern.Generated.tt

@@ -8,6 +8,8 @@
 <#@ import namespace="System.Text" #>
 <#@ import namespace="System.Text" #>
 <#@ import namespace="System.Collections.Generic" #>
 <#@ import namespace="System.Collections.Generic" #>
 <#@ output extension=".cs" #>
 <#@ output extension=".cs" #>
+using System.Threading.Tasks;
+
 namespace System.Reactive.Joins
 namespace System.Reactive.Joins
 {
 {
 <#
 <#
@@ -46,6 +48,14 @@ for (var j = 1; j <= i; j++)
 
 
             return new AsyncPlan<<#=genArgs#>, TResult>(this, selector);
             return new AsyncPlan<<#=genArgs#>, TResult>(this, selector);
         }
         }
+
+        public AsyncPlan<TResult> Then<TResult>(Func<<#=genArgs#>, Task<TResult>> selector)
+        {
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return new AsyncPlanWithTask<<#=genArgs#>, TResult>(this, selector);
+        }
     }
     }
 
 
 <#
 <#

Разлика између датотеке није приказан због своје велике величине
+ 507 - 182
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Joins/AsyncPlan.Generated.cs


+ 34 - 8
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Joins/AsyncPlan.Generated.tt

@@ -19,19 +19,45 @@ for (var i = 1; i <= 16; i++)
     var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "TSource" + j));
     var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "TSource" + j));
     var args = string.Join(", ", Enumerable.Range(1, i).Select(j => "IObservable<TSource" + j + "> source" + j));
     var args = string.Join(", ", Enumerable.Range(1, i).Select(j => "IObservable<TSource" + j + "> source" + j));
     var pars = string.Join(", ", Enumerable.Range(1, i).Select(j => "arg" + j));
     var pars = string.Join(", ", Enumerable.Range(1, i).Select(j => "arg" + j));
+    var evalPars = string.Join(", ", Enumerable.Range(1, i).Select(j => "TSource" + j + " arg" + j));
 #>
 #>
-    internal sealed class AsyncPlan<<#=genArgs#>, TResult> : AsyncPlan<TResult>
+    internal sealed class AsyncPlan<<#=genArgs#>, TResult> : AsyncPlanBase<<#=genArgs#>, TResult>
     {
     {
-        public AsyncPattern<<#=genArgs#>> Expression { get; }
-
-        public Func<<#=genArgs#>, TResult> Selector { get; }
+        private readonly Func<<#=genArgs#>, TResult> _selector;
 
 
         internal AsyncPlan(AsyncPattern<<#=genArgs#>> expression, Func<<#=genArgs#>, TResult> selector)
         internal AsyncPlan(AsyncPattern<<#=genArgs#>> expression, Func<<#=genArgs#>, TResult> selector)
+            : base(expression)
+        {
+            _selector = selector;
+        }
+
+        protected override Task<TResult> EvalAsync(<#=evalPars#>) => Task.FromResult(_selector(<#=pars#>));
+    }
+
+    internal sealed class AsyncPlanWithTask<<#=genArgs#>, TResult> : AsyncPlanBase<<#=genArgs#>, TResult>
+    {
+        private readonly Func<<#=genArgs#>, Task<TResult>> _selector;
+
+        internal AsyncPlanWithTask(AsyncPattern<<#=genArgs#>> expression, Func<<#=genArgs#>, Task<TResult>> selector)
+            : base(expression)
         {
         {
-            Expression = expression;
-            Selector = selector;
+            _selector = selector;
         }
         }
 
 
+        protected override Task<TResult> EvalAsync(<#=evalPars#>) => _selector(<#=pars#>);
+    }
+
+    internal abstract class AsyncPlanBase<<#=genArgs#>, TResult> : AsyncPlan<TResult>
+    {
+        private readonly AsyncPattern<<#=genArgs#>> _expression;
+
+        internal AsyncPlanBase(AsyncPattern<<#=genArgs#>> expression)
+        {
+            _expression = expression;
+        }
+
+        protected abstract Task<TResult> EvalAsync(<#=evalPars#>); // REVIEW: Consider the use of ValueTask<TResult>.
+
         internal override ActiveAsyncPlan Activate(Dictionary<object, IAsyncJoinObserver> externalSubscriptions, IAsyncObserver<TResult> observer, Func<ActiveAsyncPlan, Task> deactivate)
         internal override ActiveAsyncPlan Activate(Dictionary<object, IAsyncJoinObserver> externalSubscriptions, IAsyncObserver<TResult> observer, Func<ActiveAsyncPlan, Task> deactivate)
         {
         {
             var onError = new Func<Exception, Task>(observer.OnErrorAsync);
             var onError = new Func<Exception, Task>(observer.OnErrorAsync);
@@ -40,7 +66,7 @@ for (var i = 1; i <= 16; i++)
 for (var j = 1; j <= i; j++)
 for (var j = 1; j <= i; j++)
 {
 {
 #>
 #>
-            var joinObserver<#=j#> = AsyncPlan<TResult>.CreateObserver<TSource<#=j#>>(externalSubscriptions, this.Expression.Source<#=j#>, onError);
+            var joinObserver<#=j#> = AsyncPlan<TResult>.CreateObserver<TSource<#=j#>>(externalSubscriptions, _expression.Source<#=j#>, onError);
 <#
 <#
 }
 }
 #>
 #>
@@ -62,7 +88,7 @@ for (var j = 1; j <= i; j++)
 
 
                     try
                     try
                     {
                     {
-                        res = Selector(<#=pars#>);
+                        res = await EvalAsync(<#=pars#>).ConfigureAwait(false);
                     }
                     }
                     catch (Exception ex)
                     catch (Exception ex)
                     {
                     {

+ 11 - 2
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Then.cs

@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 // See the LICENSE file in the project root for more information. 
 
 
 using System.Reactive.Joins;
 using System.Reactive.Joins;
+using System.Threading.Tasks;
 
 
 namespace System.Reactive.Linq
 namespace System.Reactive.Linq
 {
 {
@@ -10,8 +11,6 @@ namespace System.Reactive.Linq
 
 
     partial class AsyncObservable
     partial class AsyncObservable
     {
     {
-        // REVIEW: Consider adding async support.
-
         public static AsyncPlan<TResult> Then<TSource, TResult>(this IAsyncObservable<TSource> source, Func<TSource, TResult> selector)
         public static AsyncPlan<TResult> Then<TSource, TResult>(this IAsyncObservable<TSource> source, Func<TSource, TResult> selector)
         {
         {
             if (source == null)
             if (source == null)
@@ -21,5 +20,15 @@ namespace System.Reactive.Linq
 
 
             return new AsyncPattern<TSource>(source).Then(selector);
             return new AsyncPattern<TSource>(source).Then(selector);
         }
         }
+
+        public static AsyncPlan<TResult> Then<TSource, TResult>(this IAsyncObservable<TSource> source, Func<TSource, Task<TResult>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return new AsyncPattern<TSource>(source).Then(selector);
+        }
     }
     }
 }
 }

Неке датотеке нису приказане због велике количине промена