ToAsync.Generated.tt 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. <#@ template debug="false" hostspecific="false" language="C#" #>
  5. <#@ assembly name="System.Core" #>
  6. <#@ import namespace="System.Linq" #>
  7. <#@ import namespace="System.Text" #>
  8. <#@ import namespace="System.Collections.Generic" #>
  9. <#@ output extension=".cs" #>
  10. using System.Reactive.Concurrency;
  11. using System.Reactive.Subjects;
  12. using System.Threading.Tasks;
  13. namespace System.Reactive.Linq
  14. {
  15. // REVIEW: Consider if these are worth retaining in the async space.
  16. partial class AsyncObservable
  17. {
  <#
  // Emits one pair of AsyncObservable.ToAsync overloads for every delegate arity from
  // Func<TResult> up to Func<T1, ..., T16, TResult>.
  for (var i = 0; i <= 16; i++)
  {
      var typeParams = Enumerable.Range(1, i).Select(j => "T" + j).ToArray();

      // Delegate type returned to the caller, e.g. Func<T1, T2, IAsyncObservable<TResult>>.
      var ret = "Func<" + string.Join(", ", typeParams.Concat(new[] { "IAsyncObservable<TResult>" })) + ">";

      // Generic parameter list of the emitted method, e.g. "T1, T2, TResult".
      var genArgs = string.Join(", ", typeParams.Concat(new[] { "TResult" }));

      // Delegate type being wrapped, e.g. Func<T1, T2, TResult>.
      var type = "Func<" + genArgs + ">";

      // Parameter names of the returned lambda, e.g. "arg1, arg2"; empty for arity 0.
      var pars = string.Join(", ", Enumerable.Range(1, i).Select(j => "arg" + j));

      // The same names with a leading separator so they can follow 'function' in the forwarding call.
      var invokeArgs = i > 0 ? ", " + pars : pars;
  #>
        /// <summary>
        /// Wraps <paramref name="function"/> in a delegate that returns an observable sequence,
        /// forwarding to the scheduler-taking overload with <c>TaskPoolAsyncScheduler.Default</c>.
        /// </summary>
        /// <returns>A delegate that accepts the same arguments as <paramref name="function"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="function"/> is null.</exception>
        public static <#=ret#> ToAsync<<#=genArgs#>>(<#=type#> function)
        {
            if (function == null)
            {
                throw new ArgumentNullException(nameof(function));
            }

            return ToAsync(function, TaskPoolAsyncScheduler.Default);
        }

        /// <summary>
        /// Wraps <paramref name="function"/> in a delegate that returns an observable sequence.
        /// Every call of the returned delegate creates a new <c>SequentialAsyncAsyncSubject</c>, passes it to
        /// <c>AsyncObserver.ToAsync</c> together with <paramref name="function"/>, the call's arguments and
        /// <paramref name="scheduler"/>, and returns the subject as an observable.
        /// </summary>
        /// <returns>A delegate that accepts the same arguments as <paramref name="function"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="function"/> or <paramref name="scheduler"/> is null.</exception>
        public static <#=ret#> ToAsync<<#=genArgs#>>(<#=type#> function, IAsyncScheduler scheduler)
        {
            if (function == null)
            {
                throw new ArgumentNullException(nameof(function));
            }

            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            return (<#=pars#>) =>
            {
                var subject = new SequentialAsyncAsyncSubject<TResult>();

                // The scheduler must be forwarded explicitly: without it this call binds to the overload
                // that always uses TaskPoolAsyncScheduler.Default and the caller's scheduler is ignored.
                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
                AsyncObserver.ToAsync(subject, function<#=invokeArgs#>, scheduler);

                return subject.AsAsyncObservable();
            };
        }

  <#
  }
  #>
  <#
  // Emits one pair of AsyncObservable.ToAsync overloads for every delegate arity from
  // Action up to Action<T1, ..., T16>.
  for (var i = 0; i <= 16; i++)
  {
      var typeParams = Enumerable.Range(1, i).Select(j => "T" + j).ToArray();

      // Delegate type returned to the caller, e.g. Func<T1, T2, IAsyncObservable<Unit>>.
      var ret = "Func<" + string.Join(", ", typeParams.Concat(new[] { "IAsyncObservable<Unit>" })) + ">";

      // Generic parameter list including the angle brackets, e.g. "<T1, T2>"; empty for arity 0
      // because the plain Action overloads are not generic.
      var genArgs = i > 0 ? "<" + string.Join(", ", typeParams) + ">" : "";

      // Delegate type being wrapped, e.g. Action<T1, T2>, or plain Action for arity 0.
      var type = "Action" + genArgs;

      // Parameter names of the returned lambda, e.g. "arg1, arg2"; empty for arity 0.
      var pars = string.Join(", ", Enumerable.Range(1, i).Select(j => "arg" + j));

      // The same names with a leading separator so they can follow 'action' in the forwarding call.
      var invokeArgs = i > 0 ? ", " + pars : pars;
  #>
        /// <summary>
        /// Wraps <paramref name="action"/> in a delegate that returns an observable sequence,
        /// forwarding to the scheduler-taking overload with <c>TaskPoolAsyncScheduler.Default</c>.
        /// </summary>
        /// <returns>A delegate that accepts the same arguments as <paramref name="action"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
        public static <#=ret#> ToAsync<#=genArgs#>(<#=type#> action)
        {
            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            return ToAsync(action, TaskPoolAsyncScheduler.Default);
        }

        /// <summary>
        /// Wraps <paramref name="action"/> in a delegate that returns an observable sequence.
        /// Every call of the returned delegate creates a new <c>SequentialAsyncAsyncSubject</c>, passes it to
        /// <c>AsyncObserver.ToAsync</c> together with <paramref name="action"/>, the call's arguments and
        /// <paramref name="scheduler"/>, and returns the subject as an observable.
        /// </summary>
        /// <returns>A delegate that accepts the same arguments as <paramref name="action"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> or <paramref name="scheduler"/> is null.</exception>
        public static <#=ret#> ToAsync<#=genArgs#>(<#=type#> action, IAsyncScheduler scheduler)
        {
            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            return (<#=pars#>) =>
            {
                var subject = new SequentialAsyncAsyncSubject<Unit>();

                // The scheduler must be forwarded explicitly: without it this call binds to the overload
                // that always uses TaskPoolAsyncScheduler.Default and the caller's scheduler is ignored.
                // NB: We don't do anything with the result of scheduling the action; it can't be cancelled.
                AsyncObserver.ToAsync(subject, action<#=invokeArgs#>, scheduler);

                return subject.AsAsyncObservable();
            };
        }

  <#
  }
  #>
  95. }
  96. partial class AsyncObserver
  97. {
  <#
  // Emits one pair of AsyncObserver.ToAsync overloads for every delegate arity from
  // Func<TResult> up to Func<T1, ..., T16, TResult>.
  for (var i = 0; i <= 16; i++)
  {
      var indices = Enumerable.Range(1, i).ToArray();
      var argNames = indices.Select(j => "arg" + j).ToArray();

      // Generic parameter list of the emitted method, e.g. "T1, T2, TResult".
      var genArgs = string.Join(", ", indices.Select(j => "T" + j).Concat(new[] { "TResult" }));

      // Delegate type being invoked, e.g. Func<T1, T2, TResult>.
      var type = "Func<" + genArgs + ">";

      // Argument list used when invoking the delegate, e.g. "arg1, arg2".
      var pars = string.Join(", ", argNames);

      // Extra method parameters, each with a leading separator, e.g. ", T1 arg1, T2 arg2".
      var invokePars = string.Concat(indices.Select(j => ", T" + j + " arg" + j));

      // Forwarded arguments, each with a trailing separator, e.g. "arg1, arg2, ".
      var invokeArgs = string.Concat(argNames.Select(name => name + ", "));
  #>
        /// <summary>
        /// Forwards to the scheduler-taking overload with <c>TaskPoolAsyncScheduler.Default</c>.
        /// </summary>
        /// <returns>The task returned by the scheduler-taking overload.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="function"/> is null.</exception>
        public static Task<IAsyncDisposable> ToAsync<<#=genArgs#>>(IAsyncObserver<TResult> observer, <#=type#> function<#=invokePars#>)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            if (function == null)
            {
                throw new ArgumentNullException(nameof(function));
            }

            return ToAsync(observer, function, <#=invokeArgs#>TaskPoolAsyncScheduler.Default);
        }

        /// <summary>
        /// Schedules work on <paramref name="scheduler"/> that invokes <paramref name="function"/> with the
        /// supplied arguments. If the invocation throws, the exception is sent to <paramref name="observer"/>
        /// through <c>OnErrorAsync</c>; otherwise the result is sent through <c>OnNextAsync</c>, followed by
        /// <c>OnCompletedAsync</c>.
        /// </summary>
        /// <returns>The task returned by <c>scheduler.ScheduleAsync</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/>, <paramref name="function"/> or <paramref name="scheduler"/> is null.</exception>
        public static Task<IAsyncDisposable> ToAsync<<#=genArgs#>>(IAsyncObserver<TResult> observer, <#=type#> function<#=invokePars#>, IAsyncScheduler scheduler)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            if (function == null)
            {
                throw new ArgumentNullException(nameof(function));
            }

            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            return scheduler.ScheduleAsync(async token =>
            {
                var result = default(TResult);
                Exception failure = null;

                // Only the delegate invocation is guarded; exceptions from the observer calls below propagate.
                try
                {
                    result = function(<#=pars#>);
                }
                catch (Exception ex)
                {
                    failure = ex;
                }

                if (failure != null)
                {
                    await observer.OnErrorAsync(failure).RendezVous(scheduler, token);
                    return;
                }

                await observer.OnNextAsync(result).RendezVous(scheduler, token);
                await observer.OnCompletedAsync().RendezVous(scheduler, token);
            });
        }

  <#
  }
  #>
  <#
  // Emits one pair of AsyncObserver.ToAsync overloads for every delegate arity from
  // Action up to Action<T1, ..., T16>.
  for (var i = 0; i <= 16; i++)
  {
      var indices = Enumerable.Range(1, i).ToArray();
      var argNames = indices.Select(j => "arg" + j).ToArray();

      // Generic parameter list including the angle brackets, e.g. "<T1, T2>"; empty for arity 0
      // because the plain Action overloads are not generic.
      var genArgs = i == 0 ? "" : "<" + string.Join(", ", indices.Select(j => "T" + j)) + ">";

      // Delegate type being invoked, e.g. Action<T1, T2>, or plain Action for arity 0.
      var type = "Action" + genArgs;

      // Argument list used when invoking the delegate, e.g. "arg1, arg2".
      var pars = string.Join(", ", argNames);

      // Extra method parameters, each with a leading separator, e.g. ", T1 arg1, T2 arg2".
      var invokePars = string.Concat(indices.Select(j => ", T" + j + " arg" + j));

      // Forwarded arguments, each with a trailing separator, e.g. "arg1, arg2, ".
      var invokeArgs = string.Concat(argNames.Select(name => name + ", "));
  #>
        /// <summary>
        /// Forwards to the scheduler-taking overload with <c>TaskPoolAsyncScheduler.Default</c>.
        /// </summary>
        /// <returns>The task returned by the scheduler-taking overload.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> or <paramref name="action"/> is null.</exception>
        public static Task<IAsyncDisposable> ToAsync<#=genArgs#>(IAsyncObserver<Unit> observer, <#=type#> action<#=invokePars#>)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            return ToAsync(observer, action, <#=invokeArgs#>TaskPoolAsyncScheduler.Default);
        }

        /// <summary>
        /// Schedules work on <paramref name="scheduler"/> that invokes <paramref name="action"/> with the
        /// supplied arguments. If the invocation throws, the exception is sent to <paramref name="observer"/>
        /// through <c>OnErrorAsync</c>; otherwise <c>Unit.Default</c> is sent through <c>OnNextAsync</c>,
        /// followed by <c>OnCompletedAsync</c>.
        /// </summary>
        /// <returns>The task returned by <c>scheduler.ScheduleAsync</c>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/>, <paramref name="action"/> or <paramref name="scheduler"/> is null.</exception>
        public static Task<IAsyncDisposable> ToAsync<#=genArgs#>(IAsyncObserver<Unit> observer, <#=type#> action<#=invokePars#>, IAsyncScheduler scheduler)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            if (action == null)
            {
                throw new ArgumentNullException(nameof(action));
            }

            if (scheduler == null)
            {
                throw new ArgumentNullException(nameof(scheduler));
            }

            return scheduler.ScheduleAsync(async token =>
            {
                Exception failure = null;

                // Only the delegate invocation is guarded; exceptions from the observer calls below propagate.
                try
                {
                    action(<#=pars#>);
                }
                catch (Exception ex)
                {
                    failure = ex;
                }

                if (failure != null)
                {
                    await observer.OnErrorAsync(failure).RendezVous(scheduler, token);
                    return;
                }

                await observer.OnNextAsync(Unit.Default).RendezVous(scheduler, token);
                await observer.OnCompletedAsync().RendezVous(scheduler, token);
            });
        }

  <#
  }
  #>
  204. }
  205. }