// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Reactive.Concurrency;
using System.Reactive.Subjects;

namespace System.Reactive.Linq
{
    using ObservableImpl;

    /// <summary>
    /// Binding operators (Multicast, Publish, PublishLast, RefCount, AutoConnect, Replay).
    /// Most members are thin forwarders: they either construct an operator object
    /// directly or call <c>source.Multicast(...)</c> with a particular subject.
    /// No argument validation is performed in this part of the class.
    /// </summary>
    internal partial class QueryLanguage
    {
        #region + Multicast +

        /// <summary>
        /// Wraps <paramref name="source"/> and <paramref name="subject"/> in a new
        /// <c>ConnectableObservable</c>.
        /// </summary>
        public virtual IConnectableObservable<TResult> Multicast<TSource, TResult>(IObservable<TSource> source, ISubject<TSource, TResult> subject)
        {
            return new ConnectableObservable<TSource, TResult>(source, subject);
        }

        /// <summary>
        /// Creates the <c>Multicast</c> operator from <paramref name="source"/>, a subject
        /// factory and a selector that is applied to the multicasted sequence.
        /// </summary>
        public virtual IObservable<TResult> Multicast<TSource, TIntermediate, TResult>(IObservable<TSource> source, Func<ISubject<TSource, TIntermediate>> subjectSelector, Func<IObservable<TIntermediate>, IObservable<TResult>> selector)
        {
            return new Multicast<TSource, TIntermediate, TResult>(source, subjectSelector, selector);
        }

        #endregion

        #region + Publish +

        /// <summary>
        /// Multicasts <paramref name="source"/> through a new <see cref="Subject{T}"/>.
        /// </summary>
        public virtual IConnectableObservable<TSource> Publish<TSource>(IObservable<TSource> source)
        {
            return source.Multicast(new Subject<TSource>());
        }

        /// <summary>
        /// Multicasts <paramref name="source"/> using a factory that creates a new
        /// <see cref="Subject{T}"/> per invocation, and forwards <paramref name="selector"/>.
        /// </summary>
        public virtual IObservable<TResult> Publish<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector)
        {
            return source.Multicast(() => new Subject<TSource>(), selector);
        }

        /// <summary>
        /// Multicasts <paramref name="source"/> through a new <see cref="BehaviorSubject{T}"/>
        /// constructed with <paramref name="initialValue"/>.
        /// </summary>
        public virtual IConnectableObservable<TSource> Publish<TSource>(IObservable<TSource> source, TSource initialValue)
        {
            return source.Multicast(new BehaviorSubject<TSource>(initialValue));
        }

        /// <summary>
        /// Multicasts <paramref name="source"/> using a factory that creates a new
        /// <see cref="BehaviorSubject{T}"/> constructed with <paramref name="initialValue"/>,
        /// and forwards <paramref name="selector"/>.
        /// </summary>
        public virtual IObservable<TResult> Publish<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector, TSource initialValue)
        {
            return source.Multicast(() => new BehaviorSubject<TSource>(initialValue), selector);
        }

        #endregion

        #region + PublishLast +

        /// <summary>
        /// Multicasts <paramref name="source"/> through a new <see cref="AsyncSubject{T}"/>.
        /// </summary>
        public virtual IConnectableObservable<TSource> PublishLast<TSource>(IObservable<TSource> source)
        {
            return source.Multicast(new AsyncSubject<TSource>());
        }

        /// <summary>
        /// Multicasts <paramref name="source"/> using a factory that creates a new
        /// <see cref="AsyncSubject{T}"/> per invocation, and forwards <paramref name="selector"/>.
        /// </summary>
        public virtual IObservable<TResult> PublishLast<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector)
        {
            return source.Multicast(() => new AsyncSubject<TSource>(), selector);
        }

        #endregion

        #region + RefCount +

        /// <summary>
        /// Forwards to the <c>minObservers</c> overload with a value of 1.
        /// </summary>
        public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source)
        {
            return RefCount(source, 1);
        }

        /// <summary>
        /// Forwards to the scheduler overload using <see cref="Scheduler.Default"/>.
        /// </summary>
        public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, TimeSpan disconnectTime)
        {
            return RefCount(source, disconnectTime, Scheduler.Default);
        }

        /// <summary>
        /// Forwards to the full overload with <c>minObservers</c> set to 1.
        /// </summary>
        public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, TimeSpan disconnectTime, IScheduler scheduler)
        {
            return RefCount(source, 1, disconnectTime, scheduler);
        }

        /// <summary>
        /// Creates the <c>RefCount.Eager</c> operator over <paramref name="source"/> with
        /// the given <paramref name="minObservers"/>.
        /// </summary>
        public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, int minObservers)
        {
            return new RefCount<TSource>.Eager(source, minObservers);
        }

        /// <summary>
        /// Forwards to the full overload using <see cref="Scheduler.Default"/>.
        /// </summary>
        public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, int minObservers, TimeSpan disconnectTime)
        {
            return RefCount(source, minObservers, disconnectTime, Scheduler.Default);
        }

        /// <summary>
        /// Creates the <c>RefCount.Lazy</c> operator over <paramref name="source"/>.
        /// NOTE(review): <paramref name="disconnectTime"/> is presumably the delay applied on
        /// <paramref name="scheduler"/> before disconnecting; confirm against <c>RefCount.Lazy</c>.
        /// </summary>
        public virtual IObservable<TSource> RefCount<TSource>(IConnectableObservable<TSource> source, int minObservers, TimeSpan disconnectTime, IScheduler scheduler)
        {
            // The operator's constructor takes minObservers last, unlike this method's parameter order.
            return new RefCount<TSource>.Lazy(source, disconnectTime, scheduler, minObservers);
        }

        #endregion

        #region + AutoConnect +

        /// <summary>
        /// Returns a sequence that connects <paramref name="source"/> automatically.
        /// </summary>
        /// <param name="source">The connectable sequence to connect.</param>
        /// <param name="minObservers">
        /// Forwarded to the <c>AutoConnect</c> operator. When zero or negative, the source is
        /// connected immediately inside this call.
        /// </param>
        /// <param name="onConnect">
        /// Optional callback; on the immediate-connect path it receives the connection
        /// returned by <c>source.Connect()</c>. May be <c>null</c>.
        /// </param>
        /// <returns>
        /// <paramref name="source"/> itself when connected immediately; otherwise a new
        /// <c>AutoConnect</c> operator instance.
        /// </returns>
        public virtual IObservable<TSource> AutoConnect<TSource>(IConnectableObservable<TSource> source, int minObservers = 1, Action<IDisposable> onConnect = null)
        {
            if (minObservers <= 0)
            {
                // Nothing to wait for: connect right away and hand the connection to the caller.
                var d = source.Connect();
                onConnect?.Invoke(d);
                return source;
            }

            return new AutoConnect<TSource>(source, minObservers, onConnect);
        }

        #endregion

        #region + Replay +

        /// <summary>
        /// Multicasts <paramref name="source"/> through a new <see cref="ReplaySubject{T}"/>
        /// created with its parameterless constructor.
        /// </summary>
        public virtual IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source)
        {
            return source.Multicast(new ReplaySubject<TSource>());
        }

        /// <summary>
        /// Multicasts <paramref name="source"/> through a new <see cref="ReplaySubject{T}"/>
        /// created with <paramref name="scheduler"/>.
        /// </summary>
        public virtual IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source, IScheduler scheduler)
        {
            return source.Multicast(new ReplaySubject<TSource>(scheduler));
        }

        /// <summary>
        /// Selector variant: a new parameterless <see cref="ReplaySubject{T}"/> is created
        /// each time the subject factory is invoked.
        /// </summary>
        public virtual IObservable<TResult> Replay<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector)
        {
            return source.Multicast(() => new ReplaySubject<TSource>(), selector);
        }

        /// <summary>
        /// Selector variant: the subject factory creates a <see cref="ReplaySubject{T}"/>
        /// with <paramref name="scheduler"/>.
        /// </summary>
        public virtual IObservable<TResult> Replay<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector, IScheduler scheduler)
        {
            return source.Multicast(() => new ReplaySubject<TSource>(scheduler), selector);
        }

        /// <summary>
        /// Multicasts <paramref name="source"/> through a new <see cref="ReplaySubject{T}"/>
        /// created with <paramref name="window"/>.
        /// </summary>
        public virtual IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source, TimeSpan window)
        {
            return source.Multicast(new ReplaySubject<TSource>(window));
        }

        /// <summary>
        /// Selector variant: the subject factory creates a <see cref="ReplaySubject{T}"/>
        /// with <paramref name="window"/>.
        /// </summary>
        public virtual IObservable<TResult> Replay<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector, TimeSpan window)
        {
            return source.Multicast(() => new ReplaySubject<TSource>(window), selector);
        }

        /// <summary>
        /// Multicasts <paramref name="source"/> through a new <see cref="ReplaySubject{T}"/>
        /// created with <paramref name="window"/> and <paramref name="scheduler"/>.
        /// </summary>
        public virtual IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source, TimeSpan window, IScheduler scheduler)
        {
            return source.Multicast(new ReplaySubject<TSource>(window, scheduler));
        }

        /// <summary>
        /// Selector variant: the subject factory creates a <see cref="ReplaySubject{T}"/>
        /// with <paramref name="window"/> and <paramref name="scheduler"/>.
        /// </summary>
        public virtual IObservable<TResult> Replay<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector, TimeSpan window, IScheduler scheduler)
        {
            return source.Multicast(() => new ReplaySubject<TSource>(window, scheduler), selector);
        }

        /// <summary>
        /// Multicasts <paramref name="source"/> through a new <see cref="ReplaySubject{T}"/>
        /// created with <paramref name="bufferSize"/> and <paramref name="scheduler"/>.
        /// </summary>
        public virtual IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source, int bufferSize, IScheduler scheduler)
        {
            return source.Multicast(new ReplaySubject<TSource>(bufferSize, scheduler));
        }

        /// <summary>
        /// Selector variant: the subject factory creates a <see cref="ReplaySubject{T}"/>
        /// with <paramref name="bufferSize"/> and <paramref name="scheduler"/>.
        /// </summary>
        public virtual IObservable<TResult> Replay<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector, int bufferSize, IScheduler scheduler)
        {
            return source.Multicast(() => new ReplaySubject<TSource>(bufferSize, scheduler), selector);
        }

        /// <summary>
        /// Multicasts <paramref name="source"/> through a new <see cref="ReplaySubject{T}"/>
        /// created with <paramref name="bufferSize"/>.
        /// </summary>
        public virtual IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source, int bufferSize)
        {
            return source.Multicast(new ReplaySubject<TSource>(bufferSize));
        }

        /// <summary>
        /// Selector variant: the subject factory creates a <see cref="ReplaySubject{T}"/>
        /// with <paramref name="bufferSize"/>.
        /// </summary>
        public virtual IObservable<TResult> Replay<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector, int bufferSize)
        {
            return source.Multicast(() => new ReplaySubject<TSource>(bufferSize), selector);
        }

        /// <summary>
        /// Multicasts <paramref name="source"/> through a new <see cref="ReplaySubject{T}"/>
        /// created with <paramref name="bufferSize"/> and <paramref name="window"/>.
        /// </summary>
        public virtual IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source, int bufferSize, TimeSpan window)
        {
            return source.Multicast(new ReplaySubject<TSource>(bufferSize, window));
        }

        /// <summary>
        /// Selector variant: the subject factory creates a <see cref="ReplaySubject{T}"/>
        /// with <paramref name="bufferSize"/> and <paramref name="window"/>.
        /// </summary>
        public virtual IObservable<TResult> Replay<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector, int bufferSize, TimeSpan window)
        {
            return source.Multicast(() => new ReplaySubject<TSource>(bufferSize, window), selector);
        }

        /// <summary>
        /// Multicasts <paramref name="source"/> through a new <see cref="ReplaySubject{T}"/>
        /// created with <paramref name="bufferSize"/>, <paramref name="window"/> and
        /// <paramref name="scheduler"/>.
        /// </summary>
        public virtual IConnectableObservable<TSource> Replay<TSource>(IObservable<TSource> source, int bufferSize, TimeSpan window, IScheduler scheduler)
        {
            return source.Multicast(new ReplaySubject<TSource>(bufferSize, window, scheduler));
        }

        /// <summary>
        /// Selector variant: the subject factory creates a <see cref="ReplaySubject{T}"/>
        /// with <paramref name="bufferSize"/>, <paramref name="window"/> and
        /// <paramref name="scheduler"/>.
        /// </summary>
        public virtual IObservable<TResult> Replay<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> selector, int bufferSize, TimeSpan window, IScheduler scheduler)
        {
            return source.Multicast(() => new ReplaySubject<TSource>(bufferSize, window, scheduler), selector);
        }

        #endregion
    }
}