SafeObserver.cs 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System;
  3. namespace System.Reactive
  4. {
  5. //
  6. // See AutoDetachObserver.cs for more information on the safeguarding requirement and
  7. // its implementation aspects.
  8. //
  /// <summary>
  /// Observer wrapper that forwards every notification to an inner observer and
  /// disposes an associated <see cref="IDisposable"/> when the inner observer's
  /// <c>OnNext</c> throws, and always after <c>OnError</c> or <c>OnCompleted</c>
  /// has been forwarded.
  /// </summary>
  /// <typeparam name="TSource">Type of the elements received by the observer.</typeparam>
  class SafeObserver<TSource> : IObserver<TSource>
  {
      // Target that receives the forwarded notifications.
      private readonly IObserver<TSource> _observer;

      // Resource disposed on failure of OnNext and after either terminal notification.
      private readonly IDisposable _disposable;

      /// <summary>
      /// Stores the observer to forward to and the resource to dispose.
      /// Private: instances are obtained through <see cref="Create"/>.
      /// </summary>
      private SafeObserver(IObserver<TSource> observer, IDisposable disposable)
      {
          _disposable = disposable;
          _observer = observer;
      }

      /// <summary>
      /// Produces a safeguarded observer for <paramref name="observer"/>.
      /// </summary>
      /// <param name="observer">Observer whose notifications should be safeguarded.</param>
      /// <param name="disposable">Resource to dispose as described on the individual notification methods.</param>
      /// <returns>
      /// The result of <c>MakeSafe(disposable)</c> when <paramref name="observer"/> is an
      /// <c>AnonymousObserver&lt;TSource&gt;</c>; otherwise a new <see cref="SafeObserver{TSource}"/>
      /// wrapping <paramref name="observer"/>.
      /// </returns>
      public static IObserver<TSource> Create(IObserver<TSource> observer, IDisposable disposable)
      {
          var anonymous = observer as AnonymousObserver<TSource>;

          // Any observer that is not an AnonymousObserver gets the generic wrapper.
          if (anonymous == null)
          {
              return new SafeObserver<TSource>(observer, disposable);
          }

          // Anonymous observers supply their own safeguarded variant.
          return anonymous.MakeSafe(disposable);
      }

      /// <summary>
      /// Forwards <paramref name="value"/> to the inner observer. If that call does not
      /// return normally, the associated resource is disposed; the exception is not
      /// caught here and keeps propagating to the caller.
      /// </summary>
      /// <param name="value">Element to forward.</param>
      public void OnNext(TSource value)
      {
          // Assume failure until the inner call has returned; a flag checked in a
          // finally block is used instead of a catch clause, so the exception is
          // never caught or rethrown by this method.
          var threw = true;
          try
          {
              _observer.OnNext(value);
              threw = false;
          }
          finally
          {
              if (threw)
              {
                  _disposable.Dispose();
              }
          }
      }

      /// <summary>
      /// Forwards <paramref name="error"/> to the inner observer and then disposes the
      /// associated resource, whether or not the inner call throws.
      /// </summary>
      /// <param name="error">Exception to forward.</param>
      public void OnError(Exception error)
      {
          try
          {
              _observer.OnError(error);
          }
          finally
          {
              // Terminal notification: always dispose the resource.
              _disposable.Dispose();
          }
      }

      /// <summary>
      /// Forwards the completion notification to the inner observer and then disposes
      /// the associated resource, whether or not the inner call throws.
      /// </summary>
      public void OnCompleted()
      {
          try
          {
              _observer.OnCompleted();
          }
          finally
          {
              // Terminal notification: always dispose the resource.
              _disposable.Dispose();
          }
      }
  }
  63. }