ObserverBase.cs 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Threading;
  5. namespace System.Reactive
  6. {
  7. /// <summary>
  8. /// Abstract base class for implementations of the IObserver&lt;T&gt; interface.
  9. /// </summary>
  10. /// <remarks>This base class enforces the grammar of observers where OnError and OnCompleted are terminal messages.</remarks>
  11. /// <typeparam name="T">The type of the elements in the sequence.</typeparam>
  12. public abstract class ObserverBase<T> : IObserver<T>, IDisposable
  13. {
  14. private int isStopped;
  15. /// <summary>
  16. /// Creates a new observer in a non-stopped state.
  17. /// </summary>
  18. protected ObserverBase()
  19. {
  20. isStopped = 0;
  21. }
  22. /// <summary>
  23. /// Notifies the observer of a new element in the sequence.
  24. /// </summary>
  25. /// <param name="value">Next element in the sequence.</param>
  26. public void OnNext(T value)
  27. {
  28. if (Volatile.Read(ref isStopped) == 0)
  29. OnNextCore(value);
  30. }
  31. /// <summary>
  32. /// Implement this method to react to the receival of a new element in the sequence.
  33. /// </summary>
  34. /// <param name="value">Next element in the sequence.</param>
  35. /// <remarks>This method only gets called when the observer hasn't stopped yet.</remarks>
  36. protected abstract void OnNextCore(T value);
  37. /// <summary>
  38. /// Notifies the observer that an exception has occurred.
  39. /// </summary>
  40. /// <param name="error">The error that has occurred.</param>
  41. /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
  42. public void OnError(Exception error)
  43. {
  44. if (error == null)
  45. throw new ArgumentNullException(nameof(error));
  46. if (Interlocked.Exchange(ref isStopped, 1) == 0)
  47. {
  48. OnErrorCore(error);
  49. }
  50. }
  51. /// <summary>
  52. /// Implement this method to react to the occurrence of an exception.
  53. /// </summary>
  54. /// <param name="error">The error that has occurred.</param>
  55. /// <remarks>This method only gets called when the observer hasn't stopped yet, and causes the observer to stop.</remarks>
  56. [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Naming", "CA1716:IdentifiersShouldNotMatchKeywords", MessageId = "Error", Justification = "Same name as in the IObserver<T> definition of OnError in the BCL.")]
  57. protected abstract void OnErrorCore(Exception error);
  58. /// <summary>
  59. /// Notifies the observer of the end of the sequence.
  60. /// </summary>
  61. public void OnCompleted()
  62. {
  63. if (Interlocked.Exchange(ref isStopped, 1) == 0)
  64. {
  65. OnCompletedCore();
  66. }
  67. }
  68. /// <summary>
  69. /// Implement this method to react to the end of the sequence.
  70. /// </summary>
  71. /// <remarks>This method only gets called when the observer hasn't stopped yet, and causes the observer to stop.</remarks>
  72. protected abstract void OnCompletedCore();
  73. /// <summary>
  74. /// Disposes the observer, causing it to transition to the stopped state.
  75. /// </summary>
  76. public void Dispose()
  77. {
  78. Dispose(true);
  79. GC.SuppressFinalize(this);
  80. }
  81. /// <summary>
  82. /// Core implementation of IDisposable.
  83. /// </summary>
  84. /// <param name="disposing">true if the Dispose call was triggered by the IDisposable.Dispose method; false if it was triggered by the finalizer.</param>
  85. protected virtual void Dispose(bool disposing)
  86. {
  87. if (disposing)
  88. {
  89. Volatile.Write(ref isStopped, 1);
  90. }
  91. }
  92. internal bool Fail(Exception error)
  93. {
  94. if (Interlocked.Exchange(ref isStopped, 1) == 0)
  95. {
  96. OnErrorCore(error);
  97. return true;
  98. }
  99. return false;
  100. }
  101. }
  102. }