ToObservable.cs 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. namespace System.Linq
  6. {
  7. public static partial class AsyncEnumerable
  8. {
  9. public static IObservable<TSource> ToObservable<TSource>(this IAsyncEnumerable<TSource> source)
  10. {
  11. if (source == null)
  12. throw new ArgumentNullException(nameof(source));
  13. return new ToObservableObservable<TSource>(source);
  14. }
  15. private sealed class ToObservableObservable<T> : IObservable<T>
  16. {
  17. private readonly IAsyncEnumerable<T> source;
  18. public ToObservableObservable(IAsyncEnumerable<T> source)
  19. {
  20. this.source = source;
  21. }
  22. public IDisposable Subscribe(IObserver<T> observer)
  23. {
  24. var ctd = new CancellationTokenDisposable();
  25. var e = source.GetAsyncEnumerator();
  26. var f = default(Action);
  27. f = () => e.MoveNextAsync().ContinueWith(
  28. async t =>
  29. {
  30. if (t.IsFaulted)
  31. {
  32. observer.OnError(t.Exception);
  33. await e.DisposeAsync().ConfigureAwait(false);
  34. }
  35. else if (t.IsCanceled)
  36. {
  37. await e.DisposeAsync().ConfigureAwait(false);
  38. }
  39. else if (t.IsCompleted)
  40. {
  41. if (t.Result)
  42. {
  43. observer.OnNext(e.Current);
  44. if (!ctd.Token.IsCancellationRequested)
  45. {
  46. f();
  47. }
  48. // In case cancellation is requested, this could only have happened
  49. // by disposing the returned composite disposable (see below).
  50. // In that case, e will be disposed too, so there is no need to dispose e here.
  51. }
  52. else
  53. {
  54. observer.OnCompleted();
  55. await e.DisposeAsync().ConfigureAwait(false);
  56. }
  57. }
  58. }, ctd.Token);
  59. f();
  60. return Disposable.Create(ctd, Disposable.Create(() => { e.DisposeAsync(); /* REVIEW: fire-and-forget? */ }));
  61. }
  62. }
  63. }
  64. }