HalfSerializer.cs 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Diagnostics;
  5. using System.Threading;
  6. namespace System.Reactive
  7. {
  8. /// <summary>
  9. /// Utility methods for dealing with serializing OnXXX signals
  10. /// for an IObserver where concurrent OnNext is still not allowed
  11. /// but concurrent OnError/OnCompleted may happen.
  12. /// This serialization case is generally lower overhead than
  13. /// a full SerializedObserver wrapper and doesn't need
  14. /// allocation.
  15. /// </summary>
  16. internal static class HalfSerializer
  17. {
  18. /// <summary>
  19. /// Signals the given item to the observer in a serialized fashion
  20. /// allowing a concurrent OnError or OnCompleted emission to be delayed until
  21. /// the observer.OnNext returns.
  22. /// Do not call OnNext from multiple threads as it may lead to ignored items.
  23. /// Use a full SerializedObserver wrapper for merging multiple sequences.
  24. /// </summary>
  25. /// <typeparam name="T">The element type of the observer.</typeparam>
  26. /// <param name="sink">The observer to signal events in a serialized fashion.</param>
  27. /// <param name="item">The item to signal.</param>
  28. /// <param name="wip">Indicates there is an emission going on currently.</param>
  29. /// <param name="error">The field containing an error or terminal indicator.</param>
  30. public static void ForwardOnNext<T>(ISink<T> sink, T item, ref int wip, ref Exception error)
  31. {
  32. if (Interlocked.CompareExchange(ref wip, 1, 0) == 0)
  33. {
  34. sink.ForwardOnNext(item);
  35. if (Interlocked.Decrement(ref wip) != 0)
  36. {
  37. var ex = error;
  38. if (ex != ExceptionHelper.Terminated)
  39. {
  40. error = ExceptionHelper.Terminated;
  41. sink.ForwardOnError(ex);
  42. }
  43. else
  44. {
  45. sink.ForwardOnCompleted();
  46. }
  47. }
  48. }
  49. #if !NO_TRACE
  50. else if (error == null)
  51. Trace.TraceWarning("OnNext called while another OnNext call was in progress on the same Observer.");
  52. #endif
  53. }
  54. /// <summary>
  55. /// Signals the given exception to the observer. If there is a concurrent
  56. /// OnNext emission is happening, saves the exception into the given field
  57. /// otherwise to be picked up by <see cref="ForwardOnNext{T}"/>.
  58. /// This method can be called concurrently with itself and the other methods of this
  59. /// helper class but only one terminal signal may actually win.
  60. /// </summary>
  61. /// <typeparam name="T">The element type of the observer.</typeparam>
  62. /// <param name="sink">The observer to signal events in a serialized fashion.</param>
  63. /// <param name="ex">The exception to signal sooner or later.</param>
  64. /// <param name="wip">Indicates there is an emission going on currently.</param>
  65. /// <param name="error">The field containing an error or terminal indicator.</param>
  66. public static void ForwardOnError<T>(ISink<T> sink, Exception ex, ref int wip, ref Exception? error)
  67. {
  68. if (ExceptionHelper.TrySetException(ref error, ex))
  69. {
  70. if (Interlocked.Increment(ref wip) == 1)
  71. {
  72. error = ExceptionHelper.Terminated;
  73. sink.ForwardOnError(ex);
  74. }
  75. }
  76. }
  77. /// <summary>
  78. /// Signals OnCompleted on the observer. If there is a concurrent
  79. /// OnNext emission happening, the error field will host a special
  80. /// terminal exception signal to be picked up by <see cref="ForwardOnNext{T}"/> once it finishes with OnNext and signal the
  81. /// OnCompleted as well.
  82. /// This method can be called concurrently with itself and the other methods of this
  83. /// helper class but only one terminal signal may actually win.
  84. /// </summary>
  85. /// <typeparam name="T">The element type of the observer.</typeparam>
  86. /// <param name="sink">The observer to signal events in a serialized fashion.</param>
  87. /// <param name="wip">Indicates there is an emission going on currently.</param>
  88. /// <param name="error">The field containing an error or terminal indicator.</param>
  89. public static void ForwardOnCompleted<T>(ISink<T> sink, ref int wip, ref Exception? error)
  90. {
  91. if (ExceptionHelper.TrySetException(ref error, ExceptionHelper.Terminated))
  92. {
  93. if (Interlocked.Increment(ref wip) == 1)
  94. {
  95. sink.ForwardOnCompleted();
  96. }
  97. }
  98. }
  99. }
  100. }