StartAsync.cs 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace System.Reactive.Linq
  9. {
  10. partial class AsyncObservable
  11. {
  /// <summary>
  /// Forwards to the overload that takes a scheduler, passing
  /// <see cref="ImmediateAsyncScheduler.Instance"/> as the scheduler.
  /// </summary>
  /// <typeparam name="TSource">Result type of the task returned by <paramref name="functionAsync"/>.</typeparam>
  /// <param name="functionAsync">Function that produces the task to observe.</param>
  /// <returns>Whatever the scheduler-accepting overload returns for the same function.</returns>
  public static IAsyncObservable<TSource> StartAsync<TSource>(Func<Task<TSource>> functionAsync)
  {
      // Pure forwarding: null checks are performed by the two-argument overload.
      return StartAsync(functionAsync, ImmediateAsyncScheduler.Instance);
  }
  /// <summary>
  /// Calls <paramref name="functionAsync"/> during this call (not when an observer subscribes)
  /// and converts the task it returns with <c>ToAsyncObservable</c> using
  /// <paramref name="scheduler"/>.
  /// </summary>
  /// <typeparam name="TSource">Result type of the task returned by <paramref name="functionAsync"/>.</typeparam>
  /// <param name="functionAsync">Function that produces the task to observe.</param>
  /// <param name="scheduler">Scheduler handed to <c>ToAsyncObservable</c>.</param>
  /// <returns>
  /// The result of <c>ToAsyncObservable(scheduler)</c> on the returned task, or the result of
  /// <c>Throw&lt;TSource&gt;(ex)</c> when <paramref name="functionAsync"/> throws synchronously.
  /// </returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="functionAsync"/> or <paramref name="scheduler"/> is <c>null</c>.
  /// </exception>
  public static IAsyncObservable<TSource> StartAsync<TSource>(Func<Task<TSource>> functionAsync, IAsyncScheduler scheduler)
  {
      if (functionAsync == null)
      {
          throw new ArgumentNullException(nameof(functionAsync));
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      Task<TSource> started;

      try
      {
          started = functionAsync();
      }
      catch (Exception error)
      {
          // A synchronous failure of the function is handed to Throw instead of
          // propagating to the caller of StartAsync.
          return Throw<TSource>(error);
      }

      return started.ToAsyncObservable(scheduler);
  }
  /// <summary>
  /// Forwards to the cancellation-aware overload that takes a scheduler, passing
  /// <see cref="ImmediateAsyncScheduler.Instance"/> as the scheduler.
  /// </summary>
  /// <typeparam name="TSource">Result type of the task returned by <paramref name="functionAsync"/>.</typeparam>
  /// <param name="functionAsync">Function that receives a cancellation token and produces the task to observe.</param>
  /// <returns>Whatever the scheduler-accepting overload returns for the same function.</returns>
  public static IAsyncObservable<TSource> StartAsync<TSource>(Func<CancellationToken, Task<TSource>> functionAsync)
  {
      // Pure forwarding: null checks are performed by the two-argument overload.
      return StartAsync(functionAsync, ImmediateAsyncScheduler.Instance);
  }
  /// <summary>
  /// Calls <paramref name="functionAsync"/> during this call, passing it the token of a newly
  /// created <c>CancellationAsyncDisposable</c>, and returns a sequence built with <c>Create</c>
  /// that subscribes each observer to the task via <c>ToAsyncObservable</c>.
  /// </summary>
  /// <typeparam name="TSource">Result type of the task returned by <paramref name="functionAsync"/>.</typeparam>
  /// <param name="functionAsync">Function that receives a cancellation token and produces the task to observe.</param>
  /// <param name="scheduler">Scheduler handed to <c>ToAsyncObservable</c>.</param>
  /// <returns>
  /// A sequence created with <c>Create&lt;TSource&gt;</c>, or the result of
  /// <c>Throw&lt;TSource&gt;(ex)</c> when <paramref name="functionAsync"/> throws synchronously.
  /// </returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="functionAsync"/> or <paramref name="scheduler"/> is <c>null</c>.
  /// </exception>
  public static IAsyncObservable<TSource> StartAsync<TSource>(Func<CancellationToken, Task<TSource>> functionAsync, IAsyncScheduler scheduler)
  {
      if (functionAsync == null)
      {
          throw new ArgumentNullException(nameof(functionAsync));
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      // One cancellation source per StartAsync call; it is captured by the lambda below
      // and therefore shared by every subscription to the returned sequence.
      var cancellation = new CancellationAsyncDisposable();

      Task<TSource> started;

      try
      {
          started = functionAsync(cancellation.Token);
      }
      catch (Exception error)
      {
          // NOTE(review): 'cancellation' is not disposed on this path — confirm whether
          // CancellationAsyncDisposable needs explicit cleanup here.
          return Throw<TSource>(error);
      }

      return Create<TSource>(async observer =>
      {
          var source = started.ToAsyncObservable(scheduler);
          var subscription = await source.SubscribeAsync(observer).ConfigureAwait(false);

          // The handle given back to the subscriber combines the cancellation disposable with the
          // inner subscription; presumably disposing it also requests cancellation — TODO confirm.
          return StableCompositeAsyncDisposable.Create(cancellation, subscription);
      });
  }
  /// <summary>
  /// Forwards to the overload that takes a scheduler, passing
  /// <see cref="ImmediateAsyncScheduler.Instance"/> as the scheduler.
  /// </summary>
  /// <param name="actionAsync">Function that produces the task to observe.</param>
  /// <returns>Whatever the scheduler-accepting overload returns for the same function.</returns>
  public static IAsyncObservable<Unit> StartAsync(Func<Task> actionAsync)
  {
      // Pure forwarding: null checks are performed by the two-argument overload.
      return StartAsync(actionAsync, ImmediateAsyncScheduler.Instance);
  }
  /// <summary>
  /// Calls <paramref name="actionAsync"/> during this call (not when an observer subscribes)
  /// and converts the task it returns with <c>ToAsyncObservable</c> using
  /// <paramref name="scheduler"/>.
  /// </summary>
  /// <param name="actionAsync">Function that produces the task to observe.</param>
  /// <param name="scheduler">Scheduler handed to <c>ToAsyncObservable</c>.</param>
  /// <returns>
  /// The result of <c>ToAsyncObservable(scheduler)</c> on the returned task, or the result of
  /// <c>Throw&lt;Unit&gt;(ex)</c> when <paramref name="actionAsync"/> throws synchronously.
  /// </returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="actionAsync"/> or <paramref name="scheduler"/> is <c>null</c>.
  /// </exception>
  public static IAsyncObservable<Unit> StartAsync(Func<Task> actionAsync, IAsyncScheduler scheduler)
  {
      if (actionAsync == null)
      {
          throw new ArgumentNullException(nameof(actionAsync));
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      Task started;

      try
      {
          started = actionAsync();
      }
      catch (Exception error)
      {
          // A synchronous failure of the function is handed to Throw instead of
          // propagating to the caller of StartAsync.
          return Throw<Unit>(error);
      }

      return started.ToAsyncObservable(scheduler);
  }
  /// <summary>
  /// Forwards to the cancellation-aware overload that takes a scheduler, passing
  /// <see cref="ImmediateAsyncScheduler.Instance"/> as the scheduler.
  /// </summary>
  /// <param name="actionAsync">Function that receives a cancellation token and produces the task to observe.</param>
  /// <returns>Whatever the scheduler-accepting overload returns for the same function.</returns>
  public static IAsyncObservable<Unit> StartAsync(Func<CancellationToken, Task> actionAsync)
  {
      // Pure forwarding: null checks are performed by the two-argument overload.
      return StartAsync(actionAsync, ImmediateAsyncScheduler.Instance);
  }
  /// <summary>
  /// Calls <paramref name="actionAsync"/> during this call, passing it the token of a newly
  /// created <c>CancellationAsyncDisposable</c>, and returns a sequence built with <c>Create</c>
  /// that subscribes each observer to the task via <c>ToAsyncObservable</c>.
  /// </summary>
  /// <param name="actionAsync">Function that receives a cancellation token and produces the task to observe.</param>
  /// <param name="scheduler">Scheduler handed to <c>ToAsyncObservable</c>.</param>
  /// <returns>
  /// A sequence created with <c>Create&lt;Unit&gt;</c>, or the result of
  /// <c>Throw&lt;Unit&gt;(ex)</c> when <paramref name="actionAsync"/> throws synchronously.
  /// </returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="actionAsync"/> or <paramref name="scheduler"/> is <c>null</c>.
  /// </exception>
  public static IAsyncObservable<Unit> StartAsync(Func<CancellationToken, Task> actionAsync, IAsyncScheduler scheduler)
  {
      if (actionAsync == null)
      {
          throw new ArgumentNullException(nameof(actionAsync));
      }

      if (scheduler == null)
      {
          throw new ArgumentNullException(nameof(scheduler));
      }

      // One cancellation source per StartAsync call; it is captured by the lambda below
      // and therefore shared by every subscription to the returned sequence.
      var cancellation = new CancellationAsyncDisposable();

      Task started;

      try
      {
          started = actionAsync(cancellation.Token);
      }
      catch (Exception error)
      {
          // NOTE(review): 'cancellation' is not disposed on this path — confirm whether
          // CancellationAsyncDisposable needs explicit cleanup here.
          return Throw<Unit>(error);
      }

      return Create<Unit>(async observer =>
      {
          var source = started.ToAsyncObservable(scheduler);
          var subscription = await source.SubscribeAsync(observer).ConfigureAwait(false);

          // The handle given back to the subscriber combines the cancellation disposable with the
          // inner subscription; presumably disposing it also requests cancellation — TODO confirm.
          return StableCompositeAsyncDisposable.Create(cancellation, subscription);
      });
  }
  94. }
  95. }