1
0

QueryLanguage.Awaiter.cs 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. #if HAS_AWAIT
  5. using System;
  6. using System.Threading;
  7. using System.Reactive.Disposables;
  8. using System.Reactive.Subjects;
  9. namespace System.Reactive.Linq
  10. {
  11. internal partial class QueryLanguage
  12. {
  13. public virtual AsyncSubject<TSource> GetAwaiter<TSource>(IObservable<TSource> source)
  14. {
  15. return RunAsync<TSource>(source, CancellationToken.None);
  16. }
  17. public virtual AsyncSubject<TSource> GetAwaiter<TSource>(IConnectableObservable<TSource> source)
  18. {
  19. return RunAsync<TSource>(source, CancellationToken.None);
  20. }
  21. public virtual AsyncSubject<TSource> RunAsync<TSource>(IObservable<TSource> source, CancellationToken cancellationToken)
  22. {
  23. var s = new AsyncSubject<TSource>();
  24. if (cancellationToken.IsCancellationRequested)
  25. {
  26. return Cancel(s, cancellationToken);
  27. }
  28. var d = source.SubscribeSafe(s);
  29. if (cancellationToken.CanBeCanceled)
  30. {
  31. RegisterCancelation(s, d, cancellationToken);
  32. }
  33. return s;
  34. }
  35. public virtual AsyncSubject<TSource> RunAsync<TSource>(IConnectableObservable<TSource> source, CancellationToken cancellationToken)
  36. {
  37. var s = new AsyncSubject<TSource>();
  38. if (cancellationToken.IsCancellationRequested)
  39. {
  40. return Cancel(s, cancellationToken);
  41. }
  42. var d = source.SubscribeSafe(s);
  43. var c = source.Connect();
  44. if (cancellationToken.CanBeCanceled)
  45. {
  46. RegisterCancelation(s, StableCompositeDisposable.Create(d, c), cancellationToken);
  47. }
  48. return s;
  49. }
  50. private static AsyncSubject<T> Cancel<T>(AsyncSubject<T> subject, CancellationToken cancellationToken)
  51. {
  52. subject.OnError(new OperationCanceledException(cancellationToken));
  53. return subject;
  54. }
  55. private static void RegisterCancelation<T>(AsyncSubject<T> subject, IDisposable subscription, CancellationToken token)
  56. {
  57. //
  58. // Separate method used to avoid heap allocation of closure when no cancellation is needed,
  59. // e.g. when CancellationToken.None is provided to the RunAsync overloads.
  60. //
  61. var ctr = token.Register(() =>
  62. {
  63. subscription.Dispose();
  64. Cancel(subject, token);
  65. });
  66. //
  67. // No null-check for ctr is needed:
  68. //
  69. // - CancellationTokenRegistration is a struct
  70. // - Registration will succeed 99% of the time, no warranting an attempt to avoid spurious Subscribe calls
  71. //
  72. subject.Subscribe(Stubs<T>.Ignore, _ => ctr.Dispose(), ctr.Dispose);
  73. }
  74. }
  75. }
  76. #endif