// Defer.cs (2.1 KB)
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. using System.Reactive.Disposables;
  5. namespace System.Reactive.Linq.Observαble
  6. {
  /// <summary>
  /// Producer that invokes a factory delegate on each run and relays the notifications of the
  /// sequence returned by that factory to the downstream observer.
  /// </summary>
  /// <typeparam name="TValue">Element type of the produced sequence.</typeparam>
  class Defer<TValue> : Producer<TValue>, IEvaluatableObservable<TValue>
  {
      private readonly Func<IObservable<TValue>> _observableFactory;

      /// <summary>
      /// Stores the factory delegate; the delegate is not invoked here.
      /// </summary>
      /// <param name="observableFactory">Delegate called by <see cref="Eval"/> to obtain the sequence.</param>
      public Defer(Func<IObservable<TValue>> observableFactory)
      {
          _observableFactory = observableFactory;
      }

      /// <summary>
      /// Creates the relay sink, hands it to <paramref name="setSink"/>, then starts it.
      /// </summary>
      /// <param name="observer">Observer that receives the relayed notifications.</param>
      /// <param name="cancel">Disposable passed through to the sink's base constructor.</param>
      /// <param name="setSink">Callback invoked with the new sink before the sink is run.</param>
      /// <returns>The disposable returned by the sink's <c>Run</c> method.</returns>
      protected override IDisposable Run(IObserver<TValue> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var relay = new _(this, observer, cancel);
          setSink(relay);
          return relay.Run();
      }

      /// <summary>
      /// Invokes the stored factory and returns whatever it produces.
      /// Exceptions thrown by the factory propagate to the caller.
      /// </summary>
      public IObservable<TValue> Eval()
      {
          return _observableFactory();
      }

      /// <summary>
      /// Sink that subscribes itself to the factory's sequence and forwards every notification
      /// to the downstream observer, disposing itself after a terminal notification.
      /// </summary>
      class _ : Sink<TValue>, IObserver<TValue>
      {
          private readonly Defer<TValue> _parent;

          public _(Defer<TValue> parent, IObserver<TValue> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _parent = parent;
          }

          /// <summary>
          /// Evaluates the parent's factory and subscribes this sink to the resulting sequence.
          /// </summary>
          /// <returns>
          /// The subscription to the factory's sequence, or <c>Disposable.Empty</c> when the
          /// factory threw and the error was already delivered downstream.
          /// </returns>
          public IDisposable Run()
          {
              IObservable<TValue> source;

              try
              {
                  source = _parent.Eval();
              }
              catch (Exception failure)
              {
                  // Same steps as a failure from the source itself: notify downstream, then dispose.
                  OnError(failure);
                  return Disposable.Empty;
              }

              return source.SubscribeSafe(this);
          }

          /// <summary>Forwards an element to the downstream observer.</summary>
          public void OnNext(TValue value)
          {
              _observer.OnNext(value);
          }

          /// <summary>Forwards the error downstream, then disposes this sink.</summary>
          public void OnError(Exception error)
          {
              _observer.OnError(error);
              Dispose();
          }

          /// <summary>Forwards completion downstream, then disposes this sink.</summary>
          public void OnCompleted()
          {
              _observer.OnCompleted();
              Dispose();
          }
      }
  }
  63. }
  64. #endif