Zip.Generated.tt 11 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. <#@ template debug="false" hostspecific="false" language="C#" #>
  5. <#@ assembly name="System.Core" #>
  6. <#@ import namespace="System.Linq" #>
  7. <#@ import namespace="System.Text" #>
  8. <#@ import namespace="System.Collections.Generic" #>
  9. <#@ output extension=".cs" #>
  10. using System.Collections.Generic;
  11. using System.Reactive.Disposables;
  12. using System.Threading;
  13. using System.Threading.Tasks;
  14. namespace System.Reactive.Linq
  15. {
  16. partial class AsyncObservable
  17. {
  18. <#
  19. for (var i = 2; i <= 15; i++)
  20. {
  21. var genPars = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "TResult" }));
  22. var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j));
  23. var args = string.Join(", ", Enumerable.Range(1, i).Select(j => "IAsyncObservable<T" + j + "> source" + j));
  24. var obs = "(" + string.Join(", ", Enumerable.Range(1, i).Select(j => "observer" + j)) + ")";
  25. var tuple = "(" + genArgs + ")";
  26. #>
  27. public static IAsyncObservable<<#=tuple#>> Zip<<#=genArgs#>>(this <#=args#>)
  28. {
  29. <#
  30. for (var j = 1; j <= i; j++)
  31. {
  32. #>
  33. if (source<#=j#> == null)
  34. throw new ArgumentNullException(nameof(source<#=j#>));
  35. <#
  36. }
  37. #>
  38. return Create<<#=tuple#>>(async observer =>
  39. {
  40. var d = new CompositeAsyncDisposable();
  41. var <#=obs#> = AsyncObserver.Zip(observer);
  42. <#
  43. for (var j = 1; j <= i; j++)
  44. {
  45. #>
  46. var sub<#=j#> = source<#=j#>.SubscribeSafeAsync(observer<#=j#>).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
  47. <#
  48. }
  49. #>
  50. await Task.WhenAll(<#=string.Join(", ", Enumerable.Range(1, i).Select(j => "sub" + j))#>).ConfigureAwait(false);
  51. return d;
  52. });
  53. }
  54. public static IAsyncObservable<TResult> Zip<<#=genPars#>>(this <#=args#>, Func<<#=genArgs#>, TResult> selector)
  55. {
  56. <#
  57. for (var j = 1; j <= i; j++)
  58. {
  59. #>
  60. if (source<#=j#> == null)
  61. throw new ArgumentNullException(nameof(source<#=j#>));
  62. <#
  63. }
  64. #>
  65. if (selector == null)
  66. throw new ArgumentNullException(nameof(selector));
  67. return Create<TResult>(async observer =>
  68. {
  69. var d = new CompositeAsyncDisposable();
  70. var <#=obs#> = AsyncObserver.Zip(observer, selector);
  71. <#
  72. for (var j = 1; j <= i; j++)
  73. {
  74. #>
  75. var sub<#=j#> = source<#=j#>.SubscribeSafeAsync(observer<#=j#>).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
  76. <#
  77. }
  78. #>
  79. await Task.WhenAll(<#=string.Join(", ", Enumerable.Range(1, i).Select(j => "sub" + j))#>).ConfigureAwait(false);
  80. return d;
  81. });
  82. }
  83. public static IAsyncObservable<TResult> Zip<<#=genPars#>>(this <#=args#>, Func<<#=genArgs#>, Task<TResult>> selector)
  84. {
  85. <#
  86. for (var j = 1; j <= i; j++)
  87. {
  88. #>
  89. if (source<#=j#> == null)
  90. throw new ArgumentNullException(nameof(source<#=j#>));
  91. <#
  92. }
  93. #>
  94. if (selector == null)
  95. throw new ArgumentNullException(nameof(selector));
  96. return Create<TResult>(async observer =>
  97. {
  98. var d = new CompositeAsyncDisposable();
  99. var <#=obs#> = AsyncObserver.Zip(observer, selector);
  100. <#
  101. for (var j = 1; j <= i; j++)
  102. {
  103. #>
  104. var sub<#=j#> = source<#=j#>.SubscribeSafeAsync(observer<#=j#>).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
  105. <#
  106. }
  107. #>
  108. await Task.WhenAll(<#=string.Join(", ", Enumerable.Range(1, i).Select(j => "sub" + j))#>).ConfigureAwait(false);
  109. return d;
  110. });
  111. }
  112. <#
  113. }
  114. #>
  115. }
  116. partial class AsyncObserver
  117. {
  118. <#
  119. for (var i = 2; i <= 15; i++)
  120. {
  121. var res = "(" + string.Join(", ", Enumerable.Range(1, i).Select(j => "IAsyncObserver<T" + j + ">")) + ")";
  122. var genPars = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "TResult" }));
  123. var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j));
  124. var tuple = "(" + genArgs + ")";
  125. var all = string.Join(" && ", Enumerable.Range(1, i).Select(j => "values" + j + ".Count > 0"));
  126. var vals = string.Join(", ", Enumerable.Range(1, i).Select(j => "values" + j + ".Dequeue()"));
  127. #>
  128. public static <#=res#> Zip<<#=genArgs#>>(IAsyncObserver<<#=tuple#>> observer)
  129. {
  130. if (observer == null)
  131. throw new ArgumentNullException(nameof(observer));
  132. var gate = new AsyncLock();
  133. <#
  134. for (var j = 1; j <= i; j++)
  135. {
  136. #>
  137. var values<#=j#> = new Queue<T<#=j#>>();
  138. <#
  139. }
  140. #>
  141. var isDone = new bool[<#=i#>];
  142. IAsyncObserver<T> CreateObserver<T>(int index, Queue<T> queue) =>
  143. Create<T>(
  144. async x =>
  145. {
  146. using (await gate.LockAsync().ConfigureAwait(false))
  147. {
  148. queue.Enqueue(x);
  149. if (<#=all#>)
  150. {
  151. await observer.OnNextAsync((<#=vals#>)).ConfigureAwait(false);
  152. }
  153. else
  154. {
  155. var allDone = true;
  156. for (var i = 0; i < <#=i#>; i++)
  157. {
  158. if (i != index && !isDone[i])
  159. {
  160. allDone = false;
  161. break;
  162. }
  163. }
  164. if (allDone)
  165. {
  166. await observer.OnCompletedAsync().ConfigureAwait(false);
  167. }
  168. }
  169. }
  170. },
  171. async ex =>
  172. {
  173. using (await gate.LockAsync().ConfigureAwait(false))
  174. {
  175. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  176. }
  177. },
  178. async () =>
  179. {
  180. using (await gate.LockAsync().ConfigureAwait(false))
  181. {
  182. isDone[index] = true;
  183. var allDone = true;
  184. for (var i = 0; i < <#=i#>; i++)
  185. {
  186. if (!isDone[i])
  187. {
  188. allDone = false;
  189. break;
  190. }
  191. }
  192. if (allDone)
  193. {
  194. await observer.OnCompletedAsync().ConfigureAwait(false);
  195. }
  196. }
  197. }
  198. );
  199. return
  200. (
  201. <#
  202. for (var j = 1; j <= i; j++)
  203. {
  204. #>
  205. CreateObserver<T<#=j#>>(<#=j#>, values<#=j#>)<#=(j < i ? "," : "")#>
  206. <#
  207. }
  208. #>
  209. );
  210. }
  211. public static <#=res#> Zip<<#=genPars#>>(IAsyncObserver<TResult> observer, Func<<#=genArgs#>, TResult> selector)
  212. {
  213. if (observer == null)
  214. throw new ArgumentNullException(nameof(observer));
  215. if (selector == null)
  216. throw new ArgumentNullException(nameof(selector));
  217. return Zip<<#=genPars#>>(observer, (<#=string.Join(", ", Enumerable.Range(1, i).Select(j => "x" + j))#>) => Task.FromResult(selector(<#=string.Join(", ", Enumerable.Range(1, i).Select(j => "x" + j))#>)));
  218. }
  219. public static <#=res#> Zip<<#=genPars#>>(IAsyncObserver<TResult> observer, Func<<#=genArgs#>, Task<TResult>> selector)
  220. {
  221. if (observer == null)
  222. throw new ArgumentNullException(nameof(observer));
  223. if (selector == null)
  224. throw new ArgumentNullException(nameof(selector));
  225. var gate = new AsyncLock();
  226. <#
  227. for (var j = 1; j <= i; j++)
  228. {
  229. #>
  230. var values<#=j#> = new Queue<T<#=j#>>();
  231. <#
  232. }
  233. #>
  234. var isDone = new bool[<#=i#>];
  235. IAsyncObserver<T> CreateObserver<T>(int index, Queue<T> queue) =>
  236. Create<T>(
  237. async x =>
  238. {
  239. using (await gate.LockAsync().ConfigureAwait(false))
  240. {
  241. queue.Enqueue(x);
  242. if (<#=all#>)
  243. {
  244. TResult res;
  245. try
  246. {
  247. res = await selector(<#=vals#>).ConfigureAwait(false);
  248. }
  249. catch (Exception ex)
  250. {
  251. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  252. return;
  253. }
  254. await observer.OnNextAsync(res).ConfigureAwait(false);
  255. }
  256. else
  257. {
  258. var allDone = true;
  259. for (var i = 0; i < <#=i#>; i++)
  260. {
  261. if (i != index && !isDone[i])
  262. {
  263. allDone = false;
  264. break;
  265. }
  266. }
  267. if (allDone)
  268. {
  269. await observer.OnCompletedAsync().ConfigureAwait(false);
  270. }
  271. }
  272. }
  273. },
  274. async ex =>
  275. {
  276. using (await gate.LockAsync().ConfigureAwait(false))
  277. {
  278. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  279. }
  280. },
  281. async () =>
  282. {
  283. using (await gate.LockAsync().ConfigureAwait(false))
  284. {
  285. isDone[index] = true;
  286. var allDone = true;
  287. for (var i = 0; i < <#=i#>; i++)
  288. {
  289. if (!isDone[i])
  290. {
  291. allDone = false;
  292. break;
  293. }
  294. }
  295. if (allDone)
  296. {
  297. await observer.OnCompletedAsync().ConfigureAwait(false);
  298. }
  299. }
  300. }
  301. );
  302. return
  303. (
  304. <#
  305. for (var j = 1; j <= i; j++)
  306. {
  307. #>
  308. CreateObserver<T<#=j#>>(<#=j#>, values<#=j#>)<#=(j < i ? "," : "")#>
  309. <#
  310. }
  311. #>
  312. );
  313. }
  314. <#
  315. }
  316. #>
  317. }
  318. }