ToAsyncObservable.cs 3.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. using System.Threading.Tasks;
  7. namespace System.Reactive.Linq
  8. {
  // Conversion operators that expose a synchronous IObservable<T> as an IAsyncObservable<T>.
  partial class AsyncObservable
  {
      /// <summary>
      /// Converts a synchronous observable sequence to an asynchronous observable sequence, passing
      /// <c>TaskPoolAsyncScheduler.Default</c> as both the subscribe scheduler and the dispose scheduler.
      /// </summary>
      /// <typeparam name="TSource">Element type of the sequence.</typeparam>
      /// <param name="source">The synchronous observable to convert.</param>
      /// <returns>The result of the three-argument overload invoked with the default schedulers.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
      public static IAsyncObservable<TSource> ToAsyncObservable<TSource>(this IObservable<TSource> source)
      {
          if (source == null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          return ToAsyncObservable(
              source,
              subscribeScheduler: TaskPoolAsyncScheduler.Default,
              disposeScheduler: TaskPoolAsyncScheduler.Default);
      }

      /// <summary>
      /// Converts a synchronous observable sequence to an asynchronous observable sequence, using the same
      /// scheduler for subscribing to and for disposing of the underlying subscription.
      /// </summary>
      /// <typeparam name="TSource">Element type of the sequence.</typeparam>
      /// <param name="source">The synchronous observable to convert.</param>
      /// <param name="scheduler">Scheduler passed as both the subscribe scheduler and the dispose scheduler.</param>
      /// <returns>The result of the three-argument overload invoked with <paramref name="scheduler"/> twice.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/> or <paramref name="scheduler"/> is null.
      /// </exception>
      public static IAsyncObservable<TSource> ToAsyncObservable<TSource>(this IObservable<TSource> source, IAsyncScheduler scheduler)
      {
          if (source == null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (scheduler == null)
          {
              throw new ArgumentNullException(nameof(scheduler));
          }

          return ToAsyncObservable(source, subscribeScheduler: scheduler, disposeScheduler: scheduler);
      }

      /// <summary>
      /// Converts a synchronous observable sequence to an asynchronous observable sequence built with
      /// <c>Create</c>. When the subscribe callback runs, a work item is scheduled on
      /// <paramref name="subscribeScheduler"/> that subscribes to <paramref name="source"/>; disposing the
      /// underlying subscription is routed through <paramref name="disposeScheduler"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the sequence.</typeparam>
      /// <param name="source">The synchronous observable to convert.</param>
      /// <param name="subscribeScheduler">Scheduler on which the call to <c>source.Subscribe</c> is scheduled.</param>
      /// <param name="disposeScheduler">Scheduler whose <c>ExecuteAsync</c> runs the disposal of the underlying subscription.</param>
      /// <returns>An asynchronous observable wrapping <paramref name="source"/>.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/>, <paramref name="subscribeScheduler"/> or <paramref name="disposeScheduler"/> is null.
      /// </exception>
      public static IAsyncObservable<TSource> ToAsyncObservable<TSource>(this IObservable<TSource> source, IAsyncScheduler subscribeScheduler, IAsyncScheduler disposeScheduler)
      {
          if (source == null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (subscribeScheduler == null)
          {
              throw new ArgumentNullException(nameof(subscribeScheduler));
          }

          if (disposeScheduler == null)
          {
              throw new ArgumentNullException(nameof(disposeScheduler));
          }

          return Create<TSource>(async observer =>
          {
              // Aggregate handed back to the subscriber; it collects both the scheduled work item
              // and the disposal of the underlying synchronous subscription.
              var subscription = new CompositeAsyncDisposable();

              var scheduledWork = await subscribeScheduler.ScheduleAsync(async ct =>
              {
                  // Skip subscribing altogether if the work item was cancelled before it ran.
                  ct.ThrowIfCancellationRequested();

                  // Bridge the async observer to the synchronous IObserver<T> that the source expects.
                  var syncObserver = AsyncObserver.ToObserver(observer);
                  var inner = source.Subscribe(syncObserver);

                  // Disposal of the synchronous subscription is executed via the dispose scheduler.
                  var innerDisposal = AsyncDisposable.Create(() => disposeScheduler.ExecuteAsync(_ =>
                  {
                      inner.Dispose();
                      return Task.CompletedTask;
                  }));

                  // NOTE(review): RendezVous is defined elsewhere; presumably it resumes the continuation
                  // on subscribeScheduler - confirm against its definition.
                  await subscription.AddAsync(innerDisposal).RendezVous(subscribeScheduler);
              }).ConfigureAwait(false);

              // Also track the scheduled work item itself in the composite handed to the subscriber.
              await subscription.AddAsync(scheduledWork).ConfigureAwait(false);

              return subscription;
          });
      }
  }
  // Adapters between asynchronous observers and synchronous IObserver<T>.
  partial class AsyncObserver
  {
      // REVIEW: Add a way to parameterize blocking behavior (e.g. blocking, fire-and-forget, async chaining).

      /// <summary>
      /// Wraps an asynchronous observer in a synchronous <see cref="IObserver{T}"/>. Each notification is
      /// forwarded to the asynchronous observer and the caller is blocked until that call has completed.
      /// </summary>
      /// <typeparam name="TSource">Element type of the notifications.</typeparam>
      /// <param name="observer">The asynchronous observer to forward notifications to.</param>
      /// <returns>A synchronous observer that forwards to <paramref name="observer"/>.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
      public static IObserver<TSource> ToObserver<TSource>(IAsyncObserver<TSource> observer)
      {
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          return new AsyncToSyncObserver<TSource>(observer);
      }

      /// <summary>
      /// Synchronous observer that forwards every notification to an asynchronous observer and blocks
      /// on the result via <c>GetAwaiter().GetResult()</c>.
      /// </summary>
      private sealed class AsyncToSyncObserver<T> : IObserver<T>
      {
          private readonly IAsyncObserver<T> _target;

          public AsyncToSyncObserver(IAsyncObserver<T> observer)
          {
              _target = observer;
          }

          /// <summary>Forwards completion and waits for the asynchronous observer to finish handling it.</summary>
          public void OnCompleted()
          {
              var pending = _target.OnCompletedAsync();
              pending.GetAwaiter().GetResult();
          }

          /// <summary>Forwards the error and waits for the asynchronous observer to finish handling it.</summary>
          public void OnError(Exception error)
          {
              var pending = _target.OnErrorAsync(error);
              pending.GetAwaiter().GetResult();
          }

          /// <summary>Forwards the element and waits for the asynchronous observer to finish handling it.</summary>
          public void OnNext(T value)
          {
              var pending = _target.OnNextAsync(value);
              pending.GetAwaiter().GetResult();
          }
      }
  }
  73. }