AsyncInfoObservable.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. #if HAS_WINRT
  5. using System.Reactive.Disposables;
  6. using System.Reactive.Linq;
  7. using System.Reactive.Subjects;
  8. using System.Reactive.Threading.Tasks;
  9. using System.Runtime.InteropServices.WindowsRuntime;
  10. using System.Threading.Tasks;
  11. using Windows.Foundation;
  12. namespace System.Reactive.Linq
  13. {
  14. /// <summary>
  15. /// Provides a set of extension methods to expose observable sequences as Windows Runtime asynchronous actions and operations.
  16. /// </summary>
  17. [CLSCompliant(false)]
  18. public static class AsyncInfoObservable
  19. {
  20. #region IAsyncAction
  21. /// <summary>
  22. /// Creates a Windows Runtime asynchronous action that represents the completion of the observable sequence.
  23. /// Upon cancellation of the asynchronous action, the subscription to the source sequence will be disposed.
  24. /// </summary>
  25. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  26. /// <param name="source">Source sequence to expose as an asynchronous action.</param>
  27. /// <returns>Windows Runtime asynchronous action object representing the completion of the observable sequence.</returns>
  28. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  29. public static IAsyncAction ToAsyncAction<TSource>(this IObservable<TSource> source)
  30. {
  31. if (source == null)
  32. throw new ArgumentNullException(nameof(source));
  33. return AsyncInfo.Run(ct => (Task)source.DefaultIfEmpty().ToTask(ct));
  34. }
  35. #region Progress
  36. /// <summary>
  37. /// Creates a Windows Runtime asynchronous action that represents the completion of the observable sequence, reporting incremental progress for each element produced by the sequence.
  38. /// Upon cancellation of the asynchronous action, the subscription to the source sequence will be disposed.
  39. /// </summary>
  40. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  41. /// <param name="source">Source sequence to expose as an asynchronous action.</param>
  42. /// <returns>Windows Runtime asynchronous action object representing the completion of the observable sequence, reporting incremental progress for each source sequence element.</returns>
  43. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  44. public static IAsyncActionWithProgress<int> ToAsyncActionWithProgress<TSource>(this IObservable<TSource> source)
  45. {
  46. if (source == null)
  47. throw new ArgumentNullException(nameof(source));
  48. return AsyncInfo.Run<int>((ct, progress) =>
  49. {
  50. var i = 0;
  51. return (Task)source.Do(_ => progress.Report(i++)).DefaultIfEmpty().ToTask(ct);
  52. });
  53. }
  54. /// <summary>
  55. /// Creates a Windows Runtime asynchronous action that represents the completion of the observable sequence, using a selector function to map the source sequence on a progress reporting sequence.
  56. /// Upon cancellation of the asynchronous action, the subscription to the source sequence will be disposed.
  57. /// </summary>
  58. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  59. /// <typeparam name="TProgress">The type of the elements in the progress sequence.</typeparam>
  60. /// <param name="source">Source sequence to expose as an asynchronous action and to compute a progress sequence that gets reported through the asynchronous action.</param>
  61. /// <param name="progressSelector">Selector function to map the source sequence on a progress reporting sequence.</param>
  62. /// <returns>Windows Runtime asynchronous action object representing the completion of the result sequence, reporting progress computed through the progress sequence.</returns>
  63. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="progressSelector"/> is null.</exception>
  64. public static IAsyncActionWithProgress<TProgress> ToAsyncActionWithProgress<TSource, TProgress>(this IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TProgress>> progressSelector)
  65. {
  66. if (source == null)
  67. throw new ArgumentNullException(nameof(source));
  68. if (progressSelector == null)
  69. throw new ArgumentNullException(nameof(progressSelector));
  70. return AsyncInfo.Run<TProgress>((ct, progress) =>
  71. {
  72. return (Task)Observable.Create<TSource>(observer =>
  73. {
  74. var obs = Observer.Synchronize(observer);
  75. var data = source.Publish();
  76. var progressSubscription = progressSelector(data).Subscribe(progress.Report, obs.OnError);
  77. var dataSubscription = data.DefaultIfEmpty().Subscribe(obs);
  78. var connection = data.Connect();
  79. return StableCompositeDisposable.Create(progressSubscription, dataSubscription, connection);
  80. }).ToTask(ct);
  81. });
  82. }
  83. #endregion
  84. #endregion
  85. #region IAsyncOperation<T>
  86. /// <summary>
  87. /// Creates a Windows Runtime asynchronous operation that returns the last element of the observable sequence.
  88. /// Upon cancellation of the asynchronous operation, the subscription to the source sequence will be disposed.
  89. /// </summary>
  90. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  91. /// <param name="source">Source sequence to expose as an asynchronous operation.</param>
  92. /// <returns>Windows Runtime asynchronous operation object that returns the last element of the observable sequence.</returns>
  93. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  94. public static IAsyncOperation<TSource> ToAsyncOperation<TSource>(this IObservable<TSource> source)
  95. {
  96. if (source == null)
  97. throw new ArgumentNullException(nameof(source));
  98. return AsyncInfo.Run(ct => source.ToTask(ct));
  99. }
  100. /// <summary>
  101. /// Creates a Windows Runtime asynchronous operation that returns the last element of the observable sequence, reporting incremental progress for each element produced by the sequence.
  102. /// Upon cancellation of the asynchronous operation, the subscription to the source sequence will be disposed.
  103. /// </summary>
  104. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  105. /// <param name="source">Source sequence to expose as an asynchronous operation.</param>
  106. /// <returns>Windows Runtime asynchronous operation object that returns the last element of the observable sequence, reporting incremental progress for each source sequence element.</returns>
  107. /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  108. public static IAsyncOperationWithProgress<TSource, int> ToAsyncOperationWithProgress<TSource>(this IObservable<TSource> source)
  109. {
  110. if (source == null)
  111. throw new ArgumentNullException(nameof(source));
  112. return AsyncInfo.Run<TSource, int>((ct, progress) =>
  113. {
  114. var i = 0;
  115. return source.Do(_ => progress.Report(i++)).ToTask(ct);
  116. });
  117. }
  118. #region Progress
  119. /// <summary>
  120. /// Creates a Windows Runtime asynchronous operation that returns the last element of the result sequence, reporting incremental progress for each element produced by the source sequence.
  121. /// Upon cancellation of the asynchronous operation, the subscription to the source sequence will be disposed.
  122. /// </summary>
  123. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  124. /// <typeparam name="TResult">The type of the elements in the result sequence.</typeparam>
  125. /// <param name="source">Source sequence to compute a result sequence that gets exposed as an asynchronous operation.</param>
  126. /// <param name="resultSelector">Selector function to map the source sequence on a result sequence.</param>
  127. /// <returns>Windows Runtime asynchronous operation object that returns the last element of the result sequence, reporting incremental progress for each source sequence element.</returns>
  128. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="resultSelector"/> is null.</exception>
  129. public static IAsyncOperationWithProgress<TResult, int> ToAsyncOperationWithProgress<TSource, TResult>(this IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> resultSelector)
  130. {
  131. if (source == null)
  132. throw new ArgumentNullException(nameof(source));
  133. if (resultSelector == null)
  134. throw new ArgumentNullException(nameof(resultSelector));
  135. return AsyncInfo.Run<TResult, int>((ct, progress) =>
  136. {
  137. var i = 0;
  138. return resultSelector(source.Do(_ => progress.Report(i++))).ToTask(ct);
  139. });
  140. }
  141. /// <summary>
  142. /// Creates a Windows Runtime asynchronous operation that returns the last element of the result sequence, using a selector function to map the source sequence on a progress reporting sequence.
  143. /// Upon cancellation of the asynchronous operation, the subscription to the source sequence will be disposed.
  144. /// </summary>
  145. /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  146. /// <typeparam name="TResult">The type of the elements in the result sequence.</typeparam>
  147. /// <typeparam name="TProgress">The type of the elements in the progress sequence.</typeparam>
  148. /// <param name="source">Source sequence to compute a result sequence that gets exposed as an asynchronous operation and a progress sequence that gets reported through the asynchronous operation.</param>
  149. /// <param name="resultSelector">Selector function to map the source sequence on a result sequence.</param>
  150. /// <param name="progressSelector">Selector function to map the source sequence on a progress reporting sequence.</param>
  151. /// <returns>Windows Runtime asynchronous operation object that returns the last element of the result sequence, reporting progress computed through the progress sequence.</returns>
  152. /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="resultSelector"/> or <paramref name="progressSelector"/> is null.</exception>
  153. public static IAsyncOperationWithProgress<TResult, TProgress> ToAsyncOperationWithProgress<TSource, TResult, TProgress>(this IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> resultSelector, Func<IObservable<TSource>, IObservable<TProgress>> progressSelector)
  154. {
  155. if (source == null)
  156. throw new ArgumentNullException(nameof(source));
  157. if (resultSelector == null)
  158. throw new ArgumentNullException(nameof(resultSelector));
  159. if (progressSelector == null)
  160. throw new ArgumentNullException(nameof(progressSelector));
  161. return AsyncInfo.Run<TResult, TProgress>((ct, progress) =>
  162. {
  163. return Observable.Create<TResult>(observer =>
  164. {
  165. var obs = Observer.Synchronize(observer);
  166. var data = source.Publish();
  167. var progressSubscription = progressSelector(data).Subscribe(progress.Report, obs.OnError);
  168. var dataSubscription = resultSelector(data).Subscribe(obs);
  169. var connection = data.Connect();
  170. return StableCompositeDisposable.Create(progressSubscription, dataSubscription, connection);
  171. }).ToTask(ct);
  172. });
  173. }
  174. #endregion
  175. #endregion
  176. }
  177. }
  178. #endif