QueryLanguage.Awaiter.cs 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if HAS_AWAIT
  3. using System;
  4. using System.Threading;
  5. using System.Reactive.Disposables;
  6. using System.Reactive.Subjects;
  7. namespace System.Reactive.Linq
  8. {
  9. internal partial class QueryLanguage
  10. {
  /// <summary>
  /// Produces the awaiter for an observable sequence by delegating to <c>RunAsync</c>
  /// with <see cref="CancellationToken.None"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence to await.</param>
  /// <returns>The subject handed back by <c>RunAsync</c>.</returns>
  public virtual AsyncSubject<TSource> GetAwaiter<TSource>(IObservable<TSource> source)
  {
      // No cancellation is offered on this path, so the non-cancelable token is passed through.
      var awaiter = RunAsync<TSource>(source, CancellationToken.None);
      return awaiter;
  }
  /// <summary>
  /// Produces the awaiter for a connectable observable sequence by delegating to the
  /// connectable <c>RunAsync</c> overload with <see cref="CancellationToken.None"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Connectable sequence to await.</param>
  /// <returns>The subject handed back by <c>RunAsync</c>.</returns>
  public virtual AsyncSubject<TSource> GetAwaiter<TSource>(IConnectableObservable<TSource> source)
  {
      // No cancellation is offered on this path, so the non-cancelable token is passed through.
      var awaiter = RunAsync<TSource>(source, CancellationToken.None);
      return awaiter;
  }
  /// <summary>
  /// Creates an <see cref="AsyncSubject{T}"/>, subscribes it to <paramref name="source"/> and returns it.
  /// If cancellation has already been requested, the source is never subscribed to and the subject
  /// is sent an <see cref="OperationCanceledException"/> instead.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Sequence to subscribe the subject to.</param>
  /// <param name="cancellationToken">Token used to cancel the subscription.</param>
  /// <returns>The newly created subject.</returns>
  public virtual AsyncSubject<TSource> RunAsync<TSource>(IObservable<TSource> source, CancellationToken cancellationToken)
  {
      var result = new AsyncSubject<TSource>();

      if (!cancellationToken.IsCancellationRequested)
      {
          var subscription = source.SubscribeSafe(result);

          // Only tokens that can actually be canceled need a registration.
          if (cancellationToken.CanBeCanceled)
          {
              RegisterCancelation(result, subscription, cancellationToken);
          }

          return result;
      }

      // Cancellation was requested up front: skip the subscription and signal the error on the subject.
      return Cancel(result, cancellationToken);
  }
  /// <summary>
  /// Creates an <see cref="AsyncSubject{T}"/>, subscribes it to <paramref name="source"/>, connects the
  /// source and returns the subject. If cancellation has already been requested, the source is neither
  /// subscribed to nor connected and the subject is sent an <see cref="OperationCanceledException"/> instead.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <param name="source">Connectable sequence to subscribe to and connect.</param>
  /// <param name="cancellationToken">Token used to cancel the subscription and the connection.</param>
  /// <returns>The newly created subject.</returns>
  public virtual AsyncSubject<TSource> RunAsync<TSource>(IConnectableObservable<TSource> source, CancellationToken cancellationToken)
  {
      var result = new AsyncSubject<TSource>();

      if (!cancellationToken.IsCancellationRequested)
      {
          // Subscribe first, then connect, so the subject is attached before the connection is made.
          var subscription = source.SubscribeSafe(result);
          var connection = source.Connect();

          // Only tokens that can actually be canceled need a registration; on cancellation both
          // the subscription and the connection are disposed together.
          if (cancellationToken.CanBeCanceled)
          {
              var resources = StableCompositeDisposable.Create(subscription, connection);
              RegisterCancelation(result, resources, cancellationToken);
          }

          return result;
      }

      // Cancellation was requested up front: skip subscribe/connect and signal the error on the subject.
      return Cancel(result, cancellationToken);
  }
  /// <summary>
  /// Sends an <see cref="OperationCanceledException"/> carrying <paramref name="cancellationToken"/>
  /// to <paramref name="subject"/> via <c>OnError</c>.
  /// </summary>
  /// <typeparam name="T">Element type of the subject.</typeparam>
  /// <param name="subject">Subject to signal.</param>
  /// <param name="cancellationToken">Token recorded in the exception.</param>
  /// <returns>The same <paramref name="subject"/> instance, for call chaining.</returns>
  private static AsyncSubject<T> Cancel<T>(AsyncSubject<T> subject, CancellationToken cancellationToken)
  {
      var canceled = new OperationCanceledException(cancellationToken);
      subject.OnError(canceled);
      return subject;
  }
  /// <summary>
  /// Registers a callback on <paramref name="token"/> that disposes <paramref name="subscription"/> and then
  /// signals cancellation on <paramref name="subject"/>; the registration itself is disposed once the
  /// subject terminates.
  /// </summary>
  /// <typeparam name="T">Element type of the subject.</typeparam>
  /// <param name="subject">Subject to signal on cancellation and to observe for termination.</param>
  /// <param name="subscription">Resource to dispose when cancellation is requested.</param>
  /// <param name="token">Token on which the cancellation callback is registered.</param>
  private static void RegisterCancelation<T>(AsyncSubject<T> subject, IDisposable subscription, CancellationToken token)
  {
      //
      // This lives in its own method so that callers which need no cancellation
      // (e.g. the RunAsync overloads invoked with CancellationToken.None) do not pay
      // for a heap-allocated closure.
      //
      var registration = token.Register(() =>
      {
          // Tear down the subscription first, then report the cancellation to the subject.
          subscription.Dispose();
          Cancel(subject, token);
      });

      //
      // The registration is not null-checked:
      //
      // - CancellationTokenRegistration is a struct
      // - Registration will succeed 99% of the time, not warranting an attempt to avoid spurious Subscribe calls
      //
      // Values go to Stubs<T>.Ignore; either terminal notification releases the registration.
      //
      subject.Subscribe(
          Stubs<T>.Ignore,
          error => registration.Dispose(),
          () => registration.Dispose());
  }
  72. }
  73. }
  74. #endif