AnonymousSafeObserver.cs 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Threading;
  5. namespace System.Reactive
  6. {
  7. //
  8. // See AutoDetachObserver.cs for more information on the safeguarding requirement and
  9. // its implementation aspects.
  10. //
  11. /// <summary>
  12. /// This class fuses logic from ObserverBase, AnonymousObserver, and SafeObserver into one class. When an observer
  13. /// needs to be safeguarded, an instance of this type can be created by SafeObserver.Create when it detects its
  14. /// input is an AnonymousObserver, which is commonly used by end users when using the Subscribe extension methods
  15. /// that accept delegates for the On* handlers. By doing the fusion, we make the call stack depth shorter which
  16. /// helps debugging and some performance.
  17. /// </summary>
  18. internal sealed class AnonymousSafeObserver<T> : SafeObserver<T>
  19. {
  20. private readonly Action<T> _onNext;
  21. private readonly Action<Exception> _onError;
  22. private readonly Action _onCompleted;
  23. private int _isStopped;
  24. public AnonymousSafeObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
  25. {
  26. _onNext = onNext;
  27. _onError = onError;
  28. _onCompleted = onCompleted;
  29. }
  30. public override void OnNext(T value)
  31. {
  32. if (_isStopped == 0)
  33. {
  34. var noError = false;
  35. try
  36. {
  37. _onNext(value);
  38. noError = true;
  39. }
  40. finally
  41. {
  42. if (!noError)
  43. {
  44. Dispose();
  45. }
  46. }
  47. }
  48. }
  49. public override void OnError(Exception error)
  50. {
  51. if (Interlocked.Exchange(ref _isStopped, 1) == 0)
  52. {
  53. using (this)
  54. {
  55. _onError(error);
  56. }
  57. }
  58. }
  59. public override void OnCompleted()
  60. {
  61. if (Interlocked.Exchange(ref _isStopped, 1) == 0)
  62. {
  63. using (this)
  64. {
  65. _onCompleted();
  66. }
  67. }
  68. }
  69. }
  70. }