Using.cs 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. using System.Reactive.Disposables;
  5. namespace System.Reactive.Linq.ObservableImpl
  6. {
  7. class Using<TSource, TResource> : Producer<TSource>
  8. where TResource : IDisposable
  9. {
  10. private readonly Func<TResource> _resourceFactory;
  11. private readonly Func<TResource, IObservable<TSource>> _observableFactory;
  12. public Using(Func<TResource> resourceFactory, Func<TResource, IObservable<TSource>> observableFactory)
  13. {
  14. _resourceFactory = resourceFactory;
  15. _observableFactory = observableFactory;
  16. }
  17. protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
  18. {
  19. var sink = new _(this, observer, cancel);
  20. setSink(sink);
  21. return sink.Run();
  22. }
  23. class _ : Sink<TSource>, IObserver<TSource>
  24. {
  25. private readonly Using<TSource, TResource> _parent;
  26. public _(Using<TSource, TResource> parent, IObserver<TSource> observer, IDisposable cancel)
  27. : base(observer, cancel)
  28. {
  29. _parent = parent;
  30. }
  31. public IDisposable Run()
  32. {
  33. var source = default(IObservable<TSource>);
  34. var disposable = Disposable.Empty;
  35. try
  36. {
  37. var resource = _parent._resourceFactory();
  38. if (resource != null)
  39. disposable = resource;
  40. source = _parent._observableFactory(resource);
  41. }
  42. catch (Exception exception)
  43. {
  44. return StableCompositeDisposable.Create(Observable.Throw<TSource>(exception).SubscribeSafe(this), disposable);
  45. }
  46. return StableCompositeDisposable.Create(source.SubscribeSafe(this), disposable);
  47. }
  48. public void OnNext(TSource value)
  49. {
  50. base._observer.OnNext(value);
  51. }
  52. public void OnError(Exception error)
  53. {
  54. base._observer.OnError(error);
  55. base.Dispose();
  56. }
  57. public void OnCompleted()
  58. {
  59. base._observer.OnCompleted();
  60. base.Dispose();
  61. }
  62. }
  63. }
  64. }
  65. #endif