// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
using System.Collections.Generic;
using System.Diagnostics;
using System.Globalization;
using System.Reactive.Concurrency;
#pragma warning disable 0659
#pragma warning disable 0661
namespace System.Reactive
{
/// <summary>
/// Indicates the type of a notification.
/// </summary>
public enum NotificationKind
{
/// <summary>
/// Represents an OnNext notification.
/// </summary>
OnNext,
/// <summary>
/// Represents an OnError notification.
/// </summary>
OnError,
/// <summary>
/// Represents an OnCompleted notification.
/// </summary>
OnCompleted
}
/// <summary>
/// Represents a notification to an observer.
/// </summary>
/// <typeparam name="T">The type of the elements received by the observer.</typeparam>
#if !NO_SERIALIZABLE
[Serializable]
#endif
[Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Usage", "CA2218:OverrideGetHashCodeOnOverridingEquals", Justification = "Resembles a discriminated union with finite number of subclasses (external users shouldn't create their own subtypes), each of which does override GetHashCode itself.")]
public abstract class Notification<T> : IEquatable<Notification<T>>
{
    /// <summary>
    /// Default constructor used by derived types.
    /// </summary>
    protected internal Notification()
    {
    }

    /// <summary>
    /// Returns the value of an OnNext notification or throws an exception.
    /// </summary>
    public abstract T Value { get; }

    /// <summary>
    /// Returns a value that indicates whether the notification has a value.
    /// </summary>
    public abstract bool HasValue { get; }

    /// <summary>
    /// Returns the exception of an OnError notification or returns <c>null</c>.
    /// </summary>
    public abstract Exception Exception { get; }

    /// <summary>
    /// Gets the kind of notification that is represented.
    /// </summary>
    public abstract NotificationKind Kind { get; }

    /// <summary>
    /// Represents an OnNext notification to an observer.
    /// </summary>
    [DebuggerDisplay("OnNext({Value})")]
#if !NO_SERIALIZABLE
    [Serializable]
#endif
    internal sealed class OnNextNotification : Notification<T>
    {
        /// <summary>
        /// Constructs a notification of a new value.
        /// </summary>
        /// <param name="value">The value carried by the notification.</param>
        public OnNextNotification(T value)
        {
            Value = value;
        }

        /// <summary>
        /// Returns the value of an OnNext notification.
        /// </summary>
        public override T Value { get; }

        /// <summary>
        /// Returns <c>null</c>.
        /// </summary>
        public override Exception Exception => null;

        /// <summary>
        /// Returns <c>true</c>.
        /// </summary>
        public override bool HasValue => true;

        /// <summary>
        /// Returns <see cref="NotificationKind.OnNext"/>.
        /// </summary>
        public override NotificationKind Kind => NotificationKind.OnNext;

        /// <summary>
        /// Returns the hash code for this instance.
        /// </summary>
        /// <returns>The hash code of <see cref="Value"/> as computed by the default equality comparer for <typeparamref name="T"/>.</returns>
        public override int GetHashCode() => EqualityComparer<T>.Default.GetHashCode(Value);

        /// <summary>
        /// Indicates whether this instance and a specified object are equal.
        /// </summary>
        /// <param name="other">The notification to compare with this instance.</param>
        /// <returns><c>true</c> if <paramref name="other"/> is an OnNext notification whose value equals this instance's value according to the default equality comparer for <typeparamref name="T"/>; otherwise, <c>false</c>.</returns>
        public override bool Equals(Notification<T> other)
        {
            if (ReferenceEquals(this, other))
            {
                return true;
            }

            // Pattern-based null test: the == operator is overloaded on this type.
            if (other is null)
            {
                return false;
            }

            if (other.Kind != NotificationKind.OnNext)
            {
                return false;
            }

            return EqualityComparer<T>.Default.Equals(Value, other.Value);
        }

        /// <summary>
        /// Returns a string representation of this instance.
        /// </summary>
        public override string ToString() => string.Format(CultureInfo.CurrentCulture, "OnNext({0})", Value);

        /// <summary>
        /// Invokes the observer's method corresponding to the notification.
        /// </summary>
        /// <param name="observer">Observer to invoke the notification on.</param>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        public override void Accept(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            observer.OnNext(Value);
        }

        /// <summary>
        /// Invokes the observer's method corresponding to the notification and returns the produced result.
        /// </summary>
        /// <typeparam name="TResult">The type of the result returned from the observer's notification handlers.</typeparam>
        /// <param name="observer">Observer to invoke the notification on.</param>
        /// <returns>Result produced by the observation.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        public override TResult Accept<TResult>(IObserver<T, TResult> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            return observer.OnNext(Value);
        }

        /// <summary>
        /// Invokes the delegate corresponding to the notification.
        /// </summary>
        /// <param name="onNext">Delegate to invoke for an OnNext notification.</param>
        /// <param name="onError">Delegate to invoke for an OnError notification.</param>
        /// <param name="onCompleted">Delegate to invoke for an OnCompleted notification.</param>
        /// <exception cref="ArgumentNullException"><paramref name="onNext"/>, <paramref name="onError"/> or <paramref name="onCompleted"/> is <c>null</c>.</exception>
        public override void Accept(Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            // All three delegates are validated even though only onNext is invoked here.
            if (onNext == null)
            {
                throw new ArgumentNullException(nameof(onNext));
            }

            if (onError == null)
            {
                throw new ArgumentNullException(nameof(onError));
            }

            if (onCompleted == null)
            {
                throw new ArgumentNullException(nameof(onCompleted));
            }

            onNext(Value);
        }

        /// <summary>
        /// Invokes the delegate corresponding to the notification and returns the produced result.
        /// </summary>
        /// <typeparam name="TResult">The type of the result returned from the notification handler delegates.</typeparam>
        /// <param name="onNext">Delegate to invoke for an OnNext notification.</param>
        /// <param name="onError">Delegate to invoke for an OnError notification.</param>
        /// <param name="onCompleted">Delegate to invoke for an OnCompleted notification.</param>
        /// <returns>Result produced by the observation.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="onNext"/>, <paramref name="onError"/> or <paramref name="onCompleted"/> is <c>null</c>.</exception>
        public override TResult Accept<TResult>(Func<T, TResult> onNext, Func<Exception, TResult> onError, Func<TResult> onCompleted)
        {
            // All three delegates are validated even though only onNext is invoked here.
            if (onNext == null)
            {
                throw new ArgumentNullException(nameof(onNext));
            }

            if (onError == null)
            {
                throw new ArgumentNullException(nameof(onError));
            }

            if (onCompleted == null)
            {
                throw new ArgumentNullException(nameof(onCompleted));
            }

            return onNext(Value);
        }
    }

    /// <summary>
    /// Represents an OnError notification to an observer.
    /// </summary>
    [DebuggerDisplay("OnError({Exception})")]
#if !NO_SERIALIZABLE
    [Serializable]
#endif
    internal sealed class OnErrorNotification : Notification<T>
    {
        /// <summary>
        /// Constructs a notification of an exception.
        /// </summary>
        /// <param name="exception">The exception carried by the notification.</param>
        public OnErrorNotification(Exception exception)
        {
            Exception = exception;
        }

        /// <summary>
        /// Throws the exception.
        /// </summary>
        // NOTE(review): Throw() is a helper declared outside this file; the trailing
        // "return default" presumably only satisfies the compiler — confirm Throw() never returns.
        public override T Value { get { Exception.Throw(); return default; } }

        /// <summary>
        /// Returns the exception.
        /// </summary>
        public override Exception Exception { get; }

        /// <summary>
        /// Returns <c>false</c>.
        /// </summary>
        public override bool HasValue => false;

        /// <summary>
        /// Returns <see cref="NotificationKind.OnError"/>.
        /// </summary>
        public override NotificationKind Kind => NotificationKind.OnError;

        /// <summary>
        /// Returns the hash code for this instance.
        /// </summary>
        /// <returns>The hash code of <see cref="Exception"/>.</returns>
        public override int GetHashCode() => Exception.GetHashCode();

        /// <summary>
        /// Indicates whether this instance and other are equal.
        /// </summary>
        /// <param name="other">The notification to compare with this instance.</param>
        /// <returns><c>true</c> if <paramref name="other"/> is an OnError notification whose exception equals this instance's exception; otherwise, <c>false</c>.</returns>
        public override bool Equals(Notification<T> other)
        {
            if (ReferenceEquals(this, other))
            {
                return true;
            }

            // Pattern-based null test: the == operator is overloaded on this type.
            if (other is null)
            {
                return false;
            }

            if (other.Kind != NotificationKind.OnError)
            {
                return false;
            }

            // Static object.Equals(object, object): compares the two exception objects.
            return Equals(Exception, other.Exception);
        }

        /// <summary>
        /// Returns a string representation of this instance.
        /// </summary>
        public override string ToString() => string.Format(CultureInfo.CurrentCulture, "OnError({0})", Exception.GetType().FullName);

        /// <summary>
        /// Invokes the observer's method corresponding to the notification.
        /// </summary>
        /// <param name="observer">Observer to invoke the notification on.</param>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        public override void Accept(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            observer.OnError(Exception);
        }

        /// <summary>
        /// Invokes the observer's method corresponding to the notification and returns the produced result.
        /// </summary>
        /// <typeparam name="TResult">The type of the result returned from the observer's notification handlers.</typeparam>
        /// <param name="observer">Observer to invoke the notification on.</param>
        /// <returns>Result produced by the observation.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        public override TResult Accept<TResult>(IObserver<T, TResult> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            return observer.OnError(Exception);
        }

        /// <summary>
        /// Invokes the delegate corresponding to the notification.
        /// </summary>
        /// <param name="onNext">Delegate to invoke for an OnNext notification.</param>
        /// <param name="onError">Delegate to invoke for an OnError notification.</param>
        /// <param name="onCompleted">Delegate to invoke for an OnCompleted notification.</param>
        /// <exception cref="ArgumentNullException"><paramref name="onNext"/>, <paramref name="onError"/> or <paramref name="onCompleted"/> is <c>null</c>.</exception>
        public override void Accept(Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            // All three delegates are validated even though only onError is invoked here.
            if (onNext == null)
            {
                throw new ArgumentNullException(nameof(onNext));
            }

            if (onError == null)
            {
                throw new ArgumentNullException(nameof(onError));
            }

            if (onCompleted == null)
            {
                throw new ArgumentNullException(nameof(onCompleted));
            }

            onError(Exception);
        }

        /// <summary>
        /// Invokes the delegate corresponding to the notification and returns the produced result.
        /// </summary>
        /// <typeparam name="TResult">The type of the result returned from the notification handler delegates.</typeparam>
        /// <param name="onNext">Delegate to invoke for an OnNext notification.</param>
        /// <param name="onError">Delegate to invoke for an OnError notification.</param>
        /// <param name="onCompleted">Delegate to invoke for an OnCompleted notification.</param>
        /// <returns>Result produced by the observation.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="onNext"/>, <paramref name="onError"/> or <paramref name="onCompleted"/> is <c>null</c>.</exception>
        public override TResult Accept<TResult>(Func<T, TResult> onNext, Func<Exception, TResult> onError, Func<TResult> onCompleted)
        {
            // All three delegates are validated even though only onError is invoked here.
            if (onNext == null)
            {
                throw new ArgumentNullException(nameof(onNext));
            }

            if (onError == null)
            {
                throw new ArgumentNullException(nameof(onError));
            }

            if (onCompleted == null)
            {
                throw new ArgumentNullException(nameof(onCompleted));
            }

            return onError(Exception);
        }
    }

    /// <summary>
    /// Represents an OnCompleted notification to an observer.
    /// </summary>
#if !NO_DEBUGGER_ATTRIBUTES
    [DebuggerDisplay("OnCompleted()")]
#endif
#if !NO_SERIALIZABLE
    [Serializable]
#endif
    internal sealed class OnCompletedNotification : Notification<T>
    {
        /// <summary>
        /// Complete notifications are stateless thus only one instance
        /// can ever exist per type.
        /// </summary>
        internal static readonly Notification<T> Instance = new OnCompletedNotification();

        /// <summary>
        /// Constructs a notification of the end of a sequence.
        /// </summary>
        private OnCompletedNotification()
        {
        }

        /// <summary>
        /// Throws an <see cref="InvalidOperationException"/>.
        /// </summary>
        public override T Value { get { throw new InvalidOperationException(Strings_Core.COMPLETED_NO_VALUE); } }

        /// <summary>
        /// Returns <c>null</c>.
        /// </summary>
        public override Exception Exception => null;

        /// <summary>
        /// Returns <c>false</c>.
        /// </summary>
        public override bool HasValue => false;

        /// <summary>
        /// Returns <see cref="NotificationKind.OnCompleted"/>.
        /// </summary>
        public override NotificationKind Kind => NotificationKind.OnCompleted;

        /// <summary>
        /// Returns the hash code for this instance.
        /// </summary>
        /// <returns>A value derived only from <typeparamref name="T"/>, so it is the same for every OnCompleted notification of that element type.</returns>
        public override int GetHashCode() => typeof(T).GetHashCode() ^ 8510;

        /// <summary>
        /// Indicates whether this instance and other are equal.
        /// </summary>
        /// <param name="other">The notification to compare with this instance.</param>
        /// <returns><c>true</c> if <paramref name="other"/> is an OnCompleted notification; otherwise, <c>false</c>.</returns>
        public override bool Equals(Notification<T> other)
        {
            if (ReferenceEquals(this, other))
            {
                return true;
            }

            // Pattern-based null test: the == operator is overloaded on this type.
            if (other is null)
            {
                return false;
            }

            return other.Kind == NotificationKind.OnCompleted;
        }

        /// <summary>
        /// Returns a string representation of this instance.
        /// </summary>
        public override string ToString() => "OnCompleted()";

        /// <summary>
        /// Invokes the observer's method corresponding to the notification.
        /// </summary>
        /// <param name="observer">Observer to invoke the notification on.</param>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        public override void Accept(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            observer.OnCompleted();
        }

        /// <summary>
        /// Invokes the observer's method corresponding to the notification and returns the produced result.
        /// </summary>
        /// <typeparam name="TResult">The type of the result returned from the observer's notification handlers.</typeparam>
        /// <param name="observer">Observer to invoke the notification on.</param>
        /// <returns>Result produced by the observation.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
        public override TResult Accept<TResult>(IObserver<T, TResult> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException(nameof(observer));
            }

            return observer.OnCompleted();
        }

        /// <summary>
        /// Invokes the delegate corresponding to the notification.
        /// </summary>
        /// <param name="onNext">Delegate to invoke for an OnNext notification.</param>
        /// <param name="onError">Delegate to invoke for an OnError notification.</param>
        /// <param name="onCompleted">Delegate to invoke for an OnCompleted notification.</param>
        /// <exception cref="ArgumentNullException"><paramref name="onNext"/>, <paramref name="onError"/> or <paramref name="onCompleted"/> is <c>null</c>.</exception>
        public override void Accept(Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            // All three delegates are validated even though only onCompleted is invoked here.
            if (onNext == null)
            {
                throw new ArgumentNullException(nameof(onNext));
            }

            if (onError == null)
            {
                throw new ArgumentNullException(nameof(onError));
            }

            if (onCompleted == null)
            {
                throw new ArgumentNullException(nameof(onCompleted));
            }

            onCompleted();
        }

        /// <summary>
        /// Invokes the delegate corresponding to the notification and returns the produced result.
        /// </summary>
        /// <typeparam name="TResult">The type of the result returned from the notification handler delegates.</typeparam>
        /// <param name="onNext">Delegate to invoke for an OnNext notification.</param>
        /// <param name="onError">Delegate to invoke for an OnError notification.</param>
        /// <param name="onCompleted">Delegate to invoke for an OnCompleted notification.</param>
        /// <returns>Result produced by the observation.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="onNext"/>, <paramref name="onError"/> or <paramref name="onCompleted"/> is <c>null</c>.</exception>
        public override TResult Accept<TResult>(Func<T, TResult> onNext, Func<Exception, TResult> onError, Func<TResult> onCompleted)
        {
            // All three delegates are validated even though only onCompleted is invoked here.
            if (onNext == null)
            {
                throw new ArgumentNullException(nameof(onNext));
            }

            if (onError == null)
            {
                throw new ArgumentNullException(nameof(onError));
            }

            if (onCompleted == null)
            {
                throw new ArgumentNullException(nameof(onCompleted));
            }

            return onCompleted();
        }
    }

    /// <summary>
    /// Determines whether the current <see cref="Notification{T}"/> object has the same observer message payload as a specified <see cref="Notification{T}"/> value.
    /// </summary>
    /// <param name="other">An object to compare to the current <see cref="Notification{T}"/> object.</param>
    /// <returns><c>true</c> if both <see cref="Notification{T}"/> objects have the same observer message payload; otherwise, <c>false</c>.</returns>
    /// <remarks>
    /// Equality of <see cref="Notification{T}"/> objects is based on the equality of the observer message payload they represent, including the notification Kind and the Value or Exception (if any).
    /// This means two <see cref="Notification{T}"/> objects can be equal even though they don't represent the same observer method call, but have the same Kind and have equal parameters passed to the observer method.
    /// In case one wants to determine whether two <see cref="Notification{T}"/> objects represent the same observer method call, use Object.ReferenceEquals identity equality instead.
    /// </remarks>
    public abstract bool Equals(Notification<T> other);

    /// <summary>
    /// Determines whether the two specified <see cref="Notification{T}"/> objects have the same observer message payload.
    /// </summary>
    /// <param name="left">The first <see cref="Notification{T}"/> to compare, or <c>null</c>.</param>
    /// <param name="right">The second <see cref="Notification{T}"/> to compare, or <c>null</c>.</param>
    /// <returns><c>true</c> if the first <see cref="Notification{T}"/> value has the same observer message payload as the second <see cref="Notification{T}"/> value; otherwise, <c>false</c>.</returns>
    /// <remarks>
    /// Equality of <see cref="Notification{T}"/> objects is based on the equality of the observer message payload they represent, including the notification Kind and the Value or Exception (if any).
    /// This means two <see cref="Notification{T}"/> objects can be equal even though they don't represent the same observer method call, but have the same Kind and have equal parameters passed to the observer method.
    /// In case one wants to determine whether two <see cref="Notification{T}"/> objects represent the same observer method call, use Object.ReferenceEquals identity equality instead.
    /// </remarks>
    public static bool operator ==(Notification<T> left, Notification<T> right)
    {
        // Covers both "same instance" and "both null".
        if (ReferenceEquals(left, right))
        {
            return true;
        }

        // "is null" avoids recursing into this very operator.
        if (left is null || right is null)
        {
            return false;
        }

        return left.Equals(right);
    }

    /// <summary>
    /// Determines whether the two specified <see cref="Notification{T}"/> objects have a different observer message payload.
    /// </summary>
    /// <param name="left">The first <see cref="Notification{T}"/> to compare, or <c>null</c>.</param>
    /// <param name="right">The second <see cref="Notification{T}"/> to compare, or <c>null</c>.</param>
    /// <returns><c>true</c> if the first <see cref="Notification{T}"/> value has a different observer message payload as the second <see cref="Notification{T}"/> value; otherwise, <c>false</c>.</returns>
    /// <remarks>
    /// Equality of <see cref="Notification{T}"/> objects is based on the equality of the observer message payload they represent, including the notification Kind and the Value or Exception (if any).
    /// This means two <see cref="Notification{T}"/> objects can be equal even though they don't represent the same observer method call, but have the same Kind and have equal parameters passed to the observer method.
    /// In case one wants to determine whether two <see cref="Notification{T}"/> objects represent a different observer method call, use Object.ReferenceEquals identity equality instead.
    /// </remarks>
    public static bool operator !=(Notification<T> left, Notification<T> right) => !(left == right);

    /// <summary>
    /// Determines whether the specified System.Object is equal to the current <see cref="Notification{T}"/>.
    /// </summary>
    /// <param name="obj">The System.Object to compare with the current <see cref="Notification{T}"/>.</param>
    /// <returns><c>true</c> if the specified System.Object is equal to the current <see cref="Notification{T}"/>; otherwise, <c>false</c>.</returns>
    /// <remarks>
    /// Equality of <see cref="Notification{T}"/> objects is based on the equality of the observer message payload they represent, including the notification Kind and the Value or Exception (if any).
    /// This means two <see cref="Notification{T}"/> objects can be equal even though they don't represent the same observer method call, but have the same Kind and have equal parameters passed to the observer method.
    /// In case one wants to determine whether two <see cref="Notification{T}"/> objects represent the same observer method call, use Object.ReferenceEquals identity equality instead.
    /// </remarks>
    public override bool Equals(object obj) => Equals(obj as Notification<T>);

    /// <summary>
    /// Invokes the observer's method corresponding to the notification.
    /// </summary>
    /// <param name="observer">Observer to invoke the notification on.</param>
    public abstract void Accept(IObserver<T> observer);

    /// <summary>
    /// Invokes the observer's method corresponding to the notification and returns the produced result.
    /// </summary>
    /// <typeparam name="TResult">The type of the result returned from the observer's notification handlers.</typeparam>
    /// <param name="observer">Observer to invoke the notification on.</param>
    /// <returns>Result produced by the observation.</returns>
    public abstract TResult Accept<TResult>(IObserver<T, TResult> observer);

    /// <summary>
    /// Invokes the delegate corresponding to the notification.
    /// </summary>
    /// <param name="onNext">Delegate to invoke for an OnNext notification.</param>
    /// <param name="onError">Delegate to invoke for an OnError notification.</param>
    /// <param name="onCompleted">Delegate to invoke for an OnCompleted notification.</param>
    public abstract void Accept(Action<T> onNext, Action<Exception> onError, Action onCompleted);

    /// <summary>
    /// Invokes the delegate corresponding to the notification and returns the produced result.
    /// </summary>
    /// <typeparam name="TResult">The type of the result returned from the notification handler delegates.</typeparam>
    /// <param name="onNext">Delegate to invoke for an OnNext notification.</param>
    /// <param name="onError">Delegate to invoke for an OnError notification.</param>
    /// <param name="onCompleted">Delegate to invoke for an OnCompleted notification.</param>
    /// <returns>Result produced by the observation.</returns>
    public abstract TResult Accept<TResult>(Func<T, TResult> onNext, Func<Exception, TResult> onError, Func<TResult> onCompleted);

    /// <summary>
    /// Returns an observable sequence with a single notification, using the immediate scheduler.
    /// </summary>
    /// <returns>The observable sequence that surfaces the behavior of the notification upon subscription.</returns>
    public IObservable<T> ToObservable() => ToObservable(ImmediateScheduler.Instance);

    /// <summary>
    /// Returns an observable sequence with a single notification.
    /// </summary>
    /// <param name="scheduler">Scheduler to send out the notification calls on.</param>
    /// <returns>The observable sequence that surfaces the behavior of the notification upon subscription.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is <c>null</c>.</exception>
    public IObservable<T> ToObservable(IScheduler scheduler)
    {
        if (scheduler == null)
        {
            throw new ArgumentNullException(nameof(scheduler));
        }

        return new NotificationToObservable(scheduler, this);
    }

    /// <summary>
    /// Observable that, on each subscription, schedules the delivery of the parent
    /// notification to the subscribing observer.
    /// </summary>
    private sealed class NotificationToObservable : ObservableBase<T>
    {
        private readonly IScheduler _scheduler;
        private readonly Notification<T> _parent;

        /// <summary>
        /// Captures the scheduler to deliver on and the notification to deliver.
        /// </summary>
        /// <param name="scheduler">Scheduler used to deliver the notification.</param>
        /// <param name="parent">Notification to deliver to subscribers.</param>
        public NotificationToObservable(IScheduler scheduler, Notification<T> parent)
        {
            _scheduler = scheduler;
            _parent = parent;
        }

        /// <summary>
        /// Schedules an action that passes the parent notification to <paramref name="observer"/>.
        /// </summary>
        /// <param name="observer">Observer that receives the notification.</param>
        /// <returns>The disposable returned by the scheduling call.</returns>
        protected override IDisposable SubscribeCore(IObserver<T> observer)
        {
            // Parent and observer travel as tuple state, so the lambda captures nothing.
            return _scheduler.ScheduleAction((_parent, observer), state =>
            {
                var parent = state._parent;
                var o = state.observer;

                parent.Accept(o);

                // An OnNext notification does not terminate the sequence by itself,
                // so the single-element sequence is completed explicitly.
                if (parent.Kind == NotificationKind.OnNext)
                {
                    o.OnCompleted();
                }
            });
        }
    }
}
/// <summary>
/// Provides a set of static methods for constructing notifications.
/// </summary>
public static class Notification
{
    /// <summary>
    /// Creates an object that represents an OnNext notification to an observer.
    /// </summary>
    /// <typeparam name="T">The type of the elements received by the observer. Upon dematerialization of the notifications into an observable sequence, this type is used as the element type for the sequence.</typeparam>
    /// <param name="value">The value contained in the notification.</param>
    /// <returns>The OnNext notification containing the value.</returns>
    public static Notification<T> CreateOnNext<T>(T value)
    {
        return new Notification<T>.OnNextNotification(value);
    }

    /// <summary>
    /// Creates an object that represents an OnError notification to an observer.
    /// </summary>
    /// <typeparam name="T">The type of the elements received by the observer. Upon dematerialization of the notifications into an observable sequence, this type is used as the element type for the sequence.</typeparam>
    /// <param name="error">The exception contained in the notification.</param>
    /// <returns>The OnError notification containing the exception.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="error"/> is <c>null</c>.</exception>
    public static Notification<T> CreateOnError<T>(Exception error)
    {
        if (error == null)
        {
            throw new ArgumentNullException(nameof(error));
        }

        return new Notification<T>.OnErrorNotification(error);
    }

    /// <summary>
    /// Creates an object that represents an OnCompleted notification to an observer.
    /// </summary>
    /// <typeparam name="T">The type of the elements received by the observer. Upon dematerialization of the notifications into an observable sequence, this type is used as the element type for the sequence.</typeparam>
    /// <returns>The OnCompleted notification.</returns>
    public static Notification<T> CreateOnCompleted<T>()
    {
        // Returns the shared per-T instance rather than allocating a new notification.
        return Notification<T>.OnCompletedNotification.Instance;
    }
}
}
#pragma warning restore 0659
#pragma warning restore 0661