Zip.Generated.tt 11 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. <#@ template debug="false" hostspecific="false" language="C#" #>
  5. <#@ assembly name="System.Core" #>
  6. <#@ import namespace="System.Linq" #>
  7. <#@ import namespace="System.Text" #>
  8. <#@ import namespace="System.Collections.Generic" #>
  9. <#@ output extension=".cs" #>
  10. using System.Collections.Generic;
  11. using System.Reactive.Disposables;
  12. using System.Threading;
  13. using System.Threading.Tasks;
  14. namespace System.Reactive.Linq
  15. {
  16. // TODO: Add overloads with tuples.
  17. partial class AsyncObservable
  18. {
  19. <#
  20. for (var i = 2; i <= 15; i++)
  21. {
  22. var genPars = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "TResult" }));
  23. var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j));
  24. var args = string.Join(", ", Enumerable.Range(1, i).Select(j => "IAsyncObservable<T" + j + "> source" + j));
  25. var obs = "(" + string.Join(", ", Enumerable.Range(1, i).Select(j => "observer" + j)) + ")";
  26. var tuple = "(" + genArgs + ")";
  27. #>
  28. public static IAsyncObservable<<#=tuple#>> Zip<<#=genArgs#>>(this <#=args#>)
  29. {
  30. <#
  31. for (var j = 1; j <= i; j++)
  32. {
  33. #>
  34. if (source<#=j#> == null)
  35. throw new ArgumentNullException(nameof(source<#=j#>));
  36. <#
  37. }
  38. #>
  39. return Create<<#=tuple#>>(async observer =>
  40. {
  41. var d = new CompositeAsyncDisposable();
  42. var <#=obs#> = AsyncObserver.Zip(observer);
  43. <#
  44. for (var j = 1; j <= i; j++)
  45. {
  46. #>
  47. var sub<#=j#> = source<#=j#>.SubscribeSafeAsync(observer<#=j#>).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
  48. <#
  49. }
  50. #>
  51. await Task.WhenAll(<#=string.Join(", ", Enumerable.Range(1, i).Select(j => "sub" + j))#>).ConfigureAwait(false);
  52. return d;
  53. });
  54. }
  55. public static IAsyncObservable<TResult> Zip<<#=genPars#>>(this <#=args#>, Func<<#=genArgs#>, TResult> selector)
  56. {
  57. <#
  58. for (var j = 1; j <= i; j++)
  59. {
  60. #>
  61. if (source<#=j#> == null)
  62. throw new ArgumentNullException(nameof(source<#=j#>));
  63. <#
  64. }
  65. #>
  66. if (selector == null)
  67. throw new ArgumentNullException(nameof(selector));
  68. return Create<TResult>(async observer =>
  69. {
  70. var d = new CompositeAsyncDisposable();
  71. var <#=obs#> = AsyncObserver.Zip(observer, selector);
  72. <#
  73. for (var j = 1; j <= i; j++)
  74. {
  75. #>
  76. var sub<#=j#> = source<#=j#>.SubscribeSafeAsync(observer<#=j#>).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
  77. <#
  78. }
  79. #>
  80. await Task.WhenAll(<#=string.Join(", ", Enumerable.Range(1, i).Select(j => "sub" + j))#>).ConfigureAwait(false);
  81. return d;
  82. });
  83. }
  84. public static IAsyncObservable<TResult> Zip<<#=genPars#>>(this <#=args#>, Func<<#=genArgs#>, Task<TResult>> selector)
  85. {
  86. <#
  87. for (var j = 1; j <= i; j++)
  88. {
  89. #>
  90. if (source<#=j#> == null)
  91. throw new ArgumentNullException(nameof(source<#=j#>));
  92. <#
  93. }
  94. #>
  95. if (selector == null)
  96. throw new ArgumentNullException(nameof(selector));
  97. return Create<TResult>(async observer =>
  98. {
  99. var d = new CompositeAsyncDisposable();
  100. var <#=obs#> = AsyncObserver.Zip(observer, selector);
  101. <#
  102. for (var j = 1; j <= i; j++)
  103. {
  104. #>
  105. var sub<#=j#> = source<#=j#>.SubscribeSafeAsync(observer<#=j#>).ContinueWith(disposable => d.AddAsync(disposable.Result)).Unwrap();
  106. <#
  107. }
  108. #>
  109. await Task.WhenAll(<#=string.Join(", ", Enumerable.Range(1, i).Select(j => "sub" + j))#>).ConfigureAwait(false);
  110. return d;
  111. });
  112. }
  113. <#
  114. }
  115. #>
  116. }
  117. partial class AsyncObserver
  118. {
  119. <#
  120. for (var i = 2; i <= 15; i++)
  121. {
  122. var res = "(" + string.Join(", ", Enumerable.Range(1, i).Select(j => "IAsyncObserver<T" + j + ">")) + ")";
  123. var genPars = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j).Concat(new[] { "TResult" }));
  124. var genArgs = string.Join(", ", Enumerable.Range(1, i).Select(j => "T" + j));
  125. var tuple = "(" + genArgs + ")";
  126. var all = string.Join(" && ", Enumerable.Range(1, i).Select(j => "values" + j + ".Count > 0"));
  127. var vals = string.Join(", ", Enumerable.Range(1, i).Select(j => "values" + j + ".Dequeue()"));
  128. #>
  129. public static <#=res#> Zip<<#=genArgs#>>(IAsyncObserver<<#=tuple#>> observer)
  130. {
  131. if (observer == null)
  132. throw new ArgumentNullException(nameof(observer));
  133. var gate = new AsyncLock();
  134. <#
  135. for (var j = 1; j <= i; j++)
  136. {
  137. #>
  138. var values<#=j#> = new Queue<T<#=j#>>();
  139. <#
  140. }
  141. #>
  142. var isDone = new bool[<#=i#>];
  143. IAsyncObserver<T> CreateObserver<T>(int index, Queue<T> queue) =>
  144. Create<T>(
  145. async x =>
  146. {
  147. using (await gate.LockAsync().ConfigureAwait(false))
  148. {
  149. queue.Enqueue(x);
  150. if (<#=all#>)
  151. {
  152. await observer.OnNextAsync((<#=vals#>)).ConfigureAwait(false);
  153. }
  154. else
  155. {
  156. var allDone = true;
  157. for (var i = 0; i < <#=i#>; i++)
  158. {
  159. if (i != index && !isDone[i])
  160. {
  161. allDone = false;
  162. break;
  163. }
  164. }
  165. if (allDone)
  166. {
  167. await observer.OnCompletedAsync().ConfigureAwait(false);
  168. }
  169. }
  170. }
  171. },
  172. async ex =>
  173. {
  174. using (await gate.LockAsync().ConfigureAwait(false))
  175. {
  176. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  177. }
  178. },
  179. async () =>
  180. {
  181. using (await gate.LockAsync().ConfigureAwait(false))
  182. {
  183. isDone[index] = true;
  184. var allDone = true;
  185. for (var i = 0; i < <#=i#>; i++)
  186. {
  187. if (!isDone[i])
  188. {
  189. allDone = false;
  190. break;
  191. }
  192. }
  193. if (allDone)
  194. {
  195. await observer.OnCompletedAsync().ConfigureAwait(false);
  196. }
  197. }
  198. }
  199. );
  200. return
  201. (
  202. <#
  203. for (var j = 1; j <= i; j++)
  204. {
  205. #>
  206. CreateObserver<T<#=j#>>(<#=j#>, values<#=j#>)<#=(j < i ? "," : "")#>
  207. <#
  208. }
  209. #>
  210. );
  211. }
  212. public static <#=res#> Zip<<#=genPars#>>(IAsyncObserver<TResult> observer, Func<<#=genArgs#>, TResult> selector)
  213. {
  214. if (observer == null)
  215. throw new ArgumentNullException(nameof(observer));
  216. if (selector == null)
  217. throw new ArgumentNullException(nameof(selector));
  218. return Zip<<#=genPars#>>(observer, (<#=string.Join(", ", Enumerable.Range(1, i).Select(j => "x" + j))#>) => Task.FromResult(selector(<#=string.Join(", ", Enumerable.Range(1, i).Select(j => "x" + j))#>)));
  219. }
  220. public static <#=res#> Zip<<#=genPars#>>(IAsyncObserver<TResult> observer, Func<<#=genArgs#>, Task<TResult>> selector)
  221. {
  222. if (observer == null)
  223. throw new ArgumentNullException(nameof(observer));
  224. if (selector == null)
  225. throw new ArgumentNullException(nameof(selector));
  226. var gate = new AsyncLock();
  227. <#
  228. for (var j = 1; j <= i; j++)
  229. {
  230. #>
  231. var values<#=j#> = new Queue<T<#=j#>>();
  232. <#
  233. }
  234. #>
  235. var isDone = new bool[<#=i#>];
  236. IAsyncObserver<T> CreateObserver<T>(int index, Queue<T> queue) =>
  237. Create<T>(
  238. async x =>
  239. {
  240. using (await gate.LockAsync().ConfigureAwait(false))
  241. {
  242. queue.Enqueue(x);
  243. if (<#=all#>)
  244. {
  245. TResult res;
  246. try
  247. {
  248. res = await selector(<#=vals#>).ConfigureAwait(false);
  249. }
  250. catch (Exception ex)
  251. {
  252. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  253. return;
  254. }
  255. await observer.OnNextAsync(res).ConfigureAwait(false);
  256. }
  257. else
  258. {
  259. var allDone = true;
  260. for (var i = 0; i < <#=i#>; i++)
  261. {
  262. if (i != index && !isDone[i])
  263. {
  264. allDone = false;
  265. break;
  266. }
  267. }
  268. if (allDone)
  269. {
  270. await observer.OnCompletedAsync().ConfigureAwait(false);
  271. }
  272. }
  273. }
  274. },
  275. async ex =>
  276. {
  277. using (await gate.LockAsync().ConfigureAwait(false))
  278. {
  279. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  280. }
  281. },
  282. async () =>
  283. {
  284. using (await gate.LockAsync().ConfigureAwait(false))
  285. {
  286. isDone[index] = true;
  287. var allDone = true;
  288. for (var i = 0; i < <#=i#>; i++)
  289. {
  290. if (!isDone[i])
  291. {
  292. allDone = false;
  293. break;
  294. }
  295. }
  296. if (allDone)
  297. {
  298. await observer.OnCompletedAsync().ConfigureAwait(false);
  299. }
  300. }
  301. }
  302. );
  303. return
  304. (
  305. <#
  306. for (var j = 1; j <= i; j++)
  307. {
  308. #>
  309. CreateObserver<T<#=j#>>(<#=j#>, values<#=j#>)<#=(j < i ? "," : "")#>
  310. <#
  311. }
  312. #>
  313. );
  314. }
  315. <#
  316. }
  317. #>
  318. }
  319. }