SafeObserver.cs 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Disposables;
  5. namespace System.Reactive
  6. {
  7. //
  8. // See AutoDetachObserver.cs for more information on the safeguarding requirement and
  9. // its implementation aspects.
  10. //
  11. internal abstract class SafeObserver<TSource> : ISafeObserver<TSource>
  12. {
  13. private sealed class WrappingSafeObserver : SafeObserver<TSource>
  14. {
  15. private readonly IObserver<TSource> _observer;
  16. public WrappingSafeObserver(IObserver<TSource> observer)
  17. {
  18. _observer = observer;
  19. }
  20. public override void OnNext(TSource value)
  21. {
  22. var __noError = false;
  23. try
  24. {
  25. _observer.OnNext(value);
  26. __noError = true;
  27. }
  28. finally
  29. {
  30. if (!__noError)
  31. {
  32. Dispose();
  33. }
  34. }
  35. }
  36. public override void OnError(Exception error)
  37. {
  38. using (this)
  39. {
  40. _observer.OnError(error);
  41. }
  42. }
  43. public override void OnCompleted()
  44. {
  45. using (this)
  46. {
  47. _observer.OnCompleted();
  48. }
  49. }
  50. }
  51. public static ISafeObserver<TSource> Wrap(IObserver<TSource> observer)
  52. {
  53. if (observer is AnonymousObserver<TSource> a)
  54. {
  55. return a.MakeSafe();
  56. }
  57. else
  58. {
  59. return new WrappingSafeObserver(observer);
  60. }
  61. }
  62. private IDisposable _disposable;
  63. public abstract void OnNext(TSource value);
  64. public abstract void OnError(Exception error);
  65. public abstract void OnCompleted();
  66. public void SetResource(IDisposable resource)
  67. {
  68. Disposable.SetSingle(ref _disposable, resource);
  69. }
  70. public void Dispose()
  71. {
  72. Dispose(true);
  73. }
  74. protected virtual void Dispose(bool disposing)
  75. {
  76. if (disposing)
  77. {
  78. Disposable.TryDispose(ref _disposable);
  79. }
  80. }
  81. }
  82. }