// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
using System.Reactive.Concurrency;
using System.Reactive.Linq;
namespace System.Reactive.Subjects
{
///
/// Provides a set of static methods for creating subjects.
///
public static class Subject
{
///
/// Creates a subject from the specified observer and observable.
///
/// The type of the elements received by the observer.
/// The type of the elements produced by the observable sequence.
/// The observer used to send messages to the subject.
/// The observable used to subscribe to messages sent from the subject.
/// Subject implemented using the given observer and observable.
/// or is null.
public static ISubject Create(IObserver observer, IObservable observable)
{
if (observer == null)
throw new ArgumentNullException(nameof(observer));
if (observable == null)
throw new ArgumentNullException(nameof(observable));
return new AnonymousSubject(observer, observable);
}
///
/// Creates a subject from the specified observer and observable.
///
/// The type of the elements received by the observer and produced by the observable sequence.
/// The observer used to send messages to the subject.
/// The observable used to subscribe to messages sent from the subject.
/// Subject implemented using the given observer and observable.
/// or is null.
public static ISubject Create(IObserver observer, IObservable observable)
{
if (observer == null)
throw new ArgumentNullException(nameof(observer));
if (observable == null)
throw new ArgumentNullException(nameof(observable));
return new AnonymousSubject(observer, observable);
}
///
/// Synchronizes the messages sent to the subject.
///
/// The type of the elements received by the subject.
/// The type of the elements produced by the subject.
/// The subject to synchronize.
/// Subject whose messages are synchronized.
/// is null.
public static ISubject Synchronize(ISubject subject)
{
if (subject == null)
throw new ArgumentNullException(nameof(subject));
return new AnonymousSubject(Observer.Synchronize(subject), subject);
}
///
/// Synchronizes the messages sent to the subject.
///
/// The type of the elements received and produced by the subject.
/// The subject to synchronize.
/// Subject whose messages are synchronized.
/// is null.
public static ISubject Synchronize(ISubject subject)
{
if (subject == null)
throw new ArgumentNullException(nameof(subject));
return new AnonymousSubject(Observer.Synchronize(subject), subject);
}
///
/// Synchronizes the messages sent to the subject and notifies observers on the specified scheduler.
///
/// The type of the elements received by the subject.
/// The type of the elements produced by the subject.
/// The subject to synchronize.
/// Scheduler to notify observers on.
/// Subject whose messages are synchronized and whose observers are notified on the given scheduler.
/// or is null.
public static ISubject Synchronize(ISubject subject, IScheduler scheduler)
{
if (subject == null)
throw new ArgumentNullException(nameof(subject));
if (scheduler == null)
throw new ArgumentNullException(nameof(scheduler));
return new AnonymousSubject(Observer.Synchronize(subject), subject.ObserveOn(scheduler));
}
///
/// Synchronizes the messages sent to the subject and notifies observers on the specified scheduler.
///
/// The type of the elements received and produced by the subject.
/// The subject to synchronize.
/// Scheduler to notify observers on.
/// Subject whose messages are synchronized and whose observers are notified on the given scheduler.
/// or is null.
public static ISubject Synchronize(ISubject subject, IScheduler scheduler)
{
if (subject == null)
throw new ArgumentNullException(nameof(subject));
if (scheduler == null)
throw new ArgumentNullException(nameof(scheduler));
return new AnonymousSubject(Observer.Synchronize(subject), subject.ObserveOn(scheduler));
}
private class AnonymousSubject : ISubject
{
private readonly IObserver _observer;
private readonly IObservable _observable;
public AnonymousSubject(IObserver observer, IObservable observable)
{
_observer = observer;
_observable = observable;
}
public void OnCompleted() => _observer.OnCompleted();
public void OnError(Exception error)
{
if (error == null)
throw new ArgumentNullException(nameof(error));
_observer.OnError(error);
}
public void OnNext(T value) => _observer.OnNext(value);
public IDisposable Subscribe(IObserver observer)
{
if (observer == null)
throw new ArgumentNullException(nameof(observer));
//
// [OK] Use of unsafe Subscribe: non-pretentious wrapping of an observable sequence.
//
return _observable.Subscribe/*Unsafe*/(observer);
}
}
private sealed class AnonymousSubject : AnonymousSubject, ISubject
{
public AnonymousSubject(IObserver observer, IObservable observable)
: base(observer, observable)
{
}
}
}
}