AsyncInfoObservable.cs 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. #if HAS_WINRT
  5. extern alias SystemReactiveNet;
  6. using SystemReactiveNet::System;
  7. using SystemReactiveNet::System.Reactive;
  8. using SystemReactiveNet::System.Reactive.Linq;
  9. using SystemReactiveNet::System.Reactive.Threading.Tasks;
  10. using System.Runtime.InteropServices.WindowsRuntime;
  11. using System.Threading.Tasks;
  12. using Windows.Foundation;
  13. namespace System.Reactive.Linq
  14. {
  15. /// <summary>
  16. /// Obsolete. The <c>System.Reactive.For.WindowsRuntime</c> NuGet package defines a
  17. /// <c>WindowsRuntimeAsyncInfoObservable</c> class that defines new extension methods to be used in
  18. /// place of these (also in the <c>System.Reactive.Linq</c> namespace).
  19. /// </summary>
  20. /// <remarks>
  21. /// <para>
  22. /// The replacement <c>WindowsRuntimeAsyncInfoObservable</c> class uses different method names.
  23. /// When you migrate to that new class from this obsolete one, you will need to change your code
  24. /// to invoke different method names:
  25. /// </para>
  26. /// <list type="table">
  27. /// <listheader><term>Rx &lt;= 6.0</term><term>Now</term></listheader>
  28. /// <item>
  29. /// <term><c>ToAsyncAction</c></term>
  30. /// <term><c>ToIAsyncAction</c></term>
  31. /// </item>
  32. /// <item>
  33. /// <term><c>ToAsyncActionWithProgress</c></term>
  34. /// <term><c>ToIAsyncActionWithProgress</c></term>
  35. /// </item>
  36. /// <item>
  37. /// <term><c>ToAsyncOperation</c></term>
  38. /// <term><c>ToIAsyncOperation</c></term>
  39. /// </item>
  40. /// <item>
  41. /// <term><c>ToAsyncOperationWithProgress</c></term>
  42. /// <term><c>ToIAsyncOperationWithProgress</c></term>
  43. /// </item>
  44. /// </list>
  45. /// <para>
  46. /// This name change is necessary because of a limitation of the <c>Obsolete</c> attribute: if
  47. /// you want to move an existing method into a different package, and you leave the old one in
  48. /// place (and marked as <c>Obsolete</c>) for a few versions to enable a gradual transition to
  49. /// the new one, you will cause a problem if you keep the method name the same. The problem
  50. /// is that both the old and new extension methods will be in scope simultaneously, so the
  51. /// compiler will complain of ambiguity when you try to use them. In some cases you can
  52. /// mitigate this by defining the new type in a different namespace, but the problem is that
  53. /// these extension methods for <see cref="IObservable{T}"/> are defined in the
  54. /// <c>System.Reactive.Linq</c> namespace. Code often brings that namespace into scope for more
  55. /// than one reason, so we can't just tell developers to replace <c>using
  56. /// System.Reactive.Linq;</c> with some other namespace. While that might fix the ambiguity
  57. /// problem, it's likely to cause a load of new problems instead.
  58. /// </para>
  59. /// <para>
  60. /// The only practical solution for this is for the new methods to have different names than
  61. /// the old ones. (There is a proposal for being able to annotate a method as being for binary
  62. /// compatibility only, but it will be some time before that is available to all projects using
  63. /// Rx.NET.)
  64. /// </para>
  65. /// <para>
  66. /// This type will eventually be removed because all UI-framework-specific functionality is
  67. /// being removed from <c>System.Reactive</c>. This is necessary to fix problems in which
  68. /// <c>System.Reactive</c> causes applications to end up with dependencies on Windows Forms and
  69. /// WPF whether they want them or not. Strictly speaking, the <see cref="IAsyncAction"/>,
  70. /// <see cref="IAsyncActionWithProgress{TProgress}"/>, <see cref="IAsyncOperation{TResult}"/>
  71. /// and <see cref="IAsyncOperationWithProgress{TResult, TProgress}"/> types that this class
  72. /// supports are part of Windows Runtime, and aren't specific to a single UI framework. These
  73. /// types are used routinely in both UWP and WinUI applications, but it's also possible to use
  74. /// them from Windows Forms, WPF, and even console applications. These types are effectively
  75. /// WinRT native way of representing what the TPL's various Task types represent in a purely
  76. /// .NET world. Even so, once support for genuinely UI-framework-specific types (such as
  77. /// WPF's <c>Dispatcher</c>) has been removed from Rx.NET, support for these Windows Runtime
  78. /// types would require us to continue to offer <c>netX.0</c> and <c>netX.0-windows10.0.YYYYY</c>
  79. /// TFMs. The fact that we offer both has caused confusion because it's quite possible to get the
  80. /// former even when running on Windows.
  81. /// </para>
  82. /// </remarks>
  83. [Obsolete("Use the extension methods defined by the System.Reactive.Linq.WindowsRuntimeAsyncInfoObservable class in the System.Reactive.For.WindowsRuntime package instead", error: false)]
  84. [CLSCompliant(false)]
  85. public static class AsyncInfoObservable
  86. {
  87. #region IAsyncAction
  88. /// <summary>
  89. /// Obsolete. Use the <c>WindowsRuntimeAsyncInfoObservable.ToIAsyncAction</c> method in the
  90. /// <c>System.Reactive.For.WindowsRuntime</c> package instead.
  91. /// </summary>
  92. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  93. /// <param name="source">Source sequence to expose as an asynchronous action.</param>
  94. /// <returns>Windows Runtime asynchronous action object representing the completion of the observable sequence.</returns>
  95. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  96. [Obsolete("Use the ToIAsyncAction extension method defined by System.Reactive.Linq.WindowsRuntimeAsyncInfoObservable in the System.Reactive.For.WindowsRuntime package instead", error: false)]
  97. public static IAsyncAction ToAsyncAction<TSource>(this IObservable<TSource> source)
  98. {
  99. if (source == null)
  100. {
  101. throw new ArgumentNullException(nameof(source));
  102. }
  103. return AsyncInfo.Run(ct => (Task)source.DefaultIfEmpty().ToTask(ct));
  104. }
  105. #region Progress
  106. /// <summary>
  107. /// Obsolete. Use the <c>WindowsRuntimeAsyncInfoObservable.ToIAsyncActionWithProgress</c>
  108. /// method in the <c>System.Reactive.For.WindowsRuntime</c> package instead.
  109. /// </summary>
  110. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  111. /// <param name="source">Source sequence to expose as an asynchronous action.</param>
  112. /// <returns>Windows Runtime asynchronous action object representing the completion of the observable sequence, reporting incremental progress for each source sequence element.</returns>
  113. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  114. [Obsolete("Use the ToIAsyncActionWithProgress extension method defined by System.Reactive.Linq.WindowsRuntimeAsyncInfoObservable in the System.Reactive.For.WindowsRuntime package instead", error: false)]
  115. public static IAsyncActionWithProgress<int> ToAsyncActionWithProgress<TSource>(this IObservable<TSource> source)
  116. {
  117. if (source == null)
  118. {
  119. throw new ArgumentNullException(nameof(source));
  120. }
  121. return AsyncInfo.Run<int>((ct, progress) =>
  122. {
  123. var i = 0;
  124. return source.Do(_ => progress.Report(i++)).DefaultIfEmpty().ToTask(ct);
  125. });
  126. }
  127. /// <summary>
  128. /// Obsolete. Use the <c>WindowsRuntimeAsyncInfoObservable.ToIAsyncActionWithProgress</c>
  129. /// method in the <c>System.Reactive.For.WindowsRuntime</c> package instead.
  130. /// </summary>
  131. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  132. /// <typeparam name="TProgress">The type of the elements in the progress sequence.</typeparam>
  133. /// <param name="source">Source sequence to expose as an asynchronous action and to compute a progress sequence that gets reported through the asynchronous action.</param>
  134. /// <param name="progressSelector">Selector function to map the source sequence on a progress reporting sequence.</param>
  135. /// <returns>Windows Runtime asynchronous action object representing the completion of the result sequence, reporting progress computed through the progress sequence.</returns>
  136. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="progressSelector"/> is null.</exception>
  137. [Obsolete("Use the ToIAsyncActionWithProgress extension method defined by System.Reactive.Linq.WindowsRuntimeAsyncInfoObservable in the System.Reactive.For.WindowsRuntime package instead", error: false)]
  138. public static IAsyncActionWithProgress<TProgress> ToAsyncActionWithProgress<TSource, TProgress>(this IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TProgress>> progressSelector)
  139. {
  140. if (source == null)
  141. {
  142. throw new ArgumentNullException(nameof(source));
  143. }
  144. if (progressSelector == null)
  145. {
  146. throw new ArgumentNullException(nameof(progressSelector));
  147. }
  148. return AsyncInfo.Run<TProgress>((ct, progress) =>
  149. {
  150. return Observable.Create<TSource?>(observer =>
  151. {
  152. var obs = Observer.Synchronize(observer);
  153. var data = source.Publish();
  154. var progressSubscription = progressSelector(data).Subscribe(progress.Report, obs.OnError);
  155. var dataSubscription = data.DefaultIfEmpty().Subscribe(obs);
  156. var connection = data.Connect();
  157. return StableCompositeDisposable.CreateTrusted(progressSubscription, dataSubscription, connection);
  158. }).ToTask(ct);
  159. });
  160. }
  161. #endregion
  162. #endregion
  163. #region IAsyncOperation<T>
  164. /// <summary>
  165. /// Obsolete. Use the <c>WindowsRuntimeAsyncInfoObservable.ToAsyncOperation</c> method in
  166. /// the <c>System.Reactive.For.WindowsRuntime</c> package instead.
  167. /// </summary>
  168. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  169. /// <param name="source">Source sequence to expose as an asynchronous operation.</param>
  170. /// <returns>Windows Runtime asynchronous operation object that returns the last element of the observable sequence.</returns>
  171. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  172. [Obsolete("Use the ToIAsyncOperation extension method defined by System.Reactive.Linq.WindowsRuntimeAsyncInfoObservable in the System.Reactive.For.WindowsRuntime package instead", error: false)]
  173. public static IAsyncOperation<TSource> ToAsyncOperation<TSource>(this IObservable<TSource> source)
  174. {
  175. if (source == null)
  176. {
  177. throw new ArgumentNullException(nameof(source));
  178. }
  179. return AsyncInfo.Run(ct => source.ToTask(ct));
  180. }
  181. /// <summary>
  182. /// Obsolete. Use the <c>WindowsRuntimeAsyncInfoObservable.ToIAsyncOperationWithProgress</c>
  183. /// method in the <c>System.Reactive.For.WindowsRuntime</c> package instead.
  184. /// </summary>
  185. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  186. /// <param name="source">Source sequence to expose as an asynchronous operation.</param>
  187. /// <returns>Windows Runtime asynchronous operation object that returns the last element of the observable sequence, reporting incremental progress for each source sequence element.</returns>
  188. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  189. [Obsolete("Use the ToIAsyncOperationWithProgress extension method defined by System.Reactive.Linq.WindowsRuntimeAsyncInfoObservable in the System.Reactive.For.WindowsRuntime package instead", error: false)]
  190. public static IAsyncOperationWithProgress<TSource, int> ToAsyncOperationWithProgress<TSource>(this IObservable<TSource> source)
  191. {
  192. if (source == null)
  193. {
  194. throw new ArgumentNullException(nameof(source));
  195. }
  196. return AsyncInfo.Run<TSource, int>((ct, progress) =>
  197. {
  198. var i = 0;
  199. return source.Do(_ => progress.Report(i++)).ToTask(ct);
  200. });
  201. }
  202. #region Progress
  203. /// <summary>
  204. /// Obsolete. Use the <c>WindowsRuntimeAsyncInfoObservable.ToIAsyncOperationWithProgress</c>
  205. /// method in the <c>System.Reactive.For.WindowsRuntime</c> package instead.
  206. /// </summary>
  207. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  208. /// <typeparam name="TResult">The type of the elements in the result sequence.</typeparam>
  209. /// <param name="source">Source sequence to compute a result sequence that gets exposed as an asynchronous operation.</param>
  210. /// <param name="resultSelector">Selector function to map the source sequence on a result sequence.</param>
  211. /// <returns>Windows Runtime asynchronous operation object that returns the last element of the result sequence, reporting incremental progress for each source sequence element.</returns>
  212. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="resultSelector"/> is null.</exception>
  213. [Obsolete("Use the ToIAsyncOperationWithProgress extension method defined by System.Reactive.Linq.WindowsRuntimeAsyncInfoObservable in the System.Reactive.For.WindowsRuntime package instead", error: false)]
  214. public static IAsyncOperationWithProgress<TResult, int> ToAsyncOperationWithProgress<TSource, TResult>(this IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> resultSelector)
  215. {
  216. if (source == null)
  217. {
  218. throw new ArgumentNullException(nameof(source));
  219. }
  220. if (resultSelector == null)
  221. {
  222. throw new ArgumentNullException(nameof(resultSelector));
  223. }
  224. return AsyncInfo.Run<TResult, int>((ct, progress) =>
  225. {
  226. var i = 0;
  227. return resultSelector(source.Do(_ => progress.Report(i++))).ToTask(ct);
  228. });
  229. }
  230. /// <summary>
  231. /// Obsolete. Use the <c>WindowsRuntimeAsyncInfoObservable.ToIAsyncOperationWithProgress</c>
  232. /// method in the <c>System.Reactive.For.WindowsRuntime</c> package instead.
  233. /// </summary>
  234. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  235. /// <typeparam name="TResult">The type of the elements in the result sequence.</typeparam>
  236. /// <typeparam name="TProgress">The type of the elements in the progress sequence.</typeparam>
  237. /// <param name="source">Source sequence to compute a result sequence that gets exposed as an asynchronous operation and a progress sequence that gets reported through the asynchronous operation.</param>
  238. /// <param name="resultSelector">Selector function to map the source sequence on a result sequence.</param>
  239. /// <param name="progressSelector">Selector function to map the source sequence on a progress reporting sequence.</param>
  240. /// <returns>Windows Runtime asynchronous operation object that returns the last element of the result sequence, reporting progress computed through the progress sequence.</returns>
  241. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="resultSelector"/> or <paramref name="progressSelector"/> is null.</exception>
  242. [Obsolete("Use the ToIAsyncOperationWithProgress extension method defined by System.Reactive.Linq.WindowsRuntimeAsyncInfoObservable in the System.Reactive.For.WindowsRuntime package instead", error: false)]
  243. public static IAsyncOperationWithProgress<TResult, TProgress> ToAsyncOperationWithProgress<TSource, TResult, TProgress>(this IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> resultSelector, Func<IObservable<TSource>, IObservable<TProgress>> progressSelector)
  244. {
  245. if (source == null)
  246. {
  247. throw new ArgumentNullException(nameof(source));
  248. }
  249. if (resultSelector == null)
  250. {
  251. throw new ArgumentNullException(nameof(resultSelector));
  252. }
  253. if (progressSelector == null)
  254. {
  255. throw new ArgumentNullException(nameof(progressSelector));
  256. }
  257. return AsyncInfo.Run<TResult, TProgress>((ct, progress) =>
  258. {
  259. return Observable.Create<TResult>(observer =>
  260. {
  261. var obs = Observer.Synchronize(observer);
  262. var data = source.Publish();
  263. var progressSubscription = progressSelector(data).Subscribe(progress.Report, obs.OnError);
  264. var dataSubscription = resultSelector(data).Subscribe(obs);
  265. var connection = data.Connect();
  266. return StableCompositeDisposable.CreateTrusted(progressSubscription, dataSubscription, connection);
  267. }).ToTask(ct);
  268. });
  269. }
  270. #endregion
  271. #endregion
  272. }
  273. }
  274. #endif