  // Licensed to the .NET Foundation under one or more agreements.
  // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  // See the LICENSE file in the project root for more information.
  namespace System.Reactive
  {
      //
      // See AutoDetachObserver.cs for more information on the safeguarding requirement and
      // its implementation aspects.
      //

      /// <summary>
      /// Observer wrapper that forwards every notification to an inner observer and
      /// disposes an associated <see cref="IDisposable"/> when the inner observer's
      /// <c>OnNext</c> throws, and always after <c>OnError</c> or <c>OnCompleted</c>
      /// has been forwarded (whether or not that call throws).
      /// </summary>
      /// <typeparam name="TSource">Type of the elements received by the observer.</typeparam>
      internal sealed class SafeObserver<TSource> : IObserver<TSource>
      {
          private readonly IObserver<TSource> _observer;
          private readonly IDisposable _disposable;

          /// <summary>
          /// Creates a safeguarded observer around <paramref name="observer"/>.
          /// </summary>
          /// <param name="observer">Observer that receives the forwarded notifications.</param>
          /// <param name="disposable">Resource to dispose on failure or termination.</param>
          /// <returns>
          /// The result of <c>MakeSafe(disposable)</c> when <paramref name="observer"/> is an
          /// <c>AnonymousObserver&lt;TSource&gt;</c>; otherwise a new
          /// <see cref="SafeObserver{TSource}"/> wrapping it.
          /// </returns>
          public static IObserver<TSource> Create(IObserver<TSource> observer, IDisposable disposable)
          {
              // Anonymous observers supply their own safeguarded variant via MakeSafe.
              var anonymous = observer as AnonymousObserver<TSource>;
              if (anonymous != null)
              {
                  return anonymous.MakeSafe(disposable);
              }

              return new SafeObserver<TSource>(observer, disposable);
          }

          // Instances are only obtained through Create.
          private SafeObserver(IObserver<TSource> observer, IDisposable disposable)
          {
              _observer = observer;
              _disposable = disposable;
          }

          /// <summary>
          /// Forwards <paramref name="value"/> to the inner observer. If that call throws,
          /// the associated disposable is disposed and the exception keeps propagating.
          /// </summary>
          /// <param name="value">Element to forward.</param>
          public void OnNext(TSource value)
          {
              // A success flag combined with finally (instead of catch/rethrow) lets the
              // original exception propagate untouched while still running the cleanup.
              var succeeded = false;
              try
              {
                  _observer.OnNext(value);
                  succeeded = true;
              }
              finally
              {
                  if (!succeeded)
                  {
                      _disposable.Dispose();
                  }
              }
          }

          /// <summary>
          /// Forwards <paramref name="error"/> to the inner observer, then disposes the
          /// associated disposable even if the inner observer throws.
          /// </summary>
          /// <param name="error">Exception to forward.</param>
          public void OnError(Exception error)
          {
              try
              {
                  _observer.OnError(error);
              }
              finally
              {
                  _disposable.Dispose();
              }
          }

          /// <summary>
          /// Forwards the completion notification to the inner observer, then disposes the
          /// associated disposable even if the inner observer throws.
          /// </summary>
          public void OnCompleted()
          {
              try
              {
                  _observer.OnCompleted();
              }
              finally
              {
                  _disposable.Dispose();
              }
          }
      }
  }