CombineLatest.Generated.tt 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. <#@ template debug="false" hostspecific="false" language="C#" #>
  5. <#@ assembly name="System.Core" #>
  6. <#@ import namespace="System.Linq" #>
  7. <#@ import namespace="System.Text" #>
  8. <#@ import namespace="System.Collections.Generic" #>
  9. <#@ output extension=".cs" #>
  10. using System.Reactive.Disposables;
  11. using System.Threading;
  12. using System.Threading.Tasks;
  13. namespace System.Reactive.Linq
  14. {
  partial class AsyncObservable
  {
<#
// Emit the CombineLatest operator overloads for every arity from 2 to 15.
for (var i = 2; i <= 15; i++)
{
    var indices = Enumerable.Range(1, i).ToList();
    var genPars = string.Join(", ", indices.Select(j => "T" + j).Concat(new[] { "TResult" }));
    var genArgs = string.Join(", ", indices.Select(j => "T" + j));
    var args = string.Join(", ", indices.Select(j => "IAsyncObservable<T" + j + "> source" + j));
    var obs = "(" + string.Join(", ", indices.Select(j => "observer" + j)) + ")";
    var subs = string.Join(", ", indices.Select(j => "sub" + j));

    // The synchronous-selector overload is emitted first, then the Task-returning one.
    // Apart from the selector's return type the generated bodies are identical.
    foreach (var selectorResult in new[] { "TResult", "Task<TResult>" })
    {
#>
      /// <summary>
      /// Subscribes to all sources and forwards to the resulting sequence the values produced by
      /// <paramref name="selector"/>, which is invoked with the latest value of each source once
      /// every source has produced at least one value.
      /// </summary>
      /// <param name="selector">Function combining the latest value of each source into a result.</param>
      /// <returns>An observable sequence of the values produced by <paramref name="selector"/>.</returns>
      /// <exception cref="ArgumentNullException">Any source, or <paramref name="selector"/>, is null.</exception>
      public static IAsyncObservable<TResult> CombineLatest<<#=genPars#>>(this <#=args#>, Func<<#=genArgs#>, <#=selectorResult#>> selector)
      {
<#
        foreach (var j in indices)
        {
#>
          if (source<#=j#> == null)
              throw new ArgumentNullException(nameof(source<#=j#>));
<#
        }
#>
          if (selector == null)
              throw new ArgumentNullException(nameof(selector));

          return Create<TResult>(async observer =>
          {
              // Collects the subscription of every source so they can be disposed together.
              var d = new CompositeAsyncDisposable();

              var <#=obs#> = AsyncObserver.CombineLatest(observer, selector);

              // Start all subscriptions without awaiting them one by one; each one registers
              // its disposable with the composite as soon as it becomes available.
<#
        foreach (var j in indices)
        {
#>
              var sub<#=j#> = source<#=j#>.SubscribeSafeAsync(observer<#=j#>).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
<#
        }
#>

              try
              {
                  await Task.WhenAll(<#=subs#>).ConfigureAwait(false);
              }
              catch
              {
                  // At least one subscription failed. The caller never receives the composite,
                  // so release the subscriptions that did succeed instead of leaking them.
                  await d.DisposeAsync().ConfigureAwait(false);
                  throw;
              }

              return d;
          });
      }

<#
    }
}
#>
  }
  partial class AsyncObserver
  {
<#
// Emit both CombineLatest observer factories for every arity from 2 to 15.
for (var i = 2; i <= 15; i++)
{
    var indices = Enumerable.Range(1, i).ToList();
    var res = "(" + string.Join(", ", indices.Select(j => "IAsyncObserver<T" + j + ">")) + ")";
    var genPars = string.Join(", ", indices.Select(j => "T" + j).Concat(new[] { "TResult" }));
    var genArgs = string.Join(", ", indices.Select(j => "T" + j));
    var lambdaArgs = string.Join(", ", indices.Select(j => "x" + j));
    var everyHasValue = string.Join(" && ", indices.Select(k => "hasValue" + k));
    var everyIsDone = string.Join(" && ", indices.Select(k => "isDone" + k));
    var latestValues = string.Join(", ", indices.Select(k => "latestValue" + k));
#>
      /// <summary>
      /// Creates one observer per source that together feed <paramref name="observer"/> with the
      /// results of a synchronous <paramref name="selector"/>. Delegates to the overload taking a
      /// Task-returning selector by wrapping each result in a completed task.
      /// </summary>
      /// <param name="observer">Observer receiving the combined results.</param>
      /// <param name="selector">Function combining the latest value of each source into a result.</param>
      /// <returns>A tuple with one observer for each source, in source order.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is null.</exception>
      public static <#=res#> CombineLatest<<#=genPars#>>(IAsyncObserver<TResult> observer, Func<<#=genArgs#>, TResult> selector)
      {
          if (observer == null)
              throw new ArgumentNullException(nameof(observer));
          if (selector == null)
              throw new ArgumentNullException(nameof(selector));

          return CombineLatest<<#=genPars#>>(observer, (<#=lambdaArgs#>) => Task.FromResult(selector(<#=lambdaArgs#>)));
      }

      /// <summary>
      /// Creates one observer per source that together feed <paramref name="observer"/>. Once every
      /// source has produced a value, each new value invokes <paramref name="selector"/> with the
      /// latest value of every source and forwards the result. An error from any source or from
      /// the selector is forwarded as an error. Completion is forwarded when all sources have
      /// completed, or when a value arrives while some source has no value yet and every other
      /// source has already completed. After one error or completion has been forwarded, all
      /// further notifications are ignored.
      /// </summary>
      /// <param name="observer">Observer receiving the combined results.</param>
      /// <param name="selector">Asynchronous function combining the latest value of each source into a result.</param>
      /// <returns>A tuple with one observer for each source, in source order.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is null.</exception>
      public static <#=res#> CombineLatest<<#=genPars#>>(IAsyncObserver<TResult> observer, Func<<#=genArgs#>, Task<TResult>> selector)
      {
          if (observer == null)
              throw new ArgumentNullException(nameof(observer));
          if (selector == null)
              throw new ArgumentNullException(nameof(selector));

          // State shared by all returned observers; only touched while holding the gate.
          bool allHasValue = false;

          // Set once an error or completion has been forwarded, so that the downstream observer
          // never receives anything after its terminal notification.
          bool isStopped = false;

<#
    foreach (var j in indices)
    {
#>
          bool hasValue<#=j#> = false;
          bool isDone<#=j#> = false;
          T<#=j#> latestValue<#=j#> = default(T<#=j#>);
<#
    }
#>

          // Serializes the notifications arriving from the different sources.
          var gate = new AsyncLock();

          return
          (
<#
    foreach (var j in indices)
    {
        var othersDone = string.Join(" && ", indices.Where(k => k != j).Select(k => "isDone" + k));
#>
              Create<T<#=j#>>(
                  async x =>
                  {
                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          if (isStopped)
                              return;

                          if (!hasValue<#=j#>)
                          {
                              hasValue<#=j#> = true;
                              allHasValue = <#=everyHasValue#>;
                          }

                          latestValue<#=j#> = x;

                          if (allHasValue)
                          {
                              TResult result;

                              try
                              {
                                  result = await selector(<#=latestValues#>).ConfigureAwait(false);
                              }
                              catch (Exception ex)
                              {
                                  isStopped = true;
                                  await observer.OnErrorAsync(ex).ConfigureAwait(false);
                                  return;
                              }

                              await observer.OnNextAsync(result).ConfigureAwait(false);
                          }
                          else if (<#=othersDone#>)
                          {
                              // Some other source completed without producing a value, so no
                              // combination can ever be produced.
                              isStopped = true;
                              await observer.OnCompletedAsync().ConfigureAwait(false);
                          }
                      }
                  },
                  async ex =>
                  {
                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          if (isStopped)
                              return;

                          isStopped = true;
                          await observer.OnErrorAsync(ex).ConfigureAwait(false);
                      }
                  },
                  async () =>
                  {
                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          if (isStopped)
                              return;

                          isDone<#=j#> = true;

                          if (<#=everyIsDone#>)
                          {
                              isStopped = true;
                              await observer.OnCompletedAsync().ConfigureAwait(false);
                          }
                      }
                  }
              )<#=(j < i ? "," : "")#>
<#
    }
#>
          );
      }

<#
}
#>
  }
  187. }