QueryLanguage.Awaiter.cs 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if HAS_AWAIT
  3. using System.Threading;
  4. using System.Reactive.Disposables;
  5. using System.Reactive.Subjects;
  6. namespace System.Reactive.Linq
  7. {
  /// <summary>
  /// Part of <see cref="QueryLanguage"/> containing the await support
  /// (<c>GetAwaiter</c>) and the cancellable <c>RunAsync</c> operators.
  /// Every operator funnels the source sequence into a fresh
  /// <see cref="AsyncSubject{T}"/> and hands that subject back to the caller.
  /// </summary>
  internal partial class QueryLanguage
  {
      /// <summary>
      /// Subscribes a new <see cref="AsyncSubject{T}"/> to <paramref name="source"/> and returns it.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <param name="source">Sequence to subscribe to.</param>
      /// <returns>The subject that was subscribed to <paramref name="source"/>.</returns>
      public virtual AsyncSubject<TSource> GetAwaiter<TSource>(IObservable<TSource> source)
      {
          var subject = new AsyncSubject<TSource>();
          source.SubscribeSafe(subject);
          return subject;
      }

      /// <summary>
      /// Subscribes a new <see cref="AsyncSubject{T}"/> to <paramref name="source"/>, then connects
      /// the connectable sequence, and returns the subject.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <param name="source">Connectable sequence to subscribe to and connect.</param>
      /// <returns>The subject that was subscribed to <paramref name="source"/>.</returns>
      public virtual AsyncSubject<TSource> GetAwaiter<TSource>(IConnectableObservable<TSource> source)
      {
          var subject = new AsyncSubject<TSource>();

          // Subscribe before connecting so the subject is attached when the connection is made.
          source.SubscribeSafe(subject);

          // The connection handle is intentionally not kept: this overload offers no way to cancel.
          source.Connect();

          return subject;
      }

      /// <summary>
      /// Subscribes a new <see cref="AsyncSubject{T}"/> to <paramref name="source"/> and ties the
      /// subscription to <paramref name="cancellationToken"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <param name="source">Sequence to subscribe to.</param>
      /// <param name="cancellationToken">
      /// Token whose cancellation disposes the subscription and signals an
      /// <see cref="OperationCanceledException"/> on the returned subject.
      /// </param>
      /// <returns>
      /// The subject observing <paramref name="source"/>. If the token is already cancelled on entry,
      /// the source is never subscribed to and the subject only carries the cancellation error.
      /// </returns>
      public virtual AsyncSubject<TSource> RunAsync<TSource>(IObservable<TSource> source, CancellationToken cancellationToken)
      {
          var subject = new AsyncSubject<TSource>();

          if (cancellationToken.IsCancellationRequested)
          {
              return CancelRunAsync(subject, cancellationToken);
          }

          var subscription = source.SubscribeSafe(subject);

          // Tokens that can never be cancelled (e.g. CancellationToken.None) need no bookkeeping.
          if (cancellationToken.CanBeCanceled)
          {
              RegisterRunAsyncCancellation(subject, subscription, cancellationToken);
          }

          return subject;
      }

      /// <summary>
      /// Subscribes a new <see cref="AsyncSubject{T}"/> to <paramref name="source"/>, connects the
      /// connectable sequence, and ties both the subscription and the connection to
      /// <paramref name="cancellationToken"/>.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <param name="source">Connectable sequence to subscribe to and connect.</param>
      /// <param name="cancellationToken">
      /// Token whose cancellation disposes the subscription and the connection and signals an
      /// <see cref="OperationCanceledException"/> on the returned subject.
      /// </param>
      /// <returns>
      /// The subject observing <paramref name="source"/>. If the token is already cancelled on entry,
      /// the source is neither subscribed to nor connected and the subject only carries the
      /// cancellation error.
      /// </returns>
      public virtual AsyncSubject<TSource> RunAsync<TSource>(IConnectableObservable<TSource> source, CancellationToken cancellationToken)
      {
          var subject = new AsyncSubject<TSource>();

          if (cancellationToken.IsCancellationRequested)
          {
              return CancelRunAsync(subject, cancellationToken);
          }

          // Argument evaluation order guarantees the subscription is made before the connection.
          var resources = new CompositeDisposable(source.SubscribeSafe(subject), source.Connect());

          // Tokens that can never be cancelled (e.g. CancellationToken.None) need no bookkeeping.
          if (cancellationToken.CanBeCanceled)
          {
              RegisterRunAsyncCancellation(subject, resources, cancellationToken);
          }

          return subject;
      }

      /// <summary>
      /// Signals cancellation on <paramref name="subject"/> with an
      /// <see cref="OperationCanceledException"/> that carries <paramref name="cancellationToken"/>.
      /// </summary>
      /// <returns>The same <paramref name="subject"/>, to allow <c>return CancelRunAsync(...)</c>.</returns>
      private static AsyncSubject<T> CancelRunAsync<T>(AsyncSubject<T> subject, CancellationToken cancellationToken)
      {
          subject.OnError(new OperationCanceledException(cancellationToken));
          return subject;
      }

      /// <summary>
      /// Registers one cancellation callback that tears down <paramref name="resources"/> and then
      /// signals cancellation on <paramref name="subject"/>; the registration is released again as
      /// soon as the subject terminates.
      /// </summary>
      /// <remarks>
      /// Kept in a separate method so the closures below are only allocated when the token can
      /// actually be cancelled.
      /// </remarks>
      private static void RegisterRunAsyncCancellation<T>(AsyncSubject<T> subject, IDisposable resources, CancellationToken cancellationToken)
      {
          // A single callback makes the teardown order explicit: stop the source, then fail the subject.
          var registration = cancellationToken.Register(() =>
          {
              resources.Dispose();
              CancelRunAsync(subject, cancellationToken);
          });

          // Drop the registration once the subject finishes (normally, with an error, or through the
          // cancellation above) so a long-lived token does not keep the subject and subscription alive.
          subject.Subscribe(_ => { }, _ => registration.Dispose(), registration.Dispose);
      }
  }
  52. }
  53. #endif