AsyncInfoObservable.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. #if HAS_WINRT
  5. using System.Reactive.Disposables;
  6. using System.Reactive.Linq;
  7. using System.Reactive.Subjects;
  8. using System.Reactive.Threading.Tasks;
  9. using System.Runtime.InteropServices.WindowsRuntime;
  10. using System.Threading.Tasks;
  11. using Windows.Foundation;
  12. namespace System.Reactive.Linq
  13. {
  14. /// <summary>
  15. /// Provides a set of extension methods to expose observable sequences as Windows Runtime asynchronous actions and operations.
  16. /// </summary>
  17. public static class AsyncInfoObservable
  18. {
  19. #region IAsyncAction
  /// <summary>
  /// Exposes the completion of an observable sequence as a Windows Runtime asynchronous action.
  /// Cancelling the asynchronous action disposes the subscription to the source sequence.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <param name="source">Source sequence whose completion is represented by the asynchronous action.</param>
  /// <returns>Windows Runtime asynchronous action object that represents the completion of the observable sequence.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IAsyncAction ToAsyncAction<TSource>(this IObservable<TSource> source)
  {
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      return AsyncInfo.Run(ct =>
      {
          // Only completion matters for an action, so the element values are irrelevant;
          // DefaultIfEmpty is applied before the sequence is converted to a task.
          var padded = source.DefaultIfEmpty();

          // Typing the local as the non-generic Task makes the lambda return Task rather
          // than Task<TSource>, so the call binds to the overload yielding an IAsyncAction.
          Task completion = padded.ToTask(ct);
          return completion;
      });
  }
  34. #region Progress
  /// <summary>
  /// Exposes the completion of an observable sequence as a Windows Runtime asynchronous action that
  /// reports incremental progress: a zero-based counter is reported for each element the sequence produces.
  /// Cancelling the asynchronous action disposes the subscription to the source sequence.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <param name="source">Source sequence whose completion is represented by the asynchronous action.</param>
  /// <returns>Windows Runtime asynchronous action object that represents the completion of the observable sequence and reports progress for each source sequence element.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IAsyncActionWithProgress<int> ToAsyncActionWithProgress<TSource>(this IObservable<TSource> source)
  {
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      return AsyncInfo.Run<int>((ct, progress) =>
      {
          // Number of elements observed so far; declared inside the lambda, so each
          // invocation of the lambda starts counting from zero.
          var seen = 0;

          var reporting = source.Do(_ =>
          {
              // Post-increment first, then report the pre-increment value.
              var ordinal = seen++;
              progress.Report(ordinal);
          });

          // The non-generic Task local keeps the lambda's return type as Task.
          Task completion = reporting.DefaultIfEmpty().ToTask(ct);
          return completion;
      });
  }
  /// <summary>
  /// Exposes the completion of an observable sequence as a Windows Runtime asynchronous action whose
  /// progress reports are the elements of a sequence derived from the source by a selector function.
  /// Cancelling the asynchronous action disposes the subscription to the source sequence.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <typeparam name="TProgress">The type of the elements in the progress sequence.</typeparam>
  /// <param name="source">Source sequence whose completion is represented by the asynchronous action and from which the progress sequence is computed.</param>
  /// <param name="progressSelector">Selector function that maps the source sequence onto a progress reporting sequence.</param>
  /// <returns>Windows Runtime asynchronous action object that represents the completion of the source sequence and reports the elements of the progress sequence as progress.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="progressSelector"/> is null.</exception>
  public static IAsyncActionWithProgress<TProgress> ToAsyncActionWithProgress<TSource, TProgress>(this IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TProgress>> progressSelector)
  {
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (progressSelector == null)
      {
          throw new ArgumentNullException("progressSelector");
      }

      return AsyncInfo.Run<TProgress>((ct, progress) =>
      {
          var completion = Observable.Create<TSource>(observer =>
          {
              // Both the progress subscription (errors only) and the data subscription
              // signal the same observer, so they go through a synchronized wrapper.
              var gate = Observer.Synchronize(observer);

              // The published source is handed to the selector and to the data subscription;
              // it is connected only after both subscriptions are in place.
              var shared = source.Publish();

              var progressSubscription = progressSelector(shared).Subscribe(progress.Report, gate.OnError);
              var dataSubscription = shared.DefaultIfEmpty().Subscribe(gate);
              var connection = shared.Connect();

              return StableCompositeDisposable.Create(progressSubscription, dataSubscription, connection);
          });

          // The non-generic Task local keeps the lambda's return type as Task.
          Task task = completion.ToTask(ct);
          return task;
      });
  }
  82. #endregion
  83. #endregion
  84. #region IAsyncOperation<T>
  /// <summary>
  /// Exposes an observable sequence as a Windows Runtime asynchronous operation whose result is the last element of the sequence.
  /// Cancelling the asynchronous operation disposes the subscription to the source sequence.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <param name="source">Source sequence to expose as an asynchronous operation.</param>
  /// <returns>Windows Runtime asynchronous operation object that returns the last element of the observable sequence.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IAsyncOperation<TSource> ToAsyncOperation<TSource>(this IObservable<TSource> source)
  {
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      return AsyncInfo.Run<TSource>(ct =>
      {
          // The task produced from the sequence supplies the operation's result.
          Task<TSource> lastElement = source.ToTask(ct);
          return lastElement;
      });
  }
  /// <summary>
  /// Exposes an observable sequence as a Windows Runtime asynchronous operation whose result is the last element
  /// of the sequence, and which reports a zero-based counter as progress for each element the sequence produces.
  /// Cancelling the asynchronous operation disposes the subscription to the source sequence.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <param name="source">Source sequence to expose as an asynchronous operation.</param>
  /// <returns>Windows Runtime asynchronous operation object that returns the last element of the observable sequence and reports progress for each source sequence element.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
  public static IAsyncOperationWithProgress<TSource, int> ToAsyncOperationWithProgress<TSource>(this IObservable<TSource> source)
  {
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      return AsyncInfo.Run<TSource, int>((ct, progress) =>
      {
          // Number of elements observed so far; declared inside the lambda, so each
          // invocation of the lambda starts counting from zero.
          var seen = 0;

          var reporting = source.Do(_ =>
          {
              // Post-increment first, then report the pre-increment value.
              var ordinal = seen++;
              progress.Report(ordinal);
          });

          return reporting.ToTask(ct);
      });
  }
  117. #region Progress
  /// <summary>
  /// Exposes a result sequence, computed from the source by a selector function, as a Windows Runtime asynchronous
  /// operation whose result is the last element of that result sequence. A zero-based counter is reported as
  /// progress for each element the source sequence produces.
  /// Cancelling the asynchronous operation disposes the subscription to the source sequence.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <typeparam name="TResult">The type of the elements in the result sequence.</typeparam>
  /// <param name="source">Source sequence from which the result sequence exposed as an asynchronous operation is computed.</param>
  /// <param name="resultSelector">Selector function that maps the source sequence onto a result sequence.</param>
  /// <returns>Windows Runtime asynchronous operation object that returns the last element of the result sequence and reports progress for each source sequence element.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="resultSelector"/> is null.</exception>
  public static IAsyncOperationWithProgress<TResult, int> ToAsyncOperationWithProgress<TSource, TResult>(this IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> resultSelector)
  {
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (resultSelector == null)
      {
          throw new ArgumentNullException("resultSelector");
      }

      return AsyncInfo.Run<TResult, int>((ct, progress) =>
      {
          // Number of source elements observed so far; each invocation of the lambda starts from zero.
          var seen = 0;

          // Progress is attached to the source before the selector sees it, so the counter
          // tracks source elements rather than result elements.
          var reporting = source.Do(_ =>
          {
              var ordinal = seen++;
              progress.Report(ordinal);
          });

          var results = resultSelector(reporting);
          return results.ToTask(ct);
      });
  }
  /// <summary>
  /// Exposes a result sequence, computed from the source by a selector function, as a Windows Runtime asynchronous
  /// operation whose result is the last element of that result sequence. Progress reports are the elements of a
  /// second sequence derived from the same source by a progress selector function.
  /// Cancelling the asynchronous operation disposes the subscription to the source sequence.
  /// </summary>
  /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
  /// <typeparam name="TResult">The type of the elements in the result sequence.</typeparam>
  /// <typeparam name="TProgress">The type of the elements in the progress sequence.</typeparam>
  /// <param name="source">Source sequence from which both the result sequence and the progress sequence are computed.</param>
  /// <param name="resultSelector">Selector function that maps the source sequence onto a result sequence.</param>
  /// <param name="progressSelector">Selector function that maps the source sequence onto a progress reporting sequence.</param>
  /// <returns>Windows Runtime asynchronous operation object that returns the last element of the result sequence and reports the elements of the progress sequence as progress.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="resultSelector"/> or <paramref name="progressSelector"/> is null.</exception>
  public static IAsyncOperationWithProgress<TResult, TProgress> ToAsyncOperationWithProgress<TSource, TResult, TProgress>(this IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> resultSelector, Func<IObservable<TSource>, IObservable<TProgress>> progressSelector)
  {
      if (source == null)
      {
          throw new ArgumentNullException("source");
      }

      if (resultSelector == null)
      {
          throw new ArgumentNullException("resultSelector");
      }

      if (progressSelector == null)
      {
          throw new ArgumentNullException("progressSelector");
      }

      return AsyncInfo.Run<TResult, TProgress>((ct, progress) =>
      {
          var outcome = Observable.Create<TResult>(observer =>
          {
              // Both the progress subscription (errors only) and the result subscription
              // signal the same observer, so they go through a synchronized wrapper.
              var gate = Observer.Synchronize(observer);

              // The published source is handed to both selectors; it is connected only
              // after the progress and result subscriptions are in place.
              var shared = source.Publish();

              var progressSubscription = progressSelector(shared).Subscribe(progress.Report, gate.OnError);
              var resultSubscription = resultSelector(shared).Subscribe(gate);
              var connection = shared.Connect();

              return StableCompositeDisposable.Create(progressSubscription, resultSubscription, connection);
          });

          return outcome.ToTask(ct);
      });
  }
  173. #endregion
  174. #endregion
  175. }
  176. }
  177. #endif