// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. using System.Reactive.Concurrency; using System.Reactive.Linq; namespace System.Reactive.Subjects { /// /// Provides a set of static methods for creating subjects. /// public static class Subject { /// /// Creates a subject from the specified observer and observable. /// /// The type of the elements received by the observer. /// The type of the elements produced by the observable sequence. /// The observer used to send messages to the subject. /// The observable used to subscribe to messages sent from the subject. /// Subject implemented using the given observer and observable. /// or is null. public static ISubject Create(IObserver observer, IObservable observable) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (observable == null) throw new ArgumentNullException(nameof(observable)); return new AnonymousSubject(observer, observable); } /// /// Creates a subject from the specified observer and observable. /// /// The type of the elements received by the observer and produced by the observable sequence. /// The observer used to send messages to the subject. /// The observable used to subscribe to messages sent from the subject. /// Subject implemented using the given observer and observable. /// or is null. public static ISubject Create(IObserver observer, IObservable observable) { if (observer == null) throw new ArgumentNullException(nameof(observer)); if (observable == null) throw new ArgumentNullException(nameof(observable)); return new AnonymousSubject(observer, observable); } /// /// Synchronizes the messages sent to the subject. /// /// The type of the elements received by the subject. /// The type of the elements produced by the subject. /// The subject to synchronize. /// Subject whose messages are synchronized. /// is null. public static ISubject Synchronize(ISubject subject) { if (subject == null) throw new ArgumentNullException(nameof(subject)); return new AnonymousSubject(Observer.Synchronize(subject), subject); } /// /// Synchronizes the messages sent to the subject. /// /// The type of the elements received and produced by the subject. /// The subject to synchronize. /// Subject whose messages are synchronized. /// is null. public static ISubject Synchronize(ISubject subject) { if (subject == null) throw new ArgumentNullException(nameof(subject)); return new AnonymousSubject(Observer.Synchronize(subject), subject); } /// /// Synchronizes the messages sent to the subject and notifies observers on the specified scheduler. /// /// The type of the elements received by the subject. /// The type of the elements produced by the subject. /// The subject to synchronize. /// Scheduler to notify observers on. /// Subject whose messages are synchronized and whose observers are notified on the given scheduler. /// or is null. public static ISubject Synchronize(ISubject subject, IScheduler scheduler) { if (subject == null) throw new ArgumentNullException(nameof(subject)); if (scheduler == null) throw new ArgumentNullException(nameof(scheduler)); return new AnonymousSubject(Observer.Synchronize(subject), subject.ObserveOn(scheduler)); } /// /// Synchronizes the messages sent to the subject and notifies observers on the specified scheduler. /// /// The type of the elements received and produced by the subject. /// The subject to synchronize. /// Scheduler to notify observers on. /// Subject whose messages are synchronized and whose observers are notified on the given scheduler. /// or is null. public static ISubject Synchronize(ISubject subject, IScheduler scheduler) { if (subject == null) throw new ArgumentNullException(nameof(subject)); if (scheduler == null) throw new ArgumentNullException(nameof(scheduler)); return new AnonymousSubject(Observer.Synchronize(subject), subject.ObserveOn(scheduler)); } private class AnonymousSubject : ISubject { private readonly IObserver _observer; private readonly IObservable _observable; public AnonymousSubject(IObserver observer, IObservable observable) { _observer = observer; _observable = observable; } public void OnCompleted() => _observer.OnCompleted(); public void OnError(Exception error) { if (error == null) throw new ArgumentNullException(nameof(error)); _observer.OnError(error); } public void OnNext(T value) => _observer.OnNext(value); public IDisposable Subscribe(IObserver observer) { if (observer == null) throw new ArgumentNullException(nameof(observer)); // // [OK] Use of unsafe Subscribe: non-pretentious wrapping of an observable sequence. // return _observable.Subscribe/*Unsafe*/(observer); } } private sealed class AnonymousSubject : AnonymousSubject, ISubject { public AnonymousSubject(IObserver observer, IObservable observable) : base(observer, observable) { } } } }