AnonymousSafeObserver.cs 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Threading;
  6. namespace System.Reactive
  7. {
  8. //
  9. // See AutoDetachObserver.cs for more information on the safeguarding requirement and
  10. // its implementation aspects.
  11. //
  /// <summary>
  /// This class fuses logic from ObserverBase, AnonymousObserver, and SafeObserver into one class. When an observer
  /// needs to be safeguarded, an instance of this type can be created by SafeObserver.Create when it detects its
  /// input is an AnonymousObserver, which is commonly used by end users when using the Subscribe extension methods
  /// that accept delegates for the On* handlers. By doing the fusion, we make the call stack depth shorter which
  /// helps debugging and some performance.
  /// </summary>
  /// <typeparam name="T">The type of the values passed to <see cref="OnNext"/>.</typeparam>
  internal class AnonymousSafeObserver<T> : IObserver<T>
  {
      private readonly Action<T> _onNext;
      private readonly Action<Exception> _onError;
      private readonly Action _onCompleted;
      private readonly IDisposable _disposable;

      // 0 while notifications are still forwarded; flipped to 1 by the first OnError/OnCompleted call.
      private int _isStopped;

      /// <summary>
      /// Creates an observer that forwards notifications to the given delegates and disposes
      /// <paramref name="disposable"/> on termination or when the <paramref name="onNext"/> handler throws.
      /// </summary>
      /// <param name="onNext">Delegate invoked for each value passed to <see cref="OnNext"/>.</param>
      /// <param name="onError">Delegate invoked by <see cref="OnError"/>.</param>
      /// <param name="onCompleted">Delegate invoked by <see cref="OnCompleted"/>.</param>
      /// <param name="disposable">Resource disposed after a terminal notification or a failing <paramref name="onNext"/> call.</param>
      public AnonymousSafeObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted, IDisposable disposable)
      {
          _onNext = onNext;
          _onError = onError;
          _onCompleted = onCompleted;
          _disposable = disposable;
      }

      /// <summary>
      /// Forwards <paramref name="value"/> to the onNext delegate unless the observer has already been stopped.
      /// If the delegate throws, the disposable is disposed and the exception propagates to the caller.
      /// </summary>
      /// <param name="value">The value to forward.</param>
      public void OnNext(T value)
      {
          // The flag is written through Interlocked.Exchange, so read it with matching visibility semantics.
          if (Volatile.Read(ref _isStopped) != 0)
          {
              return;
          }

          var succeeded = false;
          try
          {
              _onNext(value);
              succeeded = true;
          }
          finally
          {
              // Only reached with succeeded == false when the handler threw. The stopped flag is
              // deliberately left untouched here; only the disposable is released.
              if (!succeeded)
              {
                  _disposable.Dispose();
              }
          }
      }

      /// <summary>
      /// Stops the observer and forwards <paramref name="error"/> to the onError delegate. Only the first
      /// terminal call (this method or <see cref="OnCompleted"/>) has any effect. The disposable is disposed
      /// afterwards, even if the delegate throws.
      /// </summary>
      /// <param name="error">The exception to forward.</param>
      public void OnError(Exception error)
      {
          // Atomically claim the transition 0 -> 1 so the terminal handler runs at most once.
          if (Interlocked.Exchange(ref _isStopped, 1) != 0)
          {
              return;
          }

          try
          {
              _onError(error);
          }
          finally
          {
              _disposable.Dispose();
          }
      }

      /// <summary>
      /// Stops the observer and invokes the onCompleted delegate. Only the first terminal call (this method
      /// or <see cref="OnError"/>) has any effect. The disposable is disposed afterwards, even if the
      /// delegate throws.
      /// </summary>
      public void OnCompleted()
      {
          // Atomically claim the transition 0 -> 1 so the terminal handler runs at most once.
          if (Interlocked.Exchange(ref _isStopped, 1) != 0)
          {
              return;
          }

          try
          {
              _onCompleted();
          }
          finally
          {
              _disposable.Dispose();
          }
      }
  }
  79. }