AsyncInfoObservable.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. #if HAS_WINRT
  5. using System.Reactive.Disposables;
  6. using System.Reactive.Threading.Tasks;
  7. using System.Runtime.InteropServices.WindowsRuntime;
  8. using System.Threading.Tasks;
  9. using Windows.Foundation;
  10. namespace System.Reactive.Linq
  11. {
  12. /// <summary>
  13. /// Provides a set of extension methods to expose observable sequences as Windows Runtime asynchronous actions and operations.
  14. /// </summary>
  15. [CLSCompliant(false)]
  16. public static class AsyncInfoObservable
  17. {
  18. #region IAsyncAction
  19. /// <summary>
  20. /// Creates a Windows Runtime asynchronous action that represents the completion of the observable sequence.
  21. /// Upon cancellation of the asynchronous action, the subscription to the source sequence will be disposed.
  22. /// </summary>
  23. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  24. /// <param name="source">Source sequence to expose as an asynchronous action.</param>
  25. /// <returns>Windows Runtime asynchronous action object representing the completion of the observable sequence.</returns>
  26. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  27. public static IAsyncAction ToAsyncAction<TSource>(this IObservable<TSource> source)
  28. {
  29. if (source == null)
  30. {
  31. throw new ArgumentNullException(nameof(source));
  32. }
  33. return AsyncInfo.Run(ct => (Task)source.DefaultIfEmpty().ToTask(ct));
  34. }
  35. #region Progress
  36. /// <summary>
  37. /// Creates a Windows Runtime asynchronous action that represents the completion of the observable sequence, reporting incremental progress for each element produced by the sequence.
  38. /// Upon cancellation of the asynchronous action, the subscription to the source sequence will be disposed.
  39. /// </summary>
  40. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  41. /// <param name="source">Source sequence to expose as an asynchronous action.</param>
  42. /// <returns>Windows Runtime asynchronous action object representing the completion of the observable sequence, reporting incremental progress for each source sequence element.</returns>
  43. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  44. public static IAsyncActionWithProgress<int> ToAsyncActionWithProgress<TSource>(this IObservable<TSource> source)
  45. {
  46. if (source == null)
  47. {
  48. throw new ArgumentNullException(nameof(source));
  49. }
  50. return AsyncInfo.Run<int>((ct, progress) =>
  51. {
  52. var i = 0;
  53. return source.Do(_ => progress.Report(i++)).DefaultIfEmpty().ToTask(ct);
  54. });
  55. }
  56. /// <summary>
  57. /// Creates a Windows Runtime asynchronous action that represents the completion of the observable sequence, using a selector function to map the source sequence on a progress reporting sequence.
  58. /// Upon cancellation of the asynchronous action, the subscription to the source sequence will be disposed.
  59. /// </summary>
  60. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  61. /// <typeparam name="TProgress">The type of the elements in the progress sequence.</typeparam>
  62. /// <param name="source">Source sequence to expose as an asynchronous action and to compute a progress sequence that gets reported through the asynchronous action.</param>
  63. /// <param name="progressSelector">Selector function to map the source sequence on a progress reporting sequence.</param>
  64. /// <returns>Windows Runtime asynchronous action object representing the completion of the result sequence, reporting progress computed through the progress sequence.</returns>
  65. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="progressSelector"/> is null.</exception>
  66. public static IAsyncActionWithProgress<TProgress> ToAsyncActionWithProgress<TSource, TProgress>(this IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TProgress>> progressSelector)
  67. {
  68. if (source == null)
  69. {
  70. throw new ArgumentNullException(nameof(source));
  71. }
  72. if (progressSelector == null)
  73. {
  74. throw new ArgumentNullException(nameof(progressSelector));
  75. }
  76. return AsyncInfo.Run<TProgress>((ct, progress) =>
  77. {
  78. return Observable.Create<TSource>(observer =>
  79. {
  80. var obs = Observer.Synchronize(observer);
  81. var data = source.Publish();
  82. var progressSubscription = progressSelector(data).Subscribe(progress.Report, obs.OnError);
  83. var dataSubscription = data.DefaultIfEmpty().Subscribe(obs);
  84. var connection = data.Connect();
  85. return StableCompositeDisposable.CreateTrusted(progressSubscription, dataSubscription, connection);
  86. }).ToTask(ct);
  87. });
  88. }
  89. #endregion
  90. #endregion
  91. #region IAsyncOperation<T>
  92. /// <summary>
  93. /// Creates a Windows Runtime asynchronous operation that returns the last element of the observable sequence.
  94. /// Upon cancellation of the asynchronous operation, the subscription to the source sequence will be disposed.
  95. /// </summary>
  96. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  97. /// <param name="source">Source sequence to expose as an asynchronous operation.</param>
  98. /// <returns>Windows Runtime asynchronous operation object that returns the last element of the observable sequence.</returns>
  99. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  100. public static IAsyncOperation<TSource> ToAsyncOperation<TSource>(this IObservable<TSource> source)
  101. {
  102. if (source == null)
  103. {
  104. throw new ArgumentNullException(nameof(source));
  105. }
  106. return AsyncInfo.Run(ct => source.ToTask(ct));
  107. }
  108. /// <summary>
  109. /// Creates a Windows Runtime asynchronous operation that returns the last element of the observable sequence, reporting incremental progress for each element produced by the sequence.
  110. /// Upon cancellation of the asynchronous operation, the subscription to the source sequence will be disposed.
  111. /// </summary>
  112. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  113. /// <param name="source">Source sequence to expose as an asynchronous operation.</param>
  114. /// <returns>Windows Runtime asynchronous operation object that returns the last element of the observable sequence, reporting incremental progress for each source sequence element.</returns>
  115. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  116. public static IAsyncOperationWithProgress<TSource, int> ToAsyncOperationWithProgress<TSource>(this IObservable<TSource> source)
  117. {
  118. if (source == null)
  119. {
  120. throw new ArgumentNullException(nameof(source));
  121. }
  122. return AsyncInfo.Run<TSource, int>((ct, progress) =>
  123. {
  124. var i = 0;
  125. return source.Do(_ => progress.Report(i++)).ToTask(ct);
  126. });
  127. }
  128. #region Progress
  129. /// <summary>
  130. /// Creates a Windows Runtime asynchronous operation that returns the last element of the result sequence, reporting incremental progress for each element produced by the source sequence.
  131. /// Upon cancellation of the asynchronous operation, the subscription to the source sequence will be disposed.
  132. /// </summary>
  133. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  134. /// <typeparam name="TResult">The type of the elements in the result sequence.</typeparam>
  135. /// <param name="source">Source sequence to compute a result sequence that gets exposed as an asynchronous operation.</param>
  136. /// <param name="resultSelector">Selector function to map the source sequence on a result sequence.</param>
  137. /// <returns>Windows Runtime asynchronous operation object that returns the last element of the result sequence, reporting incremental progress for each source sequence element.</returns>
  138. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="resultSelector"/> is null.</exception>
  139. public static IAsyncOperationWithProgress<TResult, int> ToAsyncOperationWithProgress<TSource, TResult>(this IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> resultSelector)
  140. {
  141. if (source == null)
  142. {
  143. throw new ArgumentNullException(nameof(source));
  144. }
  145. if (resultSelector == null)
  146. {
  147. throw new ArgumentNullException(nameof(resultSelector));
  148. }
  149. return AsyncInfo.Run<TResult, int>((ct, progress) =>
  150. {
  151. var i = 0;
  152. return resultSelector(source.Do(_ => progress.Report(i++))).ToTask(ct);
  153. });
  154. }
  155. /// <summary>
  156. /// Creates a Windows Runtime asynchronous operation that returns the last element of the result sequence, using a selector function to map the source sequence on a progress reporting sequence.
  157. /// Upon cancellation of the asynchronous operation, the subscription to the source sequence will be disposed.
  158. /// </summary>
  159. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  160. /// <typeparam name="TResult">The type of the elements in the result sequence.</typeparam>
  161. /// <typeparam name="TProgress">The type of the elements in the progress sequence.</typeparam>
  162. /// <param name="source">Source sequence to compute a result sequence that gets exposed as an asynchronous operation and a progress sequence that gets reported through the asynchronous operation.</param>
  163. /// <param name="resultSelector">Selector function to map the source sequence on a result sequence.</param>
  164. /// <param name="progressSelector">Selector function to map the source sequence on a progress reporting sequence.</param>
  165. /// <returns>Windows Runtime asynchronous operation object that returns the last element of the result sequence, reporting progress computed through the progress sequence.</returns>
  166. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="resultSelector"/> or <paramref name="progressSelector"/> is null.</exception>
  167. public static IAsyncOperationWithProgress<TResult, TProgress> ToAsyncOperationWithProgress<TSource, TResult, TProgress>(this IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> resultSelector, Func<IObservable<TSource>, IObservable<TProgress>> progressSelector)
  168. {
  169. if (source == null)
  170. {
  171. throw new ArgumentNullException(nameof(source));
  172. }
  173. if (resultSelector == null)
  174. {
  175. throw new ArgumentNullException(nameof(resultSelector));
  176. }
  177. if (progressSelector == null)
  178. {
  179. throw new ArgumentNullException(nameof(progressSelector));
  180. }
  181. return AsyncInfo.Run<TResult, TProgress>((ct, progress) =>
  182. {
  183. return Observable.Create<TResult>(observer =>
  184. {
  185. var obs = Observer.Synchronize(observer);
  186. var data = source.Publish();
  187. var progressSubscription = progressSelector(data).Subscribe(progress.Report, obs.OnError);
  188. var dataSubscription = resultSelector(data).Subscribe(obs);
  189. var connection = data.Connect();
  190. return StableCompositeDisposable.CreateTrusted(progressSubscription, dataSubscription, connection);
  191. }).ToTask(ct);
  192. });
  193. }
  194. #endregion
  195. #endregion
  196. }
  197. }
  198. #endif