// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Threading.Tasks;

namespace System.Reactive.Joins
{
    // NOTE(review): generic type-parameter lists appear to have been stripped from this text
    // (TSource1/TResult are never declared here; "Func>" looks like a leftover) - confirm against the original file.

    // One-source plan whose selector returns the result itself; EvalAsync wraps it in a ValueTask.
    internal sealed class AsyncPlan : AsyncPlanBase
    {
        // Delegate that EvalAsync invokes with the source value.
        private readonly Func _selector;

        // Hands the pattern to the base class and keeps the selector.
        internal AsyncPlan(AsyncPattern expression, Func selector)
            : base(expression)
        {
            _selector = selector;
        }

        // Calls the selector and wraps its return value in a new ValueTask.
        protected override ValueTask EvalAsync(TSource1 arg1) => new ValueTask(_selector(arg1));
    }

    // One-source plan whose selector's return value is returned from EvalAsync unchanged.
    internal sealed class AsyncPlanWithTask : AsyncPlanBase
    {
        // Delegate that EvalAsync invokes with the source value.
        private readonly Func> _selector;

        // Hands the pattern to the base class and keeps the selector.
        internal AsyncPlanWithTask(AsyncPattern expression, Func> selector)
            : base(expression)
        {
            _selector = selector;
        }

        // Returns whatever the selector returns, without wrapping.
        protected override ValueTask EvalAsync(TSource1 arg1) => _selector(arg1);
    }

    // Common base of the one-source plans: stores the pattern and implements Activate;
    // derived classes provide EvalAsync.
    internal abstract class AsyncPlanBase : AsyncPlan
    {
        // Pattern whose Source1 member is read in Activate.
        private readonly AsyncPattern _expression;

        internal AsyncPlanBase(AsyncPattern expression)
        {
            _expression = expression;
        }

        // Computes the result for one source value.
        protected abstract ValueTask EvalAsync(TSource1 arg1);

        // Requests a join observer for the pattern source from AsyncPlan.CreateObserver, builds the
        // active plan around it, adds the plan to that join observer and returns it.
        internal override ActiveAsyncPlan Activate(Dictionary externalSubscriptions, IAsyncObserver observer, Func deactivate)
        {
            // Error callback passed to the CreateObserver call below.
            var onError = new Func(observer.OnErrorAsync);

            var joinObserver1 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source1, onError);

            // Declared before it is constructed so the second lambda can capture the variable.
            var activePlan = default(ActiveAsyncPlan);

            activePlan = new ActiveAsyncPlan(
                joinObserver1,
                // Presumably run by ActiveAsyncPlan on a match - confirm in ActiveAsyncPlan.
                async (arg1) =>
                {
                    var res = default(TResult);

                    try
                    {
                        res = await EvalAsync(arg1).ConfigureAwait(false);
                    }
                    catch (Exception ex)
                    {
                        // A failing EvalAsync is reported via OnErrorAsync; OnNextAsync is skipped.
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        return;
                    }

                    await observer.OnNextAsync(res).ConfigureAwait(false);
                },
                async () =>
                {
                    // Remove the plan from its join observer, then await the deactivate callback.
                    await joinObserver1.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await deactivate(activePlan).ConfigureAwait(false);
                }
            );

            joinObserver1.AddActivePlan(activePlan);

            return activePlan;
        }
    }

    // Two-source plan whose selector returns the result itself; EvalAsync wraps it in a ValueTask.
    internal sealed class AsyncPlan : AsyncPlanBase
    {
        // Delegate that EvalAsync invokes with the source values.
        private readonly Func _selector;

        internal
AsyncPlan(AsyncPattern expression, Func selector)
            : base(expression)
        {
            _selector = selector;
        }

        // Calls the selector with both source values and wraps its return value in a new ValueTask.
        protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2) => new ValueTask(_selector(arg1, arg2));
    }

    // Two-source plan whose selector's return value is returned from EvalAsync unchanged.
    internal sealed class AsyncPlanWithTask : AsyncPlanBase
    {
        // Delegate that EvalAsync invokes with the source values.
        private readonly Func> _selector;

        // Hands the pattern to the base class and keeps the selector.
        internal AsyncPlanWithTask(AsyncPattern expression, Func> selector)
            : base(expression)
        {
            _selector = selector;
        }

        // Returns whatever the selector returns, without wrapping.
        protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2) => _selector(arg1, arg2);
    }

    // Common base of the two-source plans: stores the pattern and implements Activate;
    // derived classes provide EvalAsync.
    // NOTE(review): generic type-parameter lists appear to be missing from this text
    // (TSource1/TSource2/TResult are never declared here) - confirm against the original file.
    internal abstract class AsyncPlanBase : AsyncPlan
    {
        // Pattern whose Source1..Source2 members are read in Activate.
        private readonly AsyncPattern _expression;

        internal AsyncPlanBase(AsyncPattern expression)
        {
            _expression = expression;
        }

        // Computes the result for one set of source values.
        protected abstract ValueTask EvalAsync(TSource1 arg1, TSource2 arg2);

        // Requests one join observer per pattern source from AsyncPlan.CreateObserver, builds the
        // active plan around them, adds the plan to every join observer and returns it.
        internal override ActiveAsyncPlan Activate(Dictionary externalSubscriptions, IAsyncObserver observer, Func deactivate)
        {
            // Error callback passed to every CreateObserver call below.
            var onError = new Func(observer.OnErrorAsync);

            var joinObserver1 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source1, onError);
            var joinObserver2 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source2, onError);

            // Declared before it is constructed so the second lambda can capture the variable.
            var activePlan = default(ActiveAsyncPlan);

            activePlan = new ActiveAsyncPlan(
                joinObserver1,
                joinObserver2,
                // Presumably run by ActiveAsyncPlan on a match - confirm in ActiveAsyncPlan.
                async (arg1, arg2) =>
                {
                    var res = default(TResult);

                    try
                    {
                        res = await EvalAsync(arg1, arg2).ConfigureAwait(false);
                    }
                    catch (Exception ex)
                    {
                        // A failing EvalAsync is reported via OnErrorAsync; OnNextAsync is skipped.
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        return;
                    }

                    await observer.OnNextAsync(res).ConfigureAwait(false);
                },
                async () =>
                {
                    // Remove the plan from each join observer, then await the deactivate callback.
                    await joinObserver1.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver2.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await deactivate(activePlan).ConfigureAwait(false);
                }
            );

            joinObserver1.AddActivePlan(activePlan);
            joinObserver2.AddActivePlan(activePlan);

            return activePlan;
        }
    }

    // Three-source plan whose selector returns the result itself; EvalAsync wraps it in a ValueTask.
    internal sealed class AsyncPlan : AsyncPlanBase
    {
        // Delegate that EvalAsync invokes with the source values.
        private readonly Func _selector;

        // Hands the pattern to the base class and keeps the selector.
        internal AsyncPlan(AsyncPattern expression, Func selector)
            : base(expression)
        {
            _selector = selector;
        }
protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3) => new ValueTask(_selector(arg1, arg2, arg3));
    }

    // Three-source plan whose selector's return value is returned from EvalAsync unchanged.
    internal sealed class AsyncPlanWithTask : AsyncPlanBase
    {
        // Delegate that EvalAsync invokes with the source values.
        private readonly Func> _selector;

        // Hands the pattern to the base class and keeps the selector.
        internal AsyncPlanWithTask(AsyncPattern expression, Func> selector)
            : base(expression)
        {
            _selector = selector;
        }

        // Returns whatever the selector returns, without wrapping.
        protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3) => _selector(arg1, arg2, arg3);
    }

    // Common base of the three-source plans: stores the pattern and implements Activate;
    // derived classes provide EvalAsync.
    // NOTE(review): generic type-parameter lists appear to be missing from this text
    // (TSource1..TSource3/TResult are never declared here) - confirm against the original file.
    internal abstract class AsyncPlanBase : AsyncPlan
    {
        // Pattern whose Source1..Source3 members are read in Activate.
        private readonly AsyncPattern _expression;

        internal AsyncPlanBase(AsyncPattern expression)
        {
            _expression = expression;
        }

        // Computes the result for one set of source values.
        protected abstract ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3);

        // Requests one join observer per pattern source from AsyncPlan.CreateObserver, builds the
        // active plan around them, adds the plan to every join observer and returns it.
        internal override ActiveAsyncPlan Activate(Dictionary externalSubscriptions, IAsyncObserver observer, Func deactivate)
        {
            // Error callback passed to every CreateObserver call below.
            var onError = new Func(observer.OnErrorAsync);

            var joinObserver1 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source1, onError);
            var joinObserver2 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source2, onError);
            var joinObserver3 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source3, onError);

            // Declared before it is constructed so the second lambda can capture the variable.
            var activePlan = default(ActiveAsyncPlan);

            activePlan = new ActiveAsyncPlan(
                joinObserver1,
                joinObserver2,
                joinObserver3,
                // Presumably run by ActiveAsyncPlan on a match - confirm in ActiveAsyncPlan.
                async (arg1, arg2, arg3) =>
                {
                    var res = default(TResult);

                    try
                    {
                        res = await EvalAsync(arg1, arg2, arg3).ConfigureAwait(false);
                    }
                    catch (Exception ex)
                    {
                        // A failing EvalAsync is reported via OnErrorAsync; OnNextAsync is skipped.
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        return;
                    }

                    await observer.OnNextAsync(res).ConfigureAwait(false);
                },
                async () =>
                {
                    // Remove the plan from each join observer, then await the deactivate callback.
                    await joinObserver1.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver2.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver3.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await deactivate(activePlan).ConfigureAwait(false);
                }
            );

            // Add the plan to every join observer it was built from.
            joinObserver1.AddActivePlan(activePlan);
            joinObserver2.AddActivePlan(activePlan);
            joinObserver3.AddActivePlan(activePlan);

            return
activePlan;
        }
    }

    // Four-source plan whose selector returns the result itself; EvalAsync wraps it in a ValueTask.
    internal sealed class AsyncPlan : AsyncPlanBase
    {
        // Delegate that EvalAsync invokes with the source values.
        private readonly Func _selector;

        // Hands the pattern to the base class and keeps the selector.
        internal AsyncPlan(AsyncPattern expression, Func selector)
            : base(expression)
        {
            _selector = selector;
        }

        // Calls the selector and wraps its return value in a new ValueTask.
        protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4) => new ValueTask(_selector(arg1, arg2, arg3, arg4));
    }

    // Four-source plan whose selector's return value is returned from EvalAsync unchanged.
    internal sealed class AsyncPlanWithTask : AsyncPlanBase
    {
        // Delegate that EvalAsync invokes with the source values.
        private readonly Func> _selector;

        // Hands the pattern to the base class and keeps the selector.
        internal AsyncPlanWithTask(AsyncPattern expression, Func> selector)
            : base(expression)
        {
            _selector = selector;
        }

        // Returns whatever the selector returns, without wrapping.
        protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4) => _selector(arg1, arg2, arg3, arg4);
    }

    // Common base of the four-source plans: stores the pattern and implements Activate;
    // derived classes provide EvalAsync.
    // NOTE(review): generic type-parameter lists appear to be missing from this text
    // (TSource1..TSource4/TResult are never declared here) - confirm against the original file.
    internal abstract class AsyncPlanBase : AsyncPlan
    {
        // Pattern whose Source1..Source4 members are read in Activate.
        private readonly AsyncPattern _expression;

        internal AsyncPlanBase(AsyncPattern expression)
        {
            _expression = expression;
        }

        // Computes the result for one set of source values.
        protected abstract ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4);

        // Requests one join observer per pattern source from AsyncPlan.CreateObserver, builds the
        // active plan around them, adds the plan to every join observer and returns it.
        internal override ActiveAsyncPlan Activate(Dictionary externalSubscriptions, IAsyncObserver observer, Func deactivate)
        {
            // Error callback passed to every CreateObserver call below.
            var onError = new Func(observer.OnErrorAsync);

            var joinObserver1 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source1, onError);
            var joinObserver2 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source2, onError);
            var joinObserver3 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source3, onError);
            var joinObserver4 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source4, onError);

            // Declared before it is constructed so the second lambda can capture the variable.
            var activePlan = default(ActiveAsyncPlan);

            activePlan = new ActiveAsyncPlan(
                joinObserver1,
                joinObserver2,
                joinObserver3,
                joinObserver4,
                // Presumably run by ActiveAsyncPlan on a match - confirm in ActiveAsyncPlan.
                async (arg1, arg2, arg3, arg4) =>
                {
                    var res = default(TResult);

                    try
                    {
                        res = await EvalAsync(arg1, arg2, arg3, arg4).ConfigureAwait(false);
                    }
                    catch (Exception ex)
                    {
                        // A failing EvalAsync is reported via OnErrorAsync; OnNextAsync is skipped.
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        return;
                    }

                    await observer.OnNextAsync(res).ConfigureAwait(false);
                },
                async () =>
                {
                    // Remove the plan from each join observer, then await the deactivate callback.
                    await
joinObserver1.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver2.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver3.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver4.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await deactivate(activePlan).ConfigureAwait(false);
                }
            );

            // Add the plan to every join observer it was built from.
            joinObserver1.AddActivePlan(activePlan);
            joinObserver2.AddActivePlan(activePlan);
            joinObserver3.AddActivePlan(activePlan);
            joinObserver4.AddActivePlan(activePlan);

            return activePlan;
        }
    }

    // Five-source plan whose selector returns the result itself; EvalAsync wraps it in a ValueTask.
    internal sealed class AsyncPlan : AsyncPlanBase
    {
        // Delegate that EvalAsync invokes with the source values.
        private readonly Func _selector;

        // Hands the pattern to the base class and keeps the selector.
        internal AsyncPlan(AsyncPattern expression, Func selector)
            : base(expression)
        {
            _selector = selector;
        }

        // Calls the selector and wraps its return value in a new ValueTask.
        protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5) => new ValueTask(_selector(arg1, arg2, arg3, arg4, arg5));
    }

    // Five-source plan whose selector's return value is returned from EvalAsync unchanged.
    internal sealed class AsyncPlanWithTask : AsyncPlanBase
    {
        // Delegate that EvalAsync invokes with the source values.
        private readonly Func> _selector;

        // Hands the pattern to the base class and keeps the selector.
        internal AsyncPlanWithTask(AsyncPattern expression, Func> selector)
            : base(expression)
        {
            _selector = selector;
        }

        // Returns whatever the selector returns, without wrapping.
        protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5) => _selector(arg1, arg2, arg3, arg4, arg5);
    }

    // Common base of the five-source plans: stores the pattern and implements Activate;
    // derived classes provide EvalAsync.
    // NOTE(review): generic type-parameter lists appear to be missing from this text
    // (TSource1..TSource5/TResult are never declared here) - confirm against the original file.
    internal abstract class AsyncPlanBase : AsyncPlan
    {
        // Pattern whose Source1..Source5 members are read in Activate.
        private readonly AsyncPattern _expression;

        internal AsyncPlanBase(AsyncPattern expression)
        {
            _expression = expression;
        }

        // Computes the result for one set of source values.
        protected abstract ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5);

        // Requests one join observer per pattern source from AsyncPlan.CreateObserver, builds the
        // active plan around them, adds the plan to every join observer and returns it.
        internal override ActiveAsyncPlan Activate(Dictionary externalSubscriptions, IAsyncObserver observer, Func deactivate)
        {
            // Error callback passed to every CreateObserver call below.
            var onError = new Func(observer.OnErrorAsync);

            var joinObserver1 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source1, onError);
            var joinObserver2 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source2, onError);
            var joinObserver3 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source3,
onError);
            var joinObserver4 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source4, onError);
            var joinObserver5 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source5, onError);

            // Declared before it is constructed so the second lambda can capture the variable.
            var activePlan = default(ActiveAsyncPlan);

            activePlan = new ActiveAsyncPlan(
                joinObserver1,
                joinObserver2,
                joinObserver3,
                joinObserver4,
                joinObserver5,
                // Presumably run by ActiveAsyncPlan on a match - confirm in ActiveAsyncPlan.
                async (arg1, arg2, arg3, arg4, arg5) =>
                {
                    var res = default(TResult);

                    try
                    {
                        res = await EvalAsync(arg1, arg2, arg3, arg4, arg5).ConfigureAwait(false);
                    }
                    catch (Exception ex)
                    {
                        // A failing EvalAsync is reported via OnErrorAsync; OnNextAsync is skipped.
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        return;
                    }

                    await observer.OnNextAsync(res).ConfigureAwait(false);
                },
                async () =>
                {
                    // Remove the plan from each join observer, then await the deactivate callback.
                    await joinObserver1.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver2.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver3.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver4.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver5.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await deactivate(activePlan).ConfigureAwait(false);
                }
            );

            // Add the plan to every join observer it was built from.
            joinObserver1.AddActivePlan(activePlan);
            joinObserver2.AddActivePlan(activePlan);
            joinObserver3.AddActivePlan(activePlan);
            joinObserver4.AddActivePlan(activePlan);
            joinObserver5.AddActivePlan(activePlan);

            return activePlan;
        }
    }

    // Six-source plan whose selector returns the result itself; EvalAsync wraps it in a ValueTask.
    internal sealed class AsyncPlan : AsyncPlanBase
    {
        // Delegate that EvalAsync invokes with the source values.
        private readonly Func _selector;

        // Hands the pattern to the base class and keeps the selector.
        internal AsyncPlan(AsyncPattern expression, Func selector)
            : base(expression)
        {
            _selector = selector;
        }

        // Calls the selector and wraps its return value in a new ValueTask.
        protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6) => new ValueTask(_selector(arg1, arg2, arg3, arg4, arg5, arg6));
    }

    // Six-source plan whose selector's return value is returned from EvalAsync unchanged.
    internal sealed class AsyncPlanWithTask : AsyncPlanBase
    {
        // Delegate that EvalAsync invokes with the source values.
        private readonly Func> _selector;

        // Hands the pattern to the base class and keeps the selector.
        internal AsyncPlanWithTask(AsyncPattern expression, Func> selector)
            : base(expression)
        {
            _selector = selector;
        }

        // Returns whatever the selector returns, without wrapping.
        protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4
arg4, TSource5 arg5, TSource6 arg6) => _selector(arg1, arg2, arg3, arg4, arg5, arg6);
    }

    // Common base of the six-source plans: stores the pattern and implements Activate;
    // derived classes provide EvalAsync.
    // NOTE(review): generic type-parameter lists appear to be missing from this text
    // (TSource1..TSource6/TResult are never declared here) - confirm against the original file.
    internal abstract class AsyncPlanBase : AsyncPlan
    {
        // Pattern whose Source1..Source6 members are read in Activate.
        private readonly AsyncPattern _expression;

        internal AsyncPlanBase(AsyncPattern expression)
        {
            _expression = expression;
        }

        // Computes the result for one set of source values.
        protected abstract ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6);

        // Requests one join observer per pattern source from AsyncPlan.CreateObserver, builds the
        // active plan around them, adds the plan to every join observer and returns it.
        internal override ActiveAsyncPlan Activate(Dictionary externalSubscriptions, IAsyncObserver observer, Func deactivate)
        {
            // Error callback passed to every CreateObserver call below.
            var onError = new Func(observer.OnErrorAsync);

            var joinObserver1 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source1, onError);
            var joinObserver2 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source2, onError);
            var joinObserver3 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source3, onError);
            var joinObserver4 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source4, onError);
            var joinObserver5 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source5, onError);
            var joinObserver6 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source6, onError);

            // Declared before it is constructed so the second lambda can capture the variable.
            var activePlan = default(ActiveAsyncPlan);

            activePlan = new ActiveAsyncPlan(
                joinObserver1,
                joinObserver2,
                joinObserver3,
                joinObserver4,
                joinObserver5,
                joinObserver6,
                // Presumably run by ActiveAsyncPlan on a match - confirm in ActiveAsyncPlan.
                async (arg1, arg2, arg3, arg4, arg5, arg6) =>
                {
                    var res = default(TResult);

                    try
                    {
                        res = await EvalAsync(arg1, arg2, arg3, arg4, arg5, arg6).ConfigureAwait(false);
                    }
                    catch (Exception ex)
                    {
                        // A failing EvalAsync is reported via OnErrorAsync; OnNextAsync is skipped.
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        return;
                    }

                    await observer.OnNextAsync(res).ConfigureAwait(false);
                },
                async () =>
                {
                    // Remove the plan from each join observer, then await the deactivate callback.
                    await joinObserver1.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver2.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver3.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver4.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await
joinObserver5.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver6.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await deactivate(activePlan).ConfigureAwait(false);
                }
            );

            // Add the plan to every join observer it was built from.
            joinObserver1.AddActivePlan(activePlan);
            joinObserver2.AddActivePlan(activePlan);
            joinObserver3.AddActivePlan(activePlan);
            joinObserver4.AddActivePlan(activePlan);
            joinObserver5.AddActivePlan(activePlan);
            joinObserver6.AddActivePlan(activePlan);

            return activePlan;
        }
    }

    // Seven-source plan whose selector returns the result itself; EvalAsync wraps it in a ValueTask.
    internal sealed class AsyncPlan : AsyncPlanBase
    {
        // Delegate that EvalAsync invokes with the source values.
        private readonly Func _selector;

        // Hands the pattern to the base class and keeps the selector.
        internal AsyncPlan(AsyncPattern expression, Func selector)
            : base(expression)
        {
            _selector = selector;
        }

        // Calls the selector and wraps its return value in a new ValueTask.
        protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7) => new ValueTask(_selector(arg1, arg2, arg3, arg4, arg5, arg6, arg7));
    }

    // Seven-source plan whose selector's return value is returned from EvalAsync unchanged.
    internal sealed class AsyncPlanWithTask : AsyncPlanBase
    {
        // Delegate that EvalAsync invokes with the source values.
        private readonly Func> _selector;

        // Hands the pattern to the base class and keeps the selector.
        internal AsyncPlanWithTask(AsyncPattern expression, Func> selector)
            : base(expression)
        {
            _selector = selector;
        }

        // Returns whatever the selector returns, without wrapping.
        protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7) => _selector(arg1, arg2, arg3, arg4, arg5, arg6, arg7);
    }

    // Common base of the seven-source plans: stores the pattern and implements Activate;
    // derived classes provide EvalAsync.
    // NOTE(review): generic type-parameter lists appear to be missing from this text
    // (TSource1..TSource7/TResult are never declared here) - confirm against the original file.
    internal abstract class AsyncPlanBase : AsyncPlan
    {
        // Pattern whose Source1..Source7 members are read in Activate.
        private readonly AsyncPattern _expression;

        internal AsyncPlanBase(AsyncPattern expression)
        {
            _expression = expression;
        }

        // Computes the result for one set of source values.
        protected abstract ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7);

        // Requests one join observer per pattern source from AsyncPlan.CreateObserver, builds the
        // active plan around them, adds the plan to every join observer and returns it.
        internal override ActiveAsyncPlan Activate(Dictionary externalSubscriptions, IAsyncObserver observer, Func deactivate)
        {
            // Error callback passed to every CreateObserver call below.
            var onError = new Func(observer.OnErrorAsync);

            var joinObserver1 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source1, onError);
            var joinObserver2 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source2, onError);
            var joinObserver3 =
AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source3, onError);
            var joinObserver4 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source4, onError);
            var joinObserver5 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source5, onError);
            var joinObserver6 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source6, onError);
            var joinObserver7 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source7, onError);

            // Declared before it is constructed so the second lambda can capture the variable.
            var activePlan = default(ActiveAsyncPlan);

            activePlan = new ActiveAsyncPlan(
                joinObserver1,
                joinObserver2,
                joinObserver3,
                joinObserver4,
                joinObserver5,
                joinObserver6,
                joinObserver7,
                // Presumably run by ActiveAsyncPlan on a match - confirm in ActiveAsyncPlan.
                async (arg1, arg2, arg3, arg4, arg5, arg6, arg7) =>
                {
                    var res = default(TResult);

                    try
                    {
                        res = await EvalAsync(arg1, arg2, arg3, arg4, arg5, arg6, arg7).ConfigureAwait(false);
                    }
                    catch (Exception ex)
                    {
                        // A failing EvalAsync is reported via OnErrorAsync; OnNextAsync is skipped.
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        return;
                    }

                    await observer.OnNextAsync(res).ConfigureAwait(false);
                },
                async () =>
                {
                    // Remove the plan from each join observer, then await the deactivate callback.
                    await joinObserver1.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver2.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver3.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver4.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver5.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver6.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver7.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await deactivate(activePlan).ConfigureAwait(false);
                }
            );

            // Add the plan to every join observer it was built from.
            joinObserver1.AddActivePlan(activePlan);
            joinObserver2.AddActivePlan(activePlan);
            joinObserver3.AddActivePlan(activePlan);
            joinObserver4.AddActivePlan(activePlan);
            joinObserver5.AddActivePlan(activePlan);
            joinObserver6.AddActivePlan(activePlan);
            joinObserver7.AddActivePlan(activePlan);

            return activePlan;
        }
    }

    // Eight-source plan whose selector returns the result itself; EvalAsync wraps it in a ValueTask.
    internal sealed class AsyncPlan : AsyncPlanBase
    {
        // Delegate that EvalAsync invokes with the source values.
        private readonly Func _selector;

        // Hands the pattern to the base class and keeps the selector.
        internal AsyncPlan(AsyncPattern expression, Func
selector)
            : base(expression)
        {
            _selector = selector;
        }

        // Calls the selector and wraps its return value in a new ValueTask.
        protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7, TSource8 arg8) => new ValueTask(_selector(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8));
    }

    // Eight-source plan whose selector's return value is returned from EvalAsync unchanged.
    internal sealed class AsyncPlanWithTask : AsyncPlanBase
    {
        // Delegate that EvalAsync invokes with the source values.
        private readonly Func> _selector;

        // Hands the pattern to the base class and keeps the selector.
        internal AsyncPlanWithTask(AsyncPattern expression, Func> selector)
            : base(expression)
        {
            _selector = selector;
        }

        // Returns whatever the selector returns, without wrapping.
        protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7, TSource8 arg8) => _selector(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8);
    }

    // Common base of the eight-source plans: stores the pattern and implements Activate;
    // derived classes provide EvalAsync.
    // NOTE(review): generic type-parameter lists appear to be missing from this text
    // (TSource1..TSource8/TResult are never declared here) - confirm against the original file.
    internal abstract class AsyncPlanBase : AsyncPlan
    {
        // Pattern whose Source1..Source8 members are read in Activate.
        private readonly AsyncPattern _expression;

        internal AsyncPlanBase(AsyncPattern expression)
        {
            _expression = expression;
        }

        // Computes the result for one set of source values.
        protected abstract ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7, TSource8 arg8);

        // Requests one join observer per pattern source from AsyncPlan.CreateObserver, builds the
        // active plan around them, adds the plan to every join observer and returns it.
        internal override ActiveAsyncPlan Activate(Dictionary externalSubscriptions, IAsyncObserver observer, Func deactivate)
        {
            // Error callback passed to every CreateObserver call below.
            var onError = new Func(observer.OnErrorAsync);

            var joinObserver1 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source1, onError);
            var joinObserver2 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source2, onError);
            var joinObserver3 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source3, onError);
            var joinObserver4 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source4, onError);
            var joinObserver5 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source5, onError);
            var joinObserver6 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source6, onError);
            var joinObserver7 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source7, onError);
            var joinObserver8 = AsyncPlan.CreateObserver(externalSubscriptions,
_expression.Source8, onError);

            // Declared before it is constructed so the second lambda can capture the variable.
            var activePlan = default(ActiveAsyncPlan);

            activePlan = new ActiveAsyncPlan(
                joinObserver1,
                joinObserver2,
                joinObserver3,
                joinObserver4,
                joinObserver5,
                joinObserver6,
                joinObserver7,
                joinObserver8,
                // Presumably run by ActiveAsyncPlan on a match - confirm in ActiveAsyncPlan.
                async (arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8) =>
                {
                    var res = default(TResult);

                    try
                    {
                        res = await EvalAsync(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8).ConfigureAwait(false);
                    }
                    catch (Exception ex)
                    {
                        // A failing EvalAsync is reported via OnErrorAsync; OnNextAsync is skipped.
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        return;
                    }

                    await observer.OnNextAsync(res).ConfigureAwait(false);
                },
                async () =>
                {
                    // Remove the plan from each join observer, then await the deactivate callback.
                    await joinObserver1.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver2.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver3.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver4.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver5.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver6.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver7.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver8.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await deactivate(activePlan).ConfigureAwait(false);
                }
            );

            // Add the plan to every join observer it was built from.
            joinObserver1.AddActivePlan(activePlan);
            joinObserver2.AddActivePlan(activePlan);
            joinObserver3.AddActivePlan(activePlan);
            joinObserver4.AddActivePlan(activePlan);
            joinObserver5.AddActivePlan(activePlan);
            joinObserver6.AddActivePlan(activePlan);
            joinObserver7.AddActivePlan(activePlan);
            joinObserver8.AddActivePlan(activePlan);

            return activePlan;
        }
    }

    // Nine-source plan whose selector returns the result itself; EvalAsync wraps it in a ValueTask.
    internal sealed class AsyncPlan : AsyncPlanBase
    {
        // Delegate that EvalAsync invokes with the source values.
        private readonly Func _selector;

        // Hands the pattern to the base class and keeps the selector.
        internal AsyncPlan(AsyncPattern expression, Func selector)
            : base(expression)
        {
            _selector = selector;
        }

        // Calls the selector and wraps its return value in a new ValueTask.
        protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7, TSource8 arg8, TSource9 arg9) => new ValueTask(_selector(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8,
arg9));
    }

    // Nine-source plan whose selector's return value is returned from EvalAsync unchanged.
    internal sealed class AsyncPlanWithTask : AsyncPlanBase
    {
        // Delegate that EvalAsync invokes with the source values.
        private readonly Func> _selector;

        // Hands the pattern to the base class and keeps the selector.
        internal AsyncPlanWithTask(AsyncPattern expression, Func> selector)
            : base(expression)
        {
            _selector = selector;
        }

        // Returns whatever the selector returns, without wrapping.
        protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7, TSource8 arg8, TSource9 arg9) => _selector(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9);
    }

    // Common base of the nine-source plans: stores the pattern and implements Activate;
    // derived classes provide EvalAsync.
    // NOTE(review): generic type-parameter lists appear to be missing from this text
    // (TSource1..TSource9/TResult are never declared here) - confirm against the original file.
    internal abstract class AsyncPlanBase : AsyncPlan
    {
        // Pattern whose Source1..Source9 members are read in Activate.
        private readonly AsyncPattern _expression;

        internal AsyncPlanBase(AsyncPattern expression)
        {
            _expression = expression;
        }

        // Computes the result for one set of source values.
        protected abstract ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7, TSource8 arg8, TSource9 arg9);

        // Requests one join observer per pattern source from AsyncPlan.CreateObserver, builds the
        // active plan around them, adds the plan to every join observer and returns it.
        internal override ActiveAsyncPlan Activate(Dictionary externalSubscriptions, IAsyncObserver observer, Func deactivate)
        {
            // Error callback passed to every CreateObserver call below.
            var onError = new Func(observer.OnErrorAsync);

            var joinObserver1 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source1, onError);
            var joinObserver2 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source2, onError);
            var joinObserver3 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source3, onError);
            var joinObserver4 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source4, onError);
            var joinObserver5 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source5, onError);
            var joinObserver6 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source6, onError);
            var joinObserver7 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source7, onError);
            var joinObserver8 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source8, onError);
            var joinObserver9 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source9, onError);

            // Declared before it is constructed so the second lambda can capture the variable.
            var activePlan = default(ActiveAsyncPlan);

            activePlan = new ActiveAsyncPlan(
                joinObserver1,
                joinObserver2,
                joinObserver3,
joinObserver4,
                joinObserver5,
                joinObserver6,
                joinObserver7,
                joinObserver8,
                joinObserver9,
                // Presumably run by ActiveAsyncPlan on a match - confirm in ActiveAsyncPlan.
                async (arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9) =>
                {
                    var res = default(TResult);

                    try
                    {
                        res = await EvalAsync(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9).ConfigureAwait(false);
                    }
                    catch (Exception ex)
                    {
                        // A failing EvalAsync is reported via OnErrorAsync; OnNextAsync is skipped.
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        return;
                    }

                    await observer.OnNextAsync(res).ConfigureAwait(false);
                },
                async () =>
                {
                    // Remove the plan from each join observer, then await the deactivate callback.
                    await joinObserver1.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver2.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver3.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver4.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver5.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver6.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver7.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver8.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver9.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await deactivate(activePlan).ConfigureAwait(false);
                }
            );

            // Add the plan to every join observer it was built from.
            joinObserver1.AddActivePlan(activePlan);
            joinObserver2.AddActivePlan(activePlan);
            joinObserver3.AddActivePlan(activePlan);
            joinObserver4.AddActivePlan(activePlan);
            joinObserver5.AddActivePlan(activePlan);
            joinObserver6.AddActivePlan(activePlan);
            joinObserver7.AddActivePlan(activePlan);
            joinObserver8.AddActivePlan(activePlan);
            joinObserver9.AddActivePlan(activePlan);

            return activePlan;
        }
    }

    // Ten-source plan whose selector returns the result itself; EvalAsync wraps it in a ValueTask.
    internal sealed class AsyncPlan : AsyncPlanBase
    {
        // Delegate that EvalAsync invokes with the source values.
        private readonly Func _selector;

        // Hands the pattern to the base class and keeps the selector.
        internal AsyncPlan(AsyncPattern expression, Func selector)
            : base(expression)
        {
            _selector = selector;
        }

        // Calls the selector and wraps its return value in a new ValueTask.
        protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7, TSource8 arg8, TSource9 arg9, TSource10 arg10) => new ValueTask(_selector(arg1, arg2, arg3, arg4, arg5, arg6, arg7,
arg8, arg9, arg10));
    }

    // Ten-source plan whose selector's return value is returned from EvalAsync unchanged.
    internal sealed class AsyncPlanWithTask : AsyncPlanBase
    {
        // Delegate that EvalAsync invokes with the source values.
        private readonly Func> _selector;

        // Hands the pattern to the base class and keeps the selector.
        internal AsyncPlanWithTask(AsyncPattern expression, Func> selector)
            : base(expression)
        {
            _selector = selector;
        }

        // Returns whatever the selector returns, without wrapping.
        protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7, TSource8 arg8, TSource9 arg9, TSource10 arg10) => _selector(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10);
    }

    // Common base of the ten-source plans: stores the pattern and implements Activate;
    // derived classes provide EvalAsync.
    // NOTE(review): generic type-parameter lists appear to be missing from this text
    // (TSource1..TSource10/TResult are never declared here) - confirm against the original file.
    internal abstract class AsyncPlanBase : AsyncPlan
    {
        // Pattern whose Source1..Source10 members are read in Activate.
        private readonly AsyncPattern _expression;

        internal AsyncPlanBase(AsyncPattern expression)
        {
            _expression = expression;
        }

        // Computes the result for one set of source values.
        protected abstract ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7, TSource8 arg8, TSource9 arg9, TSource10 arg10);

        // Requests one join observer per pattern source from AsyncPlan.CreateObserver, builds the
        // active plan around them, adds the plan to every join observer and returns it.
        internal override ActiveAsyncPlan Activate(Dictionary externalSubscriptions, IAsyncObserver observer, Func deactivate)
        {
            // Error callback passed to every CreateObserver call below.
            var onError = new Func(observer.OnErrorAsync);

            var joinObserver1 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source1, onError);
            var joinObserver2 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source2, onError);
            var joinObserver3 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source3, onError);
            var joinObserver4 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source4, onError);
            var joinObserver5 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source5, onError);
            var joinObserver6 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source6, onError);
            var joinObserver7 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source7, onError);
            var joinObserver8 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source8, onError);
            var joinObserver9 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source9, onError);
            var joinObserver10 = AsyncPlan.CreateObserver(externalSubscriptions,
_expression.Source10, onError);

            // Declared before it is constructed so the second lambda can capture the variable.
            var activePlan = default(ActiveAsyncPlan);

            activePlan = new ActiveAsyncPlan(
                joinObserver1,
                joinObserver2,
                joinObserver3,
                joinObserver4,
                joinObserver5,
                joinObserver6,
                joinObserver7,
                joinObserver8,
                joinObserver9,
                joinObserver10,
                // Presumably run by ActiveAsyncPlan on a match - confirm in ActiveAsyncPlan.
                async (arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10) =>
                {
                    var res = default(TResult);

                    try
                    {
                        res = await EvalAsync(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10).ConfigureAwait(false);
                    }
                    catch (Exception ex)
                    {
                        // A failing EvalAsync is reported via OnErrorAsync; OnNextAsync is skipped.
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        return;
                    }

                    await observer.OnNextAsync(res).ConfigureAwait(false);
                },
                async () =>
                {
                    // Remove the plan from each join observer, then await the deactivate callback.
                    await joinObserver1.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver2.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver3.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver4.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver5.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver6.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver7.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver8.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver9.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver10.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await deactivate(activePlan).ConfigureAwait(false);
                }
            );

            // Add the plan to every join observer it was built from.
            joinObserver1.AddActivePlan(activePlan);
            joinObserver2.AddActivePlan(activePlan);
            joinObserver3.AddActivePlan(activePlan);
            joinObserver4.AddActivePlan(activePlan);
            joinObserver5.AddActivePlan(activePlan);
            joinObserver6.AddActivePlan(activePlan);
            joinObserver7.AddActivePlan(activePlan);
            joinObserver8.AddActivePlan(activePlan);
            joinObserver9.AddActivePlan(activePlan);
            joinObserver10.AddActivePlan(activePlan);

            return activePlan;
        }
    }

    // Eleven-source plan whose selector returns the result itself; EvalAsync wraps it in a ValueTask.
    internal sealed class AsyncPlan : AsyncPlanBase
    {
        // Delegate that EvalAsync invokes with the source values.
        private readonly Func _selector;

        // Hands the pattern to the base class and keeps the selector.
        internal AsyncPlan(AsyncPattern expression, Func selector)
            :
base(expression)
        {
            _selector = selector;
        }

        // Calls the selector and wraps its return value in a new ValueTask.
        protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7, TSource8 arg8, TSource9 arg9, TSource10 arg10, TSource11 arg11) => new ValueTask(_selector(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11));
    }

    // Eleven-source plan whose selector's return value is returned from EvalAsync unchanged.
    internal sealed class AsyncPlanWithTask : AsyncPlanBase
    {
        // Delegate that EvalAsync invokes with the source values.
        private readonly Func> _selector;

        // Hands the pattern to the base class and keeps the selector.
        internal AsyncPlanWithTask(AsyncPattern expression, Func> selector)
            : base(expression)
        {
            _selector = selector;
        }

        // Returns whatever the selector returns, without wrapping.
        protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7, TSource8 arg8, TSource9 arg9, TSource10 arg10, TSource11 arg11) => _selector(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11);
    }

    // Common base of the eleven-source plans: stores the pattern and implements Activate;
    // derived classes provide EvalAsync.
    // NOTE(review): generic type-parameter lists appear to be missing from this text
    // (TSource1..TSource11/TResult are never declared here) - confirm against the original file.
    internal abstract class AsyncPlanBase : AsyncPlan
    {
        // Pattern whose Source1..Source11 members are read in Activate.
        private readonly AsyncPattern _expression;

        internal AsyncPlanBase(AsyncPattern expression)
        {
            _expression = expression;
        }

        // Computes the result for one set of source values.
        protected abstract ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7, TSource8 arg8, TSource9 arg9, TSource10 arg10, TSource11 arg11);

        // Requests one join observer per pattern source from AsyncPlan.CreateObserver, builds the
        // active plan around them, adds the plan to every join observer and returns it.
        internal override ActiveAsyncPlan Activate(Dictionary externalSubscriptions, IAsyncObserver observer, Func deactivate)
        {
            // Error callback passed to every CreateObserver call below.
            var onError = new Func(observer.OnErrorAsync);

            var joinObserver1 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source1, onError);
            var joinObserver2 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source2, onError);
            var joinObserver3 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source3, onError);
            var joinObserver4 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source4, onError);
            var joinObserver5 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source5, onError);
            var joinObserver6 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source6, onError);
            var
joinObserver7 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source7, onError);
            var joinObserver8 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source8, onError);
            var joinObserver9 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source9, onError);
            var joinObserver10 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source10, onError);
            var joinObserver11 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source11, onError);

            // Declared before it is constructed so the second lambda can capture the variable.
            var activePlan = default(ActiveAsyncPlan);

            activePlan = new ActiveAsyncPlan(
                joinObserver1,
                joinObserver2,
                joinObserver3,
                joinObserver4,
                joinObserver5,
                joinObserver6,
                joinObserver7,
                joinObserver8,
                joinObserver9,
                joinObserver10,
                joinObserver11,
                // Presumably run by ActiveAsyncPlan on a match - confirm in ActiveAsyncPlan.
                async (arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11) =>
                {
                    var res = default(TResult);

                    try
                    {
                        res = await EvalAsync(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11).ConfigureAwait(false);
                    }
                    catch (Exception ex)
                    {
                        // A failing EvalAsync is reported via OnErrorAsync; OnNextAsync is skipped.
                        await observer.OnErrorAsync(ex).ConfigureAwait(false);
                        return;
                    }

                    await observer.OnNextAsync(res).ConfigureAwait(false);
                },
                async () =>
                {
                    // Remove the plan from each join observer, then await the deactivate callback.
                    await joinObserver1.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver2.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver3.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver4.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver5.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver6.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver7.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver8.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver9.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver10.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await joinObserver11.RemoveActivePlan(activePlan).ConfigureAwait(false);
                    await deactivate(activePlan).ConfigureAwait(false);
                }
            );
joinObserver1.AddActivePlan(activePlan); joinObserver2.AddActivePlan(activePlan); joinObserver3.AddActivePlan(activePlan); joinObserver4.AddActivePlan(activePlan); joinObserver5.AddActivePlan(activePlan); joinObserver6.AddActivePlan(activePlan); joinObserver7.AddActivePlan(activePlan); joinObserver8.AddActivePlan(activePlan); joinObserver9.AddActivePlan(activePlan); joinObserver10.AddActivePlan(activePlan); joinObserver11.AddActivePlan(activePlan); return activePlan; } } internal sealed class AsyncPlan : AsyncPlanBase { private readonly Func _selector; internal AsyncPlan(AsyncPattern expression, Func selector) : base(expression) { _selector = selector; } protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7, TSource8 arg8, TSource9 arg9, TSource10 arg10, TSource11 arg11, TSource12 arg12) => new ValueTask(_selector(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12)); } internal sealed class AsyncPlanWithTask : AsyncPlanBase { private readonly Func> _selector; internal AsyncPlanWithTask(AsyncPattern expression, Func> selector) : base(expression) { _selector = selector; } protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7, TSource8 arg8, TSource9 arg9, TSource10 arg10, TSource11 arg11, TSource12 arg12) => _selector(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12); } internal abstract class AsyncPlanBase : AsyncPlan { private readonly AsyncPattern _expression; internal AsyncPlanBase(AsyncPattern expression) { _expression = expression; } protected abstract ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7, TSource8 arg8, TSource9 arg9, TSource10 arg10, TSource11 arg11, TSource12 arg12); internal override ActiveAsyncPlan Activate(Dictionary externalSubscriptions, 
IAsyncObserver observer, Func deactivate) { var onError = new Func(observer.OnErrorAsync); var joinObserver1 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source1, onError); var joinObserver2 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source2, onError); var joinObserver3 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source3, onError); var joinObserver4 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source4, onError); var joinObserver5 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source5, onError); var joinObserver6 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source6, onError); var joinObserver7 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source7, onError); var joinObserver8 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source8, onError); var joinObserver9 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source9, onError); var joinObserver10 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source10, onError); var joinObserver11 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source11, onError); var joinObserver12 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source12, onError); var activePlan = default(ActiveAsyncPlan); activePlan = new ActiveAsyncPlan( joinObserver1, joinObserver2, joinObserver3, joinObserver4, joinObserver5, joinObserver6, joinObserver7, joinObserver8, joinObserver9, joinObserver10, joinObserver11, joinObserver12, async (arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12) => { var res = default(TResult); try { res = await EvalAsync(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12).ConfigureAwait(false); } catch (Exception ex) { await observer.OnErrorAsync(ex).ConfigureAwait(false); return; } await observer.OnNextAsync(res).ConfigureAwait(false); }, async () => { await 
joinObserver1.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver2.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver3.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver4.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver5.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver6.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver7.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver8.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver9.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver10.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver11.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver12.RemoveActivePlan(activePlan).ConfigureAwait(false); await deactivate(activePlan).ConfigureAwait(false); } ); joinObserver1.AddActivePlan(activePlan); joinObserver2.AddActivePlan(activePlan); joinObserver3.AddActivePlan(activePlan); joinObserver4.AddActivePlan(activePlan); joinObserver5.AddActivePlan(activePlan); joinObserver6.AddActivePlan(activePlan); joinObserver7.AddActivePlan(activePlan); joinObserver8.AddActivePlan(activePlan); joinObserver9.AddActivePlan(activePlan); joinObserver10.AddActivePlan(activePlan); joinObserver11.AddActivePlan(activePlan); joinObserver12.AddActivePlan(activePlan); return activePlan; } } internal sealed class AsyncPlan : AsyncPlanBase { private readonly Func _selector; internal AsyncPlan(AsyncPattern expression, Func selector) : base(expression) { _selector = selector; } protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7, TSource8 arg8, TSource9 arg9, TSource10 arg10, TSource11 arg11, TSource12 arg12, TSource13 arg13) => new ValueTask(_selector(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13)); } internal sealed class 
AsyncPlanWithTask : AsyncPlanBase { private readonly Func> _selector; internal AsyncPlanWithTask(AsyncPattern expression, Func> selector) : base(expression) { _selector = selector; } protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7, TSource8 arg8, TSource9 arg9, TSource10 arg10, TSource11 arg11, TSource12 arg12, TSource13 arg13) => _selector(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13); } internal abstract class AsyncPlanBase : AsyncPlan { private readonly AsyncPattern _expression; internal AsyncPlanBase(AsyncPattern expression) { _expression = expression; } protected abstract ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7, TSource8 arg8, TSource9 arg9, TSource10 arg10, TSource11 arg11, TSource12 arg12, TSource13 arg13); internal override ActiveAsyncPlan Activate(Dictionary externalSubscriptions, IAsyncObserver observer, Func deactivate) { var onError = new Func(observer.OnErrorAsync); var joinObserver1 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source1, onError); var joinObserver2 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source2, onError); var joinObserver3 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source3, onError); var joinObserver4 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source4, onError); var joinObserver5 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source5, onError); var joinObserver6 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source6, onError); var joinObserver7 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source7, onError); var joinObserver8 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source8, onError); var joinObserver9 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source9, onError); 
var joinObserver10 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source10, onError); var joinObserver11 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source11, onError); var joinObserver12 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source12, onError); var joinObserver13 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source13, onError); var activePlan = default(ActiveAsyncPlan); activePlan = new ActiveAsyncPlan( joinObserver1, joinObserver2, joinObserver3, joinObserver4, joinObserver5, joinObserver6, joinObserver7, joinObserver8, joinObserver9, joinObserver10, joinObserver11, joinObserver12, joinObserver13, async (arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13) => { var res = default(TResult); try { res = await EvalAsync(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13).ConfigureAwait(false); } catch (Exception ex) { await observer.OnErrorAsync(ex).ConfigureAwait(false); return; } await observer.OnNextAsync(res).ConfigureAwait(false); }, async () => { await joinObserver1.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver2.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver3.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver4.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver5.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver6.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver7.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver8.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver9.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver10.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver11.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver12.RemoveActivePlan(activePlan).ConfigureAwait(false); await 
joinObserver13.RemoveActivePlan(activePlan).ConfigureAwait(false); await deactivate(activePlan).ConfigureAwait(false); } ); joinObserver1.AddActivePlan(activePlan); joinObserver2.AddActivePlan(activePlan); joinObserver3.AddActivePlan(activePlan); joinObserver4.AddActivePlan(activePlan); joinObserver5.AddActivePlan(activePlan); joinObserver6.AddActivePlan(activePlan); joinObserver7.AddActivePlan(activePlan); joinObserver8.AddActivePlan(activePlan); joinObserver9.AddActivePlan(activePlan); joinObserver10.AddActivePlan(activePlan); joinObserver11.AddActivePlan(activePlan); joinObserver12.AddActivePlan(activePlan); joinObserver13.AddActivePlan(activePlan); return activePlan; } } internal sealed class AsyncPlan : AsyncPlanBase { private readonly Func _selector; internal AsyncPlan(AsyncPattern expression, Func selector) : base(expression) { _selector = selector; } protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7, TSource8 arg8, TSource9 arg9, TSource10 arg10, TSource11 arg11, TSource12 arg12, TSource13 arg13, TSource14 arg14) => new ValueTask(_selector(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14)); } internal sealed class AsyncPlanWithTask : AsyncPlanBase { private readonly Func> _selector; internal AsyncPlanWithTask(AsyncPattern expression, Func> selector) : base(expression) { _selector = selector; } protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7, TSource8 arg8, TSource9 arg9, TSource10 arg10, TSource11 arg11, TSource12 arg12, TSource13 arg13, TSource14 arg14) => _selector(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14); } internal abstract class AsyncPlanBase : AsyncPlan { private readonly AsyncPattern _expression; internal AsyncPlanBase(AsyncPattern expression) { _expression = expression; } 
protected abstract ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7, TSource8 arg8, TSource9 arg9, TSource10 arg10, TSource11 arg11, TSource12 arg12, TSource13 arg13, TSource14 arg14); internal override ActiveAsyncPlan Activate(Dictionary externalSubscriptions, IAsyncObserver observer, Func deactivate) { var onError = new Func(observer.OnErrorAsync); var joinObserver1 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source1, onError); var joinObserver2 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source2, onError); var joinObserver3 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source3, onError); var joinObserver4 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source4, onError); var joinObserver5 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source5, onError); var joinObserver6 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source6, onError); var joinObserver7 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source7, onError); var joinObserver8 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source8, onError); var joinObserver9 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source9, onError); var joinObserver10 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source10, onError); var joinObserver11 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source11, onError); var joinObserver12 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source12, onError); var joinObserver13 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source13, onError); var joinObserver14 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source14, onError); var activePlan = default(ActiveAsyncPlan); activePlan = new ActiveAsyncPlan( joinObserver1, joinObserver2, joinObserver3, joinObserver4, joinObserver5, joinObserver6, 
joinObserver7, joinObserver8, joinObserver9, joinObserver10, joinObserver11, joinObserver12, joinObserver13, joinObserver14, async (arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14) => { var res = default(TResult); try { res = await EvalAsync(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14).ConfigureAwait(false); } catch (Exception ex) { await observer.OnErrorAsync(ex).ConfigureAwait(false); return; } await observer.OnNextAsync(res).ConfigureAwait(false); }, async () => { await joinObserver1.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver2.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver3.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver4.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver5.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver6.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver7.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver8.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver9.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver10.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver11.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver12.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver13.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver14.RemoveActivePlan(activePlan).ConfigureAwait(false); await deactivate(activePlan).ConfigureAwait(false); } ); joinObserver1.AddActivePlan(activePlan); joinObserver2.AddActivePlan(activePlan); joinObserver3.AddActivePlan(activePlan); joinObserver4.AddActivePlan(activePlan); joinObserver5.AddActivePlan(activePlan); joinObserver6.AddActivePlan(activePlan); joinObserver7.AddActivePlan(activePlan); joinObserver8.AddActivePlan(activePlan); joinObserver9.AddActivePlan(activePlan); 
joinObserver10.AddActivePlan(activePlan); joinObserver11.AddActivePlan(activePlan); joinObserver12.AddActivePlan(activePlan); joinObserver13.AddActivePlan(activePlan); joinObserver14.AddActivePlan(activePlan); return activePlan; } } internal sealed class AsyncPlan : AsyncPlanBase { private readonly Func _selector; internal AsyncPlan(AsyncPattern expression, Func selector) : base(expression) { _selector = selector; } protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7, TSource8 arg8, TSource9 arg9, TSource10 arg10, TSource11 arg11, TSource12 arg12, TSource13 arg13, TSource14 arg14, TSource15 arg15) => new ValueTask(_selector(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15)); } internal sealed class AsyncPlanWithTask : AsyncPlanBase { private readonly Func> _selector; internal AsyncPlanWithTask(AsyncPattern expression, Func> selector) : base(expression) { _selector = selector; } protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7, TSource8 arg8, TSource9 arg9, TSource10 arg10, TSource11 arg11, TSource12 arg12, TSource13 arg13, TSource14 arg14, TSource15 arg15) => _selector(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15); } internal abstract class AsyncPlanBase : AsyncPlan { private readonly AsyncPattern _expression; internal AsyncPlanBase(AsyncPattern expression) { _expression = expression; } protected abstract ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7, TSource8 arg8, TSource9 arg9, TSource10 arg10, TSource11 arg11, TSource12 arg12, TSource13 arg13, TSource14 arg14, TSource15 arg15); internal override ActiveAsyncPlan Activate(Dictionary externalSubscriptions, IAsyncObserver observer, Func deactivate) { var onError 
= new Func(observer.OnErrorAsync); var joinObserver1 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source1, onError); var joinObserver2 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source2, onError); var joinObserver3 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source3, onError); var joinObserver4 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source4, onError); var joinObserver5 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source5, onError); var joinObserver6 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source6, onError); var joinObserver7 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source7, onError); var joinObserver8 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source8, onError); var joinObserver9 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source9, onError); var joinObserver10 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source10, onError); var joinObserver11 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source11, onError); var joinObserver12 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source12, onError); var joinObserver13 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source13, onError); var joinObserver14 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source14, onError); var joinObserver15 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source15, onError); var activePlan = default(ActiveAsyncPlan); activePlan = new ActiveAsyncPlan( joinObserver1, joinObserver2, joinObserver3, joinObserver4, joinObserver5, joinObserver6, joinObserver7, joinObserver8, joinObserver9, joinObserver10, joinObserver11, joinObserver12, joinObserver13, joinObserver14, joinObserver15, async (arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15) => { var res = default(TResult); try { res = await 
EvalAsync(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15).ConfigureAwait(false); } catch (Exception ex) { await observer.OnErrorAsync(ex).ConfigureAwait(false); return; } await observer.OnNextAsync(res).ConfigureAwait(false); }, async () => { await joinObserver1.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver2.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver3.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver4.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver5.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver6.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver7.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver8.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver9.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver10.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver11.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver12.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver13.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver14.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver15.RemoveActivePlan(activePlan).ConfigureAwait(false); await deactivate(activePlan).ConfigureAwait(false); } ); joinObserver1.AddActivePlan(activePlan); joinObserver2.AddActivePlan(activePlan); joinObserver3.AddActivePlan(activePlan); joinObserver4.AddActivePlan(activePlan); joinObserver5.AddActivePlan(activePlan); joinObserver6.AddActivePlan(activePlan); joinObserver7.AddActivePlan(activePlan); joinObserver8.AddActivePlan(activePlan); joinObserver9.AddActivePlan(activePlan); joinObserver10.AddActivePlan(activePlan); joinObserver11.AddActivePlan(activePlan); joinObserver12.AddActivePlan(activePlan); joinObserver13.AddActivePlan(activePlan); 
joinObserver14.AddActivePlan(activePlan); joinObserver15.AddActivePlan(activePlan); return activePlan; } } internal sealed class AsyncPlan : AsyncPlanBase { private readonly Func _selector; internal AsyncPlan(AsyncPattern expression, Func selector) : base(expression) { _selector = selector; } protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7, TSource8 arg8, TSource9 arg9, TSource10 arg10, TSource11 arg11, TSource12 arg12, TSource13 arg13, TSource14 arg14, TSource15 arg15, TSource16 arg16) => new ValueTask(_selector(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, arg16)); } internal sealed class AsyncPlanWithTask : AsyncPlanBase { private readonly Func> _selector; internal AsyncPlanWithTask(AsyncPattern expression, Func> selector) : base(expression) { _selector = selector; } protected override ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7, TSource8 arg8, TSource9 arg9, TSource10 arg10, TSource11 arg11, TSource12 arg12, TSource13 arg13, TSource14 arg14, TSource15 arg15, TSource16 arg16) => _selector(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, arg16); } internal abstract class AsyncPlanBase : AsyncPlan { private readonly AsyncPattern _expression; internal AsyncPlanBase(AsyncPattern expression) { _expression = expression; } protected abstract ValueTask EvalAsync(TSource1 arg1, TSource2 arg2, TSource3 arg3, TSource4 arg4, TSource5 arg5, TSource6 arg6, TSource7 arg7, TSource8 arg8, TSource9 arg9, TSource10 arg10, TSource11 arg11, TSource12 arg12, TSource13 arg13, TSource14 arg14, TSource15 arg15, TSource16 arg16); internal override ActiveAsyncPlan Activate(Dictionary externalSubscriptions, IAsyncObserver observer, Func deactivate) { var onError = new Func(observer.OnErrorAsync); var joinObserver1 = 
AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source1, onError); var joinObserver2 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source2, onError); var joinObserver3 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source3, onError); var joinObserver4 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source4, onError); var joinObserver5 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source5, onError); var joinObserver6 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source6, onError); var joinObserver7 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source7, onError); var joinObserver8 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source8, onError); var joinObserver9 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source9, onError); var joinObserver10 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source10, onError); var joinObserver11 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source11, onError); var joinObserver12 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source12, onError); var joinObserver13 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source13, onError); var joinObserver14 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source14, onError); var joinObserver15 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source15, onError); var joinObserver16 = AsyncPlan.CreateObserver(externalSubscriptions, _expression.Source16, onError); var activePlan = default(ActiveAsyncPlan); activePlan = new ActiveAsyncPlan( joinObserver1, joinObserver2, joinObserver3, joinObserver4, joinObserver5, joinObserver6, joinObserver7, joinObserver8, joinObserver9, joinObserver10, joinObserver11, joinObserver12, joinObserver13, joinObserver14, joinObserver15, joinObserver16, async (arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, 
arg15, arg16) => { var res = default(TResult); try { res = await EvalAsync(arg1, arg2, arg3, arg4, arg5, arg6, arg7, arg8, arg9, arg10, arg11, arg12, arg13, arg14, arg15, arg16).ConfigureAwait(false); } catch (Exception ex) { await observer.OnErrorAsync(ex).ConfigureAwait(false); return; } await observer.OnNextAsync(res).ConfigureAwait(false); }, async () => { await joinObserver1.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver2.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver3.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver4.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver5.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver6.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver7.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver8.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver9.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver10.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver11.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver12.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver13.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver14.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver15.RemoveActivePlan(activePlan).ConfigureAwait(false); await joinObserver16.RemoveActivePlan(activePlan).ConfigureAwait(false); await deactivate(activePlan).ConfigureAwait(false); } ); joinObserver1.AddActivePlan(activePlan); joinObserver2.AddActivePlan(activePlan); joinObserver3.AddActivePlan(activePlan); joinObserver4.AddActivePlan(activePlan); joinObserver5.AddActivePlan(activePlan); joinObserver6.AddActivePlan(activePlan); joinObserver7.AddActivePlan(activePlan); joinObserver8.AddActivePlan(activePlan); joinObserver9.AddActivePlan(activePlan); joinObserver10.AddActivePlan(activePlan); 
joinObserver11.AddActivePlan(activePlan); joinObserver12.AddActivePlan(activePlan); joinObserver13.AddActivePlan(activePlan); joinObserver14.AddActivePlan(activePlan); joinObserver15.AddActivePlan(activePlan); joinObserver16.AddActivePlan(activePlan); return activePlan; } } }