CombineLatest.Generated.tt 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. <#@ template debug="false" hostspecific="false" language="C#" #>
  5. <#@ assembly name="System.Core" #>
  6. <#@ import namespace="System.Linq" #>
  7. <#@ import namespace="System.Text" #>
  8. <#@ import namespace="System.Collections.Generic" #>
  9. <#@ output extension=".cs" #>
  10. using System.Reactive.Disposables;
  11. using System.Threading;
  12. using System.Threading.Tasks;
  13. namespace System.Reactive.Linq
  14. {
  public partial class AsyncObservable
  {
<#
// Emits the CombineLatest extension methods for every arity from 2 to 15 sources.
// The arity-dependent fragments are computed once per iteration and reused by all three overloads.
for (var i = 2; i <= 15; i++)
{
    var indices = Enumerable.Range(1, i).ToArray();
    var genArgs = string.Join(", ", indices.Select(n => "T" + n));
    var genPars = genArgs + ", TResult";
    var args = string.Join(", ", indices.Select(n => "IAsyncObservable<T" + n + "> source" + n));
    var obs = "(" + string.Join(", ", indices.Select(n => "observer" + n)) + ")";
    var tuple = "(" + genArgs + ")";
    var subs = string.Join(", ", indices.Select(n => "sub" + n));
#>
      /// <summary>
      /// Combines <#=i#> asynchronous observable sequences into one sequence of tuples, each tuple holding
      /// the most recent element of every source.
      /// </summary>
      /// <returns>An asynchronous observable sequence of tuples built from the latest source elements.</returns>
      /// <exception cref="ArgumentNullException">One of the source sequences is <c>null</c>.</exception>
      public static IAsyncObservable<<#=tuple#>> CombineLatest<<#=genArgs#>>(this <#=args#>)
      {
<#
    foreach (var j in indices)
    {
#>
          if (source<#=j#> == null)
          {
              throw new ArgumentNullException(nameof(source<#=j#>));
          }
<#
    }
#>

          return Create<<#=tuple#>>(async observer =>
          {
              var d = new CompositeAsyncDisposable();
              var <#=obs#> = AsyncObserver.CombineLatest(observer);

              // Subscribes to one source and registers the resulting subscription with the composite.
              // Awaiting (rather than ContinueWith + Task.Result) surfaces a subscription failure as the
              // original exception instead of an AggregateException, and does not depend on the ambient
              // TaskScheduler.
              async Task SubscribeAsync<TSource>(IAsyncObservable<TSource> source, IAsyncObserver<TSource> sink)
              {
                  var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
                  await d.AddAsync(subscription).ConfigureAwait(false);
              }

              // Start every subscription before awaiting any of them, so the sources are subscribed concurrently.
<#
    foreach (var j in indices)
    {
#>
              var sub<#=j#> = SubscribeAsync(source<#=j#>, observer<#=j#>);
<#
    }
#>

              try
              {
                  await Task.WhenAll(<#=subs#>).ConfigureAwait(false);
              }
              catch
              {
                  // The composite is not handed back to the caller on failure, so release the
                  // subscriptions that did succeed before propagating the error.
                  await d.DisposeAsync().ConfigureAwait(false);
                  throw;
              }

              return d;
          });
      }

      /// <summary>
      /// Combines <#=i#> asynchronous observable sequences into one sequence by applying
      /// <paramref name="selector"/> to the most recent element of every source.
      /// </summary>
      /// <returns>An asynchronous observable sequence of the values produced by the selector.</returns>
      /// <exception cref="ArgumentNullException">One of the source sequences or the selector is <c>null</c>.</exception>
      public static IAsyncObservable<TResult> CombineLatest<<#=genPars#>>(this <#=args#>, Func<<#=genArgs#>, TResult> selector)
      {
<#
    foreach (var j in indices)
    {
#>
          if (source<#=j#> == null)
          {
              throw new ArgumentNullException(nameof(source<#=j#>));
          }
<#
    }
#>
          if (selector == null)
          {
              throw new ArgumentNullException(nameof(selector));
          }

          return Create<TResult>(async observer =>
          {
              var d = new CompositeAsyncDisposable();
              var <#=obs#> = AsyncObserver.CombineLatest(observer, selector);

              // Subscribes to one source and registers the resulting subscription with the composite.
              // Awaiting (rather than ContinueWith + Task.Result) surfaces a subscription failure as the
              // original exception instead of an AggregateException, and does not depend on the ambient
              // TaskScheduler.
              async Task SubscribeAsync<TSource>(IAsyncObservable<TSource> source, IAsyncObserver<TSource> sink)
              {
                  var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
                  await d.AddAsync(subscription).ConfigureAwait(false);
              }

              // Start every subscription before awaiting any of them, so the sources are subscribed concurrently.
<#
    foreach (var j in indices)
    {
#>
              var sub<#=j#> = SubscribeAsync(source<#=j#>, observer<#=j#>);
<#
    }
#>

              try
              {
                  await Task.WhenAll(<#=subs#>).ConfigureAwait(false);
              }
              catch
              {
                  // The composite is not handed back to the caller on failure, so release the
                  // subscriptions that did succeed before propagating the error.
                  await d.DisposeAsync().ConfigureAwait(false);
                  throw;
              }

              return d;
          });
      }

      /// <summary>
      /// Combines <#=i#> asynchronous observable sequences into one sequence by applying the asynchronous
      /// <paramref name="selector"/> to the most recent element of every source.
      /// </summary>
      /// <returns>An asynchronous observable sequence of the values produced by the selector.</returns>
      /// <exception cref="ArgumentNullException">One of the source sequences or the selector is <c>null</c>.</exception>
      public static IAsyncObservable<TResult> CombineLatest<<#=genPars#>>(this <#=args#>, Func<<#=genArgs#>, ValueTask<TResult>> selector)
      {
<#
    foreach (var j in indices)
    {
#>
          if (source<#=j#> == null)
          {
              throw new ArgumentNullException(nameof(source<#=j#>));
          }
<#
    }
#>
          if (selector == null)
          {
              throw new ArgumentNullException(nameof(selector));
          }

          return Create<TResult>(async observer =>
          {
              var d = new CompositeAsyncDisposable();
              var <#=obs#> = AsyncObserver.CombineLatest(observer, selector);

              // Subscribes to one source and registers the resulting subscription with the composite.
              // Awaiting (rather than ContinueWith + Task.Result) surfaces a subscription failure as the
              // original exception instead of an AggregateException, and does not depend on the ambient
              // TaskScheduler.
              async Task SubscribeAsync<TSource>(IAsyncObservable<TSource> source, IAsyncObserver<TSource> sink)
              {
                  var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
                  await d.AddAsync(subscription).ConfigureAwait(false);
              }

              // Start every subscription before awaiting any of them, so the sources are subscribed concurrently.
<#
    foreach (var j in indices)
    {
#>
              var sub<#=j#> = SubscribeAsync(source<#=j#>, observer<#=j#>);
<#
    }
#>

              try
              {
                  await Task.WhenAll(<#=subs#>).ConfigureAwait(false);
              }
              catch
              {
                  // The composite is not handed back to the caller on failure, so release the
                  // subscriptions that did succeed before propagating the error.
                  await d.DisposeAsync().ConfigureAwait(false);
                  throw;
              }

              return d;
          });
      }

<#
}
#>
  }
  public partial class AsyncObserver
  {
<#
// One family of CombineLatest observer factories is emitted per arity, from 2 up to 15 inputs.
// Fragments that depend only on the arity are built once here and reused below.
for (var i = 2; i <= 15; i++)
{
    var slots = Enumerable.Range(1, i).ToArray();
    var genArgs = string.Join(", ", slots.Select(n => "T" + n));
    var genPars = genArgs + ", TResult";
    var tuple = "(" + genArgs + ")";
    var res = "(" + string.Join(", ", slots.Select(n => "IAsyncObserver<T" + n + ">")) + ")";
    var everySeen = string.Join(" && ", slots.Select(n => "seen" + n));
    var everyFinished = string.Join(" && ", slots.Select(n => "finished" + n));
    var currentValues = string.Join(", ", slots.Select(n => "current" + n));
    var lambdaArgs = string.Join(", ", slots.Select(n => "v" + n));
#>
      /// <summary>
      /// Creates <#=i#> observers, one per input, that forward to <paramref name="observer"/> a tuple of the
      /// latest element received on each input.
      /// </summary>
      /// <remarks>
      /// Nothing is forwarded until every input has produced an element. An error on any input is forwarded
      /// unchanged. Completion is forwarded once all inputs have completed, or when an element arrives while
      /// some input is still without a value and every other input has already completed.
      /// All notifications are serialized through a shared asynchronous lock.
      /// </remarks>
      /// <returns>A tuple holding the observer to attach to each input, in input order.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
      public static <#=res#> CombineLatest<<#=genArgs#>>(IAsyncObserver<<#=tuple#>> observer)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          // Shared state captured by the per-input observers; only touched while holding the lock.
          var allSeen = false;
<#
    foreach (var j in slots)
    {
#>
          var seen<#=j#> = false;
          var finished<#=j#> = false;
          var current<#=j#> = default(T<#=j#>);
<#
    }
#>
          var mutex = new AsyncLock();

          return
          (
<#
    foreach (var j in slots)
    {
        var othersFinished = string.Join(" && ", slots.Where(k => k != j).Select(k => "finished" + k));
        var separator = j < i ? "," : "";
#>
              Create<T<#=j#>>(
                  async value =>
                  {
                      using (await mutex.LockAsync().ConfigureAwait(false))
                      {
                          current<#=j#> = value;

                          if (!seen<#=j#>)
                          {
                              seen<#=j#> = true;
                              allSeen = <#=everySeen#>;
                          }

                          if (!allSeen)
                          {
                              // Some other input has no value yet; if all the others are already done,
                              // a full tuple can never be formed, so complete now.
                              if (<#=othersFinished#>)
                              {
                                  await observer.OnCompletedAsync().ConfigureAwait(false);
                              }

                              return;
                          }

                          await observer.OnNextAsync((<#=currentValues#>)).ConfigureAwait(false);
                      }
                  },
                  async error =>
                  {
                      using (await mutex.LockAsync().ConfigureAwait(false))
                      {
                          await observer.OnErrorAsync(error).ConfigureAwait(false);
                      }
                  },
                  async () =>
                  {
                      using (await mutex.LockAsync().ConfigureAwait(false))
                      {
                          finished<#=j#> = true;

                          if (<#=everyFinished#>)
                          {
                              await observer.OnCompletedAsync().ConfigureAwait(false);
                          }
                      }
                  }
              )<#=separator#>
<#
    }
#>
          );
      }

      /// <summary>
      /// Creates <#=i#> observers, one per input, that forward to <paramref name="observer"/> the result of
      /// applying <paramref name="selector"/> to the latest element received on each input.
      /// </summary>
      /// <remarks>
      /// Wraps the synchronous selector in a <see cref="ValueTask{TResult}"/> and delegates to the overload
      /// taking an asynchronous selector.
      /// </remarks>
      /// <returns>A tuple holding the observer to attach to each input, in input order.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is <c>null</c>.</exception>
      public static <#=res#> CombineLatest<<#=genPars#>>(IAsyncObserver<TResult> observer, Func<<#=genArgs#>, TResult> selector)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (selector is null)
          {
              throw new ArgumentNullException(nameof(selector));
          }

          return CombineLatest<<#=genPars#>>(observer, (<#=lambdaArgs#>) => new ValueTask<TResult>(selector(<#=lambdaArgs#>)));
      }

      /// <summary>
      /// Creates <#=i#> observers, one per input, that forward to <paramref name="observer"/> the result of
      /// awaiting <paramref name="selector"/> on the latest element received on each input.
      /// </summary>
      /// <remarks>
      /// Nothing is forwarded until every input has produced an element. An exception thrown by the selector
      /// is forwarded as an error. An error on any input is forwarded unchanged. Completion is forwarded once
      /// all inputs have completed, or when an element arrives while some input is still without a value and
      /// every other input has already completed. All notifications are serialized through a shared
      /// asynchronous lock.
      /// </remarks>
      /// <returns>A tuple holding the observer to attach to each input, in input order.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="selector"/> is <c>null</c>.</exception>
      public static <#=res#> CombineLatest<<#=genPars#>>(IAsyncObserver<TResult> observer, Func<<#=genArgs#>, ValueTask<TResult>> selector)
      {
          if (observer is null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          if (selector is null)
          {
              throw new ArgumentNullException(nameof(selector));
          }

          // Shared state captured by the per-input observers; only touched while holding the lock.
          var allSeen = false;
<#
    foreach (var j in slots)
    {
#>
          var seen<#=j#> = false;
          var finished<#=j#> = false;
          var current<#=j#> = default(T<#=j#>);
<#
    }
#>
          var mutex = new AsyncLock();

          return
          (
<#
    foreach (var j in slots)
    {
        var othersFinished = string.Join(" && ", slots.Where(k => k != j).Select(k => "finished" + k));
        var separator = j < i ? "," : "";
#>
              Create<T<#=j#>>(
                  async value =>
                  {
                      using (await mutex.LockAsync().ConfigureAwait(false))
                      {
                          current<#=j#> = value;

                          if (!seen<#=j#>)
                          {
                              seen<#=j#> = true;
                              allSeen = <#=everySeen#>;
                          }

                          if (!allSeen)
                          {
                              // Some other input has no value yet; if all the others are already done,
                              // the selector can never be invoked, so complete now.
                              if (<#=othersFinished#>)
                              {
                                  await observer.OnCompletedAsync().ConfigureAwait(false);
                              }

                              return;
                          }

                          TResult projected;

                          try
                          {
                              projected = await selector(<#=currentValues#>).ConfigureAwait(false);
                          }
                          catch (Exception failure)
                          {
                              // A failing selector is reported downstream instead of escaping to the source.
                              await observer.OnErrorAsync(failure).ConfigureAwait(false);
                              return;
                          }

                          await observer.OnNextAsync(projected).ConfigureAwait(false);
                      }
                  },
                  async error =>
                  {
                      using (await mutex.LockAsync().ConfigureAwait(false))
                      {
                          await observer.OnErrorAsync(error).ConfigureAwait(false);
                      }
                  },
                  async () =>
                  {
                      using (await mutex.LockAsync().ConfigureAwait(false))
                      {
                          finished<#=j#> = true;

                          if (<#=everyFinished#>)
                          {
                              await observer.OnCompletedAsync().ConfigureAwait(false);
                          }
                      }
                  }
              )<#=separator#>
<#
    }
#>
          );
      }

<#
}
#>
  }
  283. }