Zip.Generated.tt 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. <#@ template debug="false" hostspecific="false" language="C#" #>
  5. <#@ assembly name="System.Core" #>
  6. <#@ import namespace="System.Linq" #>
  7. <#@ import namespace="System.Text" #>
  8. <#@ import namespace="System.Collections.Generic" #>
  9. <#@ output extension=".cs" #>
  10. using System.Collections.Generic;
  11. using System.Reactive.Disposables;
  12. using System.Threading;
  13. using System.Threading.Tasks;
  14. namespace System.Reactive.Linq
  15. {
  16. partial class AsyncObservable
  17. {
  18. <#
  19. for (var i = 2; i <= 15; i++)
  20. {
  21. var genPars = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "TResult" }));
  22. var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j));
  23. var args = string.Join(", ", Enumerable.Range(1, i).Select(j => "IAsyncObservable<T" + j + "> source" + j));
  24. var obs = "(" + string.Join(", ", Enumerable.Range(1, i).Select(j => "observer" + j)) + ")";
  25. #>
  26. public static IAsyncObservable<TResult> Zip<<#=genPars#>>(this <#=args#>, Func<<#=genArgs#>, TResult> selector)
  27. {
  28. <#
  29. for (var j = 1; j <= i; j++)
  30. {
  31. #>
  32. if (source<#=j#> == null)
  33. throw new ArgumentNullException(nameof(source<#=j#>));
  34. <#
  35. }
  36. #>
  37. if (selector == null)
  38. throw new ArgumentNullException(nameof(selector));
  39. return Create<TResult>(async observer =>
  40. {
  41. var d = new CompositeAsyncDisposable();
  42. var <#=obs#> = AsyncObserver.Zip(observer, selector);
  43. <#
  44. for (var j = 1; j <= i; j++)
  45. {
  46. #>
  47. var sub<#=j#> = source<#=j#>.SubscribeSafeAsync(observer<#=j#>).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
  48. <#
  49. }
  50. #>
  51. await Task.WhenAll(<#=string.Join(", ", Enumerable.Range(1, i).Select(j => "sub" + j))#>).ConfigureAwait(false);
  52. return d;
  53. });
  54. }
  55. public static IAsyncObservable<TResult> Zip<<#=genPars#>>(this <#=args#>, Func<<#=genArgs#>, Task<TResult>> selector)
  56. {
  57. <#
  58. for (var j = 1; j <= i; j++)
  59. {
  60. #>
  61. if (source<#=j#> == null)
  62. throw new ArgumentNullException(nameof(source<#=j#>));
  63. <#
  64. }
  65. #>
  66. if (selector == null)
  67. throw new ArgumentNullException(nameof(selector));
  68. return Create<TResult>(async observer =>
  69. {
  70. var d = new CompositeAsyncDisposable();
  71. var <#=obs#> = AsyncObserver.Zip(observer, selector);
  72. <#
  73. for (var j = 1; j <= i; j++)
  74. {
  75. #>
  76. var sub<#=j#> = source<#=j#>.SubscribeSafeAsync(observer<#=j#>).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
  77. <#
  78. }
  79. #>
  80. await Task.WhenAll(<#=string.Join(", ", Enumerable.Range(1, i).Select(j => "sub" + j))#>).ConfigureAwait(false);
  81. return d;
  82. });
  83. }
  84. <#
  85. }
  86. #>
  87. }
  88. partial class AsyncObserver
  89. {
  90. <#
  91. for (var i = 2; i <= 15; i++)
  92. {
  93. var res = "(" + string.Join(", ", Enumerable.Range(1, i).Select(j => "IAsyncObserver<T" + j + ">")) + ")";
  94. var genPars = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "TResult" }));
  95. var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j));
  96. #>
  97. public static <#=res#> Zip<<#=genPars#>>(IAsyncObserver<TResult> observer, Func<<#=genArgs#>, TResult> selector)
  98. {
  99. if (observer == null)
  100. throw new ArgumentNullException(nameof(observer));
  101. if (selector == null)
  102. throw new ArgumentNullException(nameof(selector));
  103. return Zip<<#=genPars#>>(observer, (<#=string.Join(", ", Enumerable.Range(1, i).Select(j => "x" + j))#>) => Task.FromResult(selector(<#=string.Join(", ", Enumerable.Range(1, i).Select(j => "x" + j))#>)));
  104. }
  105. public static <#=res#> Zip<<#=genPars#>>(IAsyncObserver<TResult> observer, Func<<#=genArgs#>, Task<TResult>> selector)
  106. {
  107. if (observer == null)
  108. throw new ArgumentNullException(nameof(observer));
  109. if (selector == null)
  110. throw new ArgumentNullException(nameof(selector));
  111. var gate = new AsyncLock();
  112. <#
  113. for (var j = 1; j <= i; j++)
  114. {
  115. #>
  116. var values<#=j#> = new Queue<T<#=j#>>();
  117. <#
  118. }
  119. #>
  120. var isDone = new bool[<#=i#>];
  121. <#
  122. var all = string.Join(" && ", Enumerable.Range(1, i).Select(j => "values" + j + ".Count > 0"));
  123. var vals = string.Join(", ", Enumerable.Range(1, i).Select(j => "values" + j + ".Dequeue()"));
  124. var done = "";
  125. #>
  126. IAsyncObserver<T> CreateObserver<T>(int index, Queue<T> queue) =>
  127. Create<T>(
  128. async x =>
  129. {
  130. using (await gate.LockAsync().ConfigureAwait(false))
  131. {
  132. queue.Enqueue(x);
  133. if (<#=all#>)
  134. {
  135. TResult res;
  136. try
  137. {
  138. res = await selector(<#=vals#>).ConfigureAwait(false);
  139. }
  140. catch (Exception ex)
  141. {
  142. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  143. return;
  144. }
  145. await observer.OnNextAsync(res).ConfigureAwait(false);
  146. }
  147. else
  148. {
  149. var allDone = true;
  150. for (var i = 0; i < <#=i#>; i++)
  151. {
  152. if (i != index && !isDone[i])
  153. {
  154. allDone = false;
  155. break;
  156. }
  157. }
  158. if (allDone)
  159. {
  160. await observer.OnCompletedAsync().ConfigureAwait(false);
  161. }
  162. }
  163. }
  164. },
  165. async ex =>
  166. {
  167. using (await gate.LockAsync().ConfigureAwait(false))
  168. {
  169. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  170. }
  171. },
  172. async () =>
  173. {
  174. using (await gate.LockAsync().ConfigureAwait(false))
  175. {
  176. isDone[index] = true;
  177. var allDone = true;
  178. for (var i = 0; i < <#=i#>; i++)
  179. {
  180. if (!isDone[i])
  181. {
  182. allDone = false;
  183. break;
  184. }
  185. }
  186. if (allDone)
  187. {
  188. await observer.OnCompletedAsync().ConfigureAwait(false);
  189. }
  190. }
  191. }
  192. );
  193. return
  194. (
  195. <#
  196. for (var j = 1; j <= i; j++)
  197. {
  198. #>
  199. CreateObserver<T<#=j#>>(<#=j#>, values<#=j#>)<#=(j < i ? "," : "")#>
  200. <#
  201. }
  202. #>
  203. );
  204. }
  205. <#
  206. }
  207. #>
  208. }
  209. }