// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
using System.Diagnostics;
using System.Threading;
namespace System.Reactive
{
/// <summary>
/// Utility methods for serializing OnXXX signals
/// for an IObserver where concurrent OnNext is still not allowed
/// but concurrent OnError/OnCompleted may happen.
/// This serialization case is generally lower overhead than
/// a full SerializedObserver wrapper and doesn't need
/// allocation.
/// </summary>
/// <remarks>
/// All methods share two caller-owned fields passed by reference:
/// a work-in-progress counter (<c>wip</c>) and a terminal-state slot (<c>error</c>).
/// A terminal call that arrives while an OnNext is running bumps <c>wip</c> and returns;
/// the thread running OnNext notices the bump when it decrements <c>wip</c> and delivers
/// the terminal signal itself.
/// </remarks>
internal static class HalfSerializer
{
    /// <summary>
    /// Signals the given item to the observer in a serialized fashion,
    /// allowing a concurrent OnError or OnCompleted emission to be delayed until
    /// the observer's OnNext returns.
    /// Do not call OnNext from multiple threads as it may lead to ignored items.
    /// Use a full SerializedObserver wrapper for merging multiple sequences.
    /// </summary>
    /// <typeparam name="T">The element type of the observer.</typeparam>
    /// <param name="sink">The observer to signal events in a serialized fashion.</param>
    /// <param name="item">The item to signal.</param>
    /// <param name="wip">Indicates there is an emission going on currently.</param>
    /// <param name="error">The field containing an error or terminal indicator.</param>
    public static void ForwardOnNext<T>(ISink<T> sink, T item, ref int wip, ref Exception error)
    {
        // Claim the emission slot (0 -> 1). Failure means either another OnNext is
        // running or a terminal signal has already incremented the counter.
        if (Interlocked.CompareExchange(ref wip, 1, 0) == 0)
        {
            sink.ForwardOnNext(item);

            // A non-zero result means ForwardOnError/ForwardOnCompleted incremented
            // wip while OnNext was running and left the terminal signal for us.
            if (Interlocked.Decrement(ref wip) != 0)
            {
                var ex = error;
                if (ex != ExceptionHelper.Terminated)
                {
                    // Replace the stored exception with the terminal marker
                    // before forwarding it downstream.
                    error = ExceptionHelper.Terminated;
                    sink.ForwardOnError(ex);
                }
                else
                {
                    // The terminal marker alone denotes a pending OnCompleted.
                    sink.ForwardOnCompleted();
                }
            }
        }
#if !NO_TRACE
        // Only warn when no terminal state has been recorded; a dropped item
        // after termination is expected, an overlapping OnNext is a caller bug.
        else if (error == null)
        {
            Trace.TraceWarning("OnNext called while another OnNext call was in progress on the same Observer.");
        }
#endif
    }

    /// <summary>
    /// Signals the given exception to the observer. If there is a concurrent
    /// OnNext emission happening, saves the exception into the given field
    /// to be picked up by <see cref="ForwardOnNext{T}(ISink{T}, T, ref int, ref Exception)"/>
    /// once it finishes with OnNext.
    /// This method can be called concurrently with itself and the other methods of this
    /// helper class but only one terminal signal may actually win.
    /// </summary>
    /// <typeparam name="T">The element type of the observer.</typeparam>
    /// <param name="sink">The observer to signal events in a serialized fashion.</param>
    /// <param name="ex">The exception to signal sooner or later.</param>
    /// <param name="wip">Indicates there is an emission going on currently.</param>
    /// <param name="error">The field containing an error or terminal indicator.</param>
    public static void ForwardOnError<T>(ISink<T> sink, Exception ex, ref int wip, ref Exception error)
    {
        // NOTE(review): TrySetException is defined elsewhere; presumably it stores
        // ex only if the field is still unset, so a single terminal caller proceeds.
        if (ExceptionHelper.TrySetException(ref error, ex))
        {
            // Reaching 1 means the counter was 0, i.e. no OnNext is in flight and
            // this thread delivers the error. Otherwise the running OnNext will.
            if (Interlocked.Increment(ref wip) == 1)
            {
                error = ExceptionHelper.Terminated;
                sink.ForwardOnError(ex);
            }
        }
    }

    /// <summary>
    /// Signals OnCompleted on the observer. If there is a concurrent
    /// OnNext emission happening, the error field will host a special
    /// terminal exception signal to be picked up by
    /// <see cref="ForwardOnNext{T}(ISink{T}, T, ref int, ref Exception)"/>
    /// once it finishes with OnNext, which then signals OnCompleted as well.
    /// This method can be called concurrently with itself and the other methods of this
    /// helper class but only one terminal signal may actually win.
    /// </summary>
    /// <typeparam name="T">The element type of the observer.</typeparam>
    /// <param name="sink">The observer to signal events in a serialized fashion.</param>
    /// <param name="wip">Indicates there is an emission going on currently.</param>
    /// <param name="error">The field containing an error or terminal indicator.</param>
    public static void ForwardOnCompleted<T>(ISink<T> sink, ref int wip, ref Exception error)
    {
        // The terminal marker doubles as the "completed" indicator in the error slot.
        // NOTE(review): relies on TrySetException letting only the first terminal
        // caller through; confirm against ExceptionHelper.
        if (ExceptionHelper.TrySetException(ref error, ExceptionHelper.Terminated))
        {
            // Reaching 1 means no OnNext is in flight, so complete right away;
            // otherwise the running OnNext delivers the completion when it returns.
            if (Interlocked.Increment(ref wip) == 1)
            {
                sink.ForwardOnCompleted();
            }
        }
    }
}
}