AnonymousSafeObserver.cs 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System;
  3. using System.Threading;
  4. namespace System.Reactive
  5. {
  6. //
  7. // See AutoDetachObserver.cs for more information on the safeguarding requirement and
  8. // its implementation aspects.
  9. //
  /// <summary>
  /// Combines the behavior of ObserverBase, AnonymousObserver, and SafeObserver in a single type. SafeObserver.Create
  /// can hand out an instance of this class when the observer it is asked to safeguard turns out to be an
  /// AnonymousObserver, which is what end users typically get from the Subscribe extension methods that take
  /// delegates for the On* handlers. Fusing the three layers keeps the call stack shallower, which makes debugging
  /// easier and helps performance somewhat.
  /// </summary>
  /// <typeparam name="T">The type of the elements delivered to the observer.</typeparam>
  class AnonymousSafeObserver<T> : IObserver<T>
  {
      private readonly Action<T> _onNext;
      private readonly Action<Exception> _onError;
      private readonly Action _onCompleted;
      private readonly IDisposable _disposable;

      // 0 while notifications are still accepted; flipped to 1 by the first OnError/OnCompleted call.
      private int _stopped;

      /// <summary>
      /// Creates an observer that forwards notifications to the given delegates and disposes
      /// <paramref name="disposable"/> on termination or when the OnNext handler throws.
      /// </summary>
      /// <param name="onNext">Handler invoked for each element.</param>
      /// <param name="onError">Handler invoked for an error notification.</param>
      /// <param name="onCompleted">Handler invoked for a completion notification.</param>
      /// <param name="disposable">Resource to dispose once the observer is done.</param>
      public AnonymousSafeObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted, IDisposable disposable)
      {
          _disposable = disposable;
          _onCompleted = onCompleted;
          _onError = onError;
          _onNext = onNext;
      }

      /// <summary>
      /// Passes <paramref name="value"/> to the OnNext handler unless the observer has already been stopped.
      /// If the handler throws, the disposable is disposed and the exception continues to propagate.
      /// </summary>
      /// <param name="value">The element to forward.</param>
      public void OnNext(T value)
      {
          if (_stopped != 0)
          {
              return;
          }

          // Flag + finally (rather than catch/rethrow) so the exception unwinds untouched.
          var handlerReturned = false;
          try
          {
              _onNext(value);
              handlerReturned = true;
          }
          finally
          {
              if (!handlerReturned)
              {
                  _disposable.Dispose();
              }
          }
      }

      /// <summary>
      /// Stops the observer and, if it was not stopped before, invokes the OnError handler.
      /// The disposable is disposed afterwards, whether or not the handler throws.
      /// </summary>
      /// <param name="error">The exception to forward.</param>
      public void OnError(Exception error)
      {
          // Only the first terminal notification gets through.
          if (Interlocked.Exchange(ref _stopped, 1) != 0)
          {
              return;
          }

          try
          {
              _onError(error);
          }
          finally
          {
              _disposable.Dispose();
          }
      }

      /// <summary>
      /// Stops the observer and, if it was not stopped before, invokes the OnCompleted handler.
      /// The disposable is disposed afterwards, whether or not the handler throws.
      /// </summary>
      public void OnCompleted()
      {
          // Only the first terminal notification gets through.
          if (Interlocked.Exchange(ref _stopped, 1) != 0)
          {
              return;
          }

          try
          {
              _onCompleted();
          }
          finally
          {
              _disposable.Dispose();
          }
      }
  }
  77. }