AsyncPlan.Generated.tt 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. <#@ template debug="false" hostspecific="false" language="C#" #>
  5. <#@ assembly name="System.Core" #>
  6. <#@ import namespace="System.Linq" #>
  7. <#@ import namespace="System.Text" #>
  8. <#@ import namespace="System.Collections.Generic" #>
  9. <#@ output extension=".cs" #>
  10. using System.Collections.Generic;
  11. using System.Threading.Tasks;
  12. namespace System.Reactive.Joins
  13. {
  14. <#
  15. for (var i = 1; i <= 16; i++)
  16. {
  17. var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "TSource" + j));
  18. var args = string.Join(", ", Enumerable.Range(1, i).Select(j => "IObservable<TSource" + j + "> source" + j));
  19. var pars = string.Join(", ", Enumerable.Range(1, i).Select(j => "arg" + j));
  20. var evalPars = string.Join(", ", Enumerable.Range(1, i).Select(j => "TSource" + j + " arg" + j));
  21. #>
  22. internal sealed class AsyncPlan<<#=genArgs#>, TResult> : AsyncPlanBase<<#=genArgs#>, TResult>
  23. {
  24. private readonly Func<<#=genArgs#>, TResult> _selector;
  25. internal AsyncPlan(AsyncPattern<<#=genArgs#>> expression, Func<<#=genArgs#>, TResult> selector)
  26. : base(expression)
  27. {
  28. _selector = selector;
  29. }
  30. protected override ValueTask<TResult> EvalAsync(<#=evalPars#>) => new ValueTask<TResult>(_selector(<#=pars#>));
  31. }
  32. internal sealed class AsyncPlanWithTask<<#=genArgs#>, TResult> : AsyncPlanBase<<#=genArgs#>, TResult>
  33. {
  34. private readonly Func<<#=genArgs#>, ValueTask<TResult>> _selector;
  35. internal AsyncPlanWithTask(AsyncPattern<<#=genArgs#>> expression, Func<<#=genArgs#>, ValueTask<TResult>> selector)
  36. : base(expression)
  37. {
  38. _selector = selector;
  39. }
  40. protected override ValueTask<TResult> EvalAsync(<#=evalPars#>) => _selector(<#=pars#>);
  41. }
  42. internal abstract class AsyncPlanBase<<#=genArgs#>, TResult> : AsyncPlan<TResult>
  43. {
  44. private readonly AsyncPattern<<#=genArgs#>> _expression;
  45. internal AsyncPlanBase(AsyncPattern<<#=genArgs#>> expression)
  46. {
  47. _expression = expression;
  48. }
  49. protected abstract ValueTask<TResult> EvalAsync(<#=evalPars#>);
  50. internal override ActiveAsyncPlan Activate(Dictionary<object, IAsyncJoinObserver> externalSubscriptions, IAsyncObserver<TResult> observer, Func<ActiveAsyncPlan, ValueTask> deactivate)
  51. {
  52. var onError = new Func<Exception, ValueTask>(observer.OnErrorAsync);
  53. <#
  54. for (var j = 1; j <= i; j++)
  55. {
  56. #>
  57. var joinObserver<#=j#> = AsyncPlan<TResult>.CreateObserver<TSource<#=j#>>(externalSubscriptions, _expression.Source<#=j#>, onError);
  58. <#
  59. }
  60. #>
  61. var activePlan = default(ActiveAsyncPlan<<#=genArgs#>>);
  62. activePlan = new ActiveAsyncPlan<<#=genArgs#>>(
  63. <#
  64. for (var j = 1; j <= i; j++)
  65. {
  66. #>
  67. joinObserver<#=j#>,
  68. <#
  69. }
  70. #>
  71. async (<#=pars#>) =>
  72. {
  73. var res = default(TResult);
  74. try
  75. {
  76. res = await EvalAsync(<#=pars#>).ConfigureAwait(false);
  77. }
  78. catch (Exception ex)
  79. {
  80. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  81. return;
  82. }
  83. await observer.OnNextAsync(res).ConfigureAwait(false);
  84. },
  85. async () =>
  86. {
  87. <#
  88. for (var j = 1; j <= i; j++)
  89. {
  90. #>
  91. await joinObserver<#=j#>.RemoveActivePlan(activePlan).ConfigureAwait(false);
  92. <#
  93. }
  94. #>
  95. await deactivate(activePlan).ConfigureAwait(false);
  96. }
  97. );
  98. <#
  99. for (var j = 1; j <= i; j++)
  100. {
  101. #>
  102. joinObserver<#=j#>.AddActivePlan(activePlan);
  103. <#
  104. }
  105. #>
  106. return activePlan;
  107. }
  108. }
  109. <#
  110. }
  111. #>
  112. }