// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;

namespace System.Reactive.Linq
{
    using ObservableImpl;

    // Time-based query operators.
    //
    // Every operator below follows the same shape:
    //   * public virtual overloads that either accept an IScheduler or pass
    //     SchedulerDefaults.TimeBasedOperations in its place, and
    //   * a private static "Name_" helper that constructs the operator object.
    //
    // NOTE(review): the generic type parameters/arguments in this file were
    // reconstructed from how each value is used (the text this was restored from
    // had every "<...>" list missing). Operator class arities (e.g. Delay<TSource>
    // vs. Delay<TSource, TDelay>) and the relative order of the two Generate
    // overload families should be confirmed against the operator declarations -
    // presumably in the ObservableImpl namespace imported above.
    internal partial class QueryLanguage
    {
        #region + Buffer +

        #region TimeSpan only

        // Overloads without a timeShift reuse timeSpan as the shift.

        public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan)
        {
            return Buffer_<TSource>(source, timeSpan, timeSpan, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, IScheduler scheduler)
        {
            return Buffer_<TSource>(source, timeSpan, timeSpan, scheduler);
        }

        public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift)
        {
            return Buffer_<TSource>(source, timeSpan, timeShift, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
        {
            return Buffer_<TSource>(source, timeSpan, timeShift, scheduler);
        }

        /// <summary>Creates the Buffer operator for a time span / time shift pair.</summary>
        private static IObservable<IList<TSource>> Buffer_<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
        {
            return new Buffer<TSource>(source, timeSpan, timeShift, scheduler);
        }

        #endregion

        #region TimeSpan + int

        public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count)
        {
            return Buffer_<TSource>(source, timeSpan, count, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
        {
            return Buffer_<TSource>(source, timeSpan, count, scheduler);
        }

        /// <summary>Creates the Buffer operator for a time span / element count pair.</summary>
        private static IObservable<IList<TSource>> Buffer_<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
        {
            return new Buffer<TSource>(source, timeSpan, count, scheduler);
        }

        #endregion

        #endregion

        #region + Delay +

        #region TimeSpan

        public virtual IObservable<TSource> Delay<TSource>(IObservable<TSource> source, TimeSpan dueTime)
        {
            return Delay_<TSource>(source, dueTime, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<TSource> Delay<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
        {
            return Delay_<TSource>(source, dueTime, scheduler);
        }

        /// <summary>Creates the Delay operator for a relative due time.</summary>
        private static IObservable<TSource> Delay_<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
        {
            return new Delay<TSource>(source, dueTime, scheduler);
        }

        #endregion

        #region DateTimeOffset

        public virtual IObservable<TSource> Delay<TSource>(IObservable<TSource> source, DateTimeOffset dueTime)
        {
            return Delay_<TSource>(source, dueTime, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<TSource> Delay<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
        {
            return Delay_<TSource>(source, dueTime, scheduler);
        }

        /// <summary>Creates the Delay operator for an absolute due time.</summary>
        private static IObservable<TSource> Delay_<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
        {
            return new Delay<TSource>(source, dueTime, scheduler);
        }

        #endregion

        #region Duration selector

        public virtual IObservable<TSource> Delay<TSource, TDelay>(IObservable<TSource> source, Func<TSource, IObservable<TDelay>> delayDurationSelector)
        {
            // No subscription delay for this overload: null is passed in its place.
            return Delay_<TSource, TDelay>(source, null, delayDurationSelector);
        }

        public virtual IObservable<TSource> Delay<TSource, TDelay>(IObservable<TSource> source, IObservable<TDelay> subscriptionDelay, Func<TSource, IObservable<TDelay>> delayDurationSelector)
        {
            return Delay_<TSource, TDelay>(source, subscriptionDelay, delayDurationSelector);
        }

        /// <summary>Creates the selector-driven Delay operator; <paramref name="subscriptionDelay"/> may be null.</summary>
        private static IObservable<TSource> Delay_<TSource, TDelay>(IObservable<TSource> source, IObservable<TDelay> subscriptionDelay, Func<TSource, IObservable<TDelay>> delayDurationSelector)
        {
            return new Delay<TSource, TDelay>(source, subscriptionDelay, delayDurationSelector);
        }

        #endregion

        #endregion

        #region + DelaySubscription +

        public virtual IObservable<TSource> DelaySubscription<TSource>(IObservable<TSource> source, TimeSpan dueTime)
        {
            return DelaySubscription_<TSource>(source, dueTime, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<TSource> DelaySubscription<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
        {
            return DelaySubscription_<TSource>(source, dueTime, scheduler);
        }

        /// <summary>Creates the relative-time variant of the DelaySubscription operator.</summary>
        private static IObservable<TSource> DelaySubscription_<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
        {
            return new DelaySubscription<TSource>.Relative(source, dueTime, scheduler);
        }

        public virtual IObservable<TSource> DelaySubscription<TSource>(IObservable<TSource> source, DateTimeOffset dueTime)
        {
            return DelaySubscription_<TSource>(source, dueTime, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<TSource> DelaySubscription<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
        {
            return DelaySubscription_<TSource>(source, dueTime, scheduler);
        }

        /// <summary>Creates the absolute-time variant of the DelaySubscription operator.</summary>
        private static IObservable<TSource> DelaySubscription_<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
        {
            return new DelaySubscription<TSource>.Absolute(source, dueTime, scheduler);
        }

        #endregion

        #region + Generate +

        // Two overload families that differ only in the return type of timeSelector
        // (TimeSpan vs. DateTimeOffset).
        // NOTE(review): which family comes first was not recoverable; TimeSpan-first is assumed.

        public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector)
        {
            return Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler)
        {
            return Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, scheduler);
        }

        /// <summary>Creates the Generate operator driven by a relative-time selector.</summary>
        private static IObservable<TResult> Generate_<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler)
        {
            return new Generate<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, scheduler);
        }

        public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector)
        {
            return Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector, IScheduler scheduler)
        {
            return Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, scheduler);
        }

        /// <summary>Creates the Generate operator driven by an absolute-time selector.</summary>
        private static IObservable<TResult> Generate_<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector, IScheduler scheduler)
        {
            return new Generate<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, scheduler);
        }

        #endregion

        #region + Interval +

        // Interval has no operator of its own here: it is a periodic timer whose
        // due time and period are both set to the requested period.

        public virtual IObservable<long> Interval(TimeSpan period)
        {
            return Timer_(period, period, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<long> Interval(TimeSpan period, IScheduler scheduler)
        {
            return Timer_(period, period, scheduler);
        }

        #endregion

        #region + Sample +

        public virtual IObservable<TSource> Sample<TSource>(IObservable<TSource> source, TimeSpan interval)
        {
            return Sample_<TSource>(source, interval, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<TSource> Sample<TSource>(IObservable<TSource> source, TimeSpan interval, IScheduler scheduler)
        {
            return Sample_<TSource>(source, interval, scheduler);
        }

        /// <summary>Creates the interval-based Sample operator.</summary>
        private static IObservable<TSource> Sample_<TSource>(IObservable<TSource> source, TimeSpan interval, IScheduler scheduler)
        {
            return new Sample<TSource>(source, interval, scheduler);
        }

        public virtual IObservable<TSource> Sample<TSource, TSample>(IObservable<TSource> source, IObservable<TSample> sampler)
        {
            return Sample_<TSource, TSample>(source, sampler);
        }

        /// <summary>Creates the Sample operator driven by a sampler sequence.</summary>
        private static IObservable<TSource> Sample_<TSource, TSample>(IObservable<TSource> source, IObservable<TSample> sampler)
        {
            return new Sample<TSource, TSample>(source, sampler);
        }

        #endregion

        #region + Skip +

        public virtual IObservable<TSource> Skip<TSource>(IObservable<TSource> source, TimeSpan duration)
        {
            return Skip_<TSource>(source, duration, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<TSource> Skip<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
        {
            return Skip_<TSource>(source, duration, scheduler);
        }

        /// <summary>Creates the time-based Skip operator, reusing an existing one where possible.</summary>
        private static IObservable<TSource> Skip_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
        {
            // If the source is already a time-based Skip on the same scheduler, hand the
            // new duration to its Combine method rather than wrapping it in a second operator.
            var skip = source as Skip<TSource>.Time;
            if (skip != null && skip._scheduler == scheduler)
            {
                return skip.Combine(duration);
            }

            return new Skip<TSource>.Time(source, duration, scheduler);
        }

        #endregion

        #region + SkipLast +

        public virtual IObservable<TSource> SkipLast<TSource>(IObservable<TSource> source, TimeSpan duration)
        {
            return SkipLast_<TSource>(source, duration, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<TSource> SkipLast<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
        {
            return SkipLast_<TSource>(source, duration, scheduler);
        }

        /// <summary>Creates the time-based SkipLast operator.</summary>
        private static IObservable<TSource> SkipLast_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
        {
            return new SkipLast<TSource>.Time(source, duration, scheduler);
        }

        #endregion

        #region + SkipUntil +

        public virtual IObservable<TSource> SkipUntil<TSource>(IObservable<TSource> source, DateTimeOffset startTime)
        {
            return SkipUntil_<TSource>(source, startTime, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<TSource> SkipUntil<TSource>(IObservable<TSource> source, DateTimeOffset startTime, IScheduler scheduler)
        {
            return SkipUntil_<TSource>(source, startTime, scheduler);
        }

        /// <summary>Creates the time-based SkipUntil operator, reusing an existing one where possible.</summary>
        private static IObservable<TSource> SkipUntil_<TSource>(IObservable<TSource> source, DateTimeOffset startTime, IScheduler scheduler)
        {
            // Same reuse rule as Skip_: an existing SkipUntil on the same scheduler
            // takes the new start time through Combine.
            var skipUntil = source as SkipUntil<TSource>;
            if (skipUntil != null && skipUntil._scheduler == scheduler)
            {
                return skipUntil.Combine(startTime);
            }

            return new SkipUntil<TSource>(source, startTime, scheduler);
        }

        #endregion

        #region + Take +

        public virtual IObservable<TSource> Take<TSource>(IObservable<TSource> source, TimeSpan duration)
        {
            return Take_<TSource>(source, duration, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<TSource> Take<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
        {
            return Take_<TSource>(source, duration, scheduler);
        }

        /// <summary>Creates the time-based Take operator, reusing an existing one where possible.</summary>
        private static IObservable<TSource> Take_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
        {
            // An existing time-based Take on the same scheduler takes the new duration
            // through Combine instead of being wrapped again.
            var take = source as Take<TSource>.Time;
            if (take != null && take._scheduler == scheduler)
            {
                return take.Combine(duration);
            }

            return new Take<TSource>.Time(source, duration, scheduler);
        }

        #endregion

        #region + TakeLast +

        // TakeLast takes two schedulers; overloads that omit the loop scheduler
        // pass SchedulerDefaults.Iteration for it.

        public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, TimeSpan duration)
        {
            return TakeLast_<TSource>(source, duration, SchedulerDefaults.TimeBasedOperations, SchedulerDefaults.Iteration);
        }

        public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
        {
            return TakeLast_<TSource>(source, duration, scheduler, SchedulerDefaults.Iteration);
        }

        public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler timerScheduler, IScheduler loopScheduler)
        {
            return TakeLast_<TSource>(source, duration, timerScheduler, loopScheduler);
        }

        /// <summary>Creates the time-based TakeLast operator.</summary>
        private static IObservable<TSource> TakeLast_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler timerScheduler, IScheduler loopScheduler)
        {
            return new TakeLast<TSource>.Time(source, duration, timerScheduler, loopScheduler);
        }

        public virtual IObservable<IList<TSource>> TakeLastBuffer<TSource>(IObservable<TSource> source, TimeSpan duration)
        {
            return TakeLastBuffer_<TSource>(source, duration, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<IList<TSource>> TakeLastBuffer<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
        {
            return TakeLastBuffer_<TSource>(source, duration, scheduler);
        }

        /// <summary>Creates the time-based TakeLastBuffer operator.</summary>
        private static IObservable<IList<TSource>> TakeLastBuffer_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
        {
            return new TakeLastBuffer<TSource>.Time(source, duration, scheduler);
        }

        #endregion

        #region + TakeUntil +

        public virtual IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> source, DateTimeOffset endTime)
        {
            return TakeUntil_<TSource>(source, endTime, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> source, DateTimeOffset endTime, IScheduler scheduler)
        {
            return TakeUntil_<TSource>(source, endTime, scheduler);
        }

        /// <summary>Creates the time-based TakeUntil operator, reusing an existing one where possible.</summary>
        private static IObservable<TSource> TakeUntil_<TSource>(IObservable<TSource> source, DateTimeOffset endTime, IScheduler scheduler)
        {
            // An existing TakeUntil on the same scheduler takes the new end time
            // through Combine instead of being wrapped again.
            var takeUntil = source as TakeUntil<TSource>;
            if (takeUntil != null && takeUntil._scheduler == scheduler)
            {
                return takeUntil.Combine(endTime);
            }

            return new TakeUntil<TSource>(source, endTime, scheduler);
        }

        #endregion

        #region + Throttle +

        public virtual IObservable<TSource> Throttle<TSource>(IObservable<TSource> source, TimeSpan dueTime)
        {
            return Throttle_<TSource>(source, dueTime, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<TSource> Throttle<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
        {
            return Throttle_<TSource>(source, dueTime, scheduler);
        }

        /// <summary>Creates the time-based Throttle operator.</summary>
        private static IObservable<TSource> Throttle_<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
        {
            return new Throttle<TSource>(source, dueTime, scheduler);
        }

        // The selector-based overload constructs its operator directly; it has no "Throttle_" helper.
        public virtual IObservable<TSource> Throttle<TSource, TThrottle>(IObservable<TSource> source, Func<TSource, IObservable<TThrottle>> throttleDurationSelector)
        {
            return new Throttle<TSource, TThrottle>(source, throttleDurationSelector);
        }

        #endregion

        #region + TimeInterval +

        // NOTE(review): the element type is written fully qualified because this file also
        // uses an operator class named TimeInterval<TSource> (constructed in TimeInterval_);
        // confirm the qualified name refers to the intended value type.

        public virtual IObservable<System.Reactive.TimeInterval<TSource>> TimeInterval<TSource>(IObservable<TSource> source)
        {
            return TimeInterval_<TSource>(source, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<System.Reactive.TimeInterval<TSource>> TimeInterval<TSource>(IObservable<TSource> source, IScheduler scheduler)
        {
            return TimeInterval_<TSource>(source, scheduler);
        }

        /// <summary>Creates the TimeInterval operator.</summary>
        private static IObservable<System.Reactive.TimeInterval<TSource>> TimeInterval_<TSource>(IObservable<TSource> source, IScheduler scheduler)
        {
            return new TimeInterval<TSource>(source, scheduler);
        }

        #endregion

        #region + Timeout +

        // Overloads without an explicit fallback sequence ("other") pass
        // Observable.Throw<TSource>(new TimeoutException()) in its place.

        #region TimeSpan

        public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, TimeSpan dueTime)
        {
            return Timeout_<TSource>(source, dueTime, Observable.Throw<TSource>(new TimeoutException()), SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
        {
            return Timeout_<TSource>(source, dueTime, Observable.Throw<TSource>(new TimeoutException()), scheduler);
        }

        public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, TimeSpan dueTime, IObservable<TSource> other)
        {
            return Timeout_<TSource>(source, dueTime, other, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, TimeSpan dueTime, IObservable<TSource> other, IScheduler scheduler)
        {
            return Timeout_<TSource>(source, dueTime, other, scheduler);
        }

        /// <summary>Creates the relative-time variant of the Timeout operator.</summary>
        private static IObservable<TSource> Timeout_<TSource>(IObservable<TSource> source, TimeSpan dueTime, IObservable<TSource> other, IScheduler scheduler)
        {
            return new Timeout<TSource>.Relative(source, dueTime, other, scheduler);
        }

        #endregion

        #region DateTimeOffset

        public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, DateTimeOffset dueTime)
        {
            return Timeout_<TSource>(source, dueTime, Observable.Throw<TSource>(new TimeoutException()), SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
        {
            return Timeout_<TSource>(source, dueTime, Observable.Throw<TSource>(new TimeoutException()), scheduler);
        }

        public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IObservable<TSource> other)
        {
            return Timeout_<TSource>(source, dueTime, other, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IObservable<TSource> other, IScheduler scheduler)
        {
            return Timeout_<TSource>(source, dueTime, other, scheduler);
        }

        /// <summary>Creates the absolute-time variant of the Timeout operator.</summary>
        private static IObservable<TSource> Timeout_<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IObservable<TSource> other, IScheduler scheduler)
        {
            return new Timeout<TSource>.Absolute(source, dueTime, other, scheduler);
        }

        #endregion

        #region Duration selector

        // Overloads without a firstTimeout pass Observable.Never<TTimeout>() in its place.

        public virtual IObservable<TSource> Timeout<TSource, TTimeout>(IObservable<TSource> source, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector)
        {
            return Timeout_<TSource, TTimeout>(source, Observable.Never<TTimeout>(), timeoutDurationSelector, Observable.Throw<TSource>(new TimeoutException()));
        }

        public virtual IObservable<TSource> Timeout<TSource, TTimeout>(IObservable<TSource> source, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector, IObservable<TSource> other)
        {
            return Timeout_<TSource, TTimeout>(source, Observable.Never<TTimeout>(), timeoutDurationSelector, other);
        }

        public virtual IObservable<TSource> Timeout<TSource, TTimeout>(IObservable<TSource> source, IObservable<TTimeout> firstTimeout, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector)
        {
            return Timeout_<TSource, TTimeout>(source, firstTimeout, timeoutDurationSelector, Observable.Throw<TSource>(new TimeoutException()));
        }

        public virtual IObservable<TSource> Timeout<TSource, TTimeout>(IObservable<TSource> source, IObservable<TTimeout> firstTimeout, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector, IObservable<TSource> other)
        {
            return Timeout_<TSource, TTimeout>(source, firstTimeout, timeoutDurationSelector, other);
        }

        /// <summary>Creates the selector-driven Timeout operator.</summary>
        private static IObservable<TSource> Timeout_<TSource, TTimeout>(IObservable<TSource> source, IObservable<TTimeout> firstTimeout, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector, IObservable<TSource> other)
        {
            return new Timeout<TSource, TTimeout>(source, firstTimeout, timeoutDurationSelector, other);
        }

        #endregion

        #endregion

        #region + Timer +

        public virtual IObservable<long> Timer(TimeSpan dueTime)
        {
            return Timer_(dueTime, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<long> Timer(DateTimeOffset dueTime)
        {
            return Timer_(dueTime, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<long> Timer(TimeSpan dueTime, TimeSpan period)
        {
            return Timer_(dueTime, period, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<long> Timer(DateTimeOffset dueTime, TimeSpan period)
        {
            return Timer_(dueTime, period, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<long> Timer(TimeSpan dueTime, IScheduler scheduler)
        {
            return Timer_(dueTime, scheduler);
        }

        public virtual IObservable<long> Timer(DateTimeOffset dueTime, IScheduler scheduler)
        {
            return Timer_(dueTime, scheduler);
        }

        public virtual IObservable<long> Timer(TimeSpan dueTime, TimeSpan period, IScheduler scheduler)
        {
            return Timer_(dueTime, period, scheduler);
        }

        public virtual IObservable<long> Timer(DateTimeOffset dueTime, TimeSpan period, IScheduler scheduler)
        {
            return Timer_(dueTime, period, scheduler);
        }

        /// <summary>Creates a single-shot timer with a relative due time.</summary>
        private static IObservable<long> Timer_(TimeSpan dueTime, IScheduler scheduler)
        {
            return new Timer.Single.Relative(dueTime, scheduler);
        }

        /// <summary>Creates a periodic timer with a relative due time; also used by Interval.</summary>
        private static IObservable<long> Timer_(TimeSpan dueTime, TimeSpan period, IScheduler scheduler)
        {
            return new Timer.Periodic.Relative(dueTime, period, scheduler);
        }

        /// <summary>Creates a single-shot timer with an absolute due time.</summary>
        private static IObservable<long> Timer_(DateTimeOffset dueTime, IScheduler scheduler)
        {
            return new Timer.Single.Absolute(dueTime, scheduler);
        }

        /// <summary>Creates a periodic timer with an absolute due time.</summary>
        private static IObservable<long> Timer_(DateTimeOffset dueTime, TimeSpan period, IScheduler scheduler)
        {
            return new Timer.Periodic.Absolute(dueTime, period, scheduler);
        }

        #endregion

        #region + Timestamp +

        public virtual IObservable<Timestamped<TSource>> Timestamp<TSource>(IObservable<TSource> source)
        {
            return Timestamp_<TSource>(source, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<Timestamped<TSource>> Timestamp<TSource>(IObservable<TSource> source, IScheduler scheduler)
        {
            return Timestamp_<TSource>(source, scheduler);
        }

        /// <summary>Creates the Timestamp operator.</summary>
        private static IObservable<Timestamped<TSource>> Timestamp_<TSource>(IObservable<TSource> source, IScheduler scheduler)
        {
            return new Timestamp<TSource>(source, scheduler);
        }

        #endregion

        #region + Window +

        #region TimeSpan only

        // Overloads without a timeShift reuse timeSpan as the shift.

        public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan)
        {
            return Window_<TSource>(source, timeSpan, timeSpan, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, IScheduler scheduler)
        {
            return Window_<TSource>(source, timeSpan, timeSpan, scheduler);
        }

        public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift)
        {
            return Window_<TSource>(source, timeSpan, timeShift, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
        {
            return Window_<TSource>(source, timeSpan, timeShift, scheduler);
        }

        /// <summary>Creates the Window operator for a time span / time shift pair.</summary>
        private static IObservable<IObservable<TSource>> Window_<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
        {
            return new Window<TSource>(source, timeSpan, timeShift, scheduler);
        }

        #endregion

        #region TimeSpan + int

        public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count)
        {
            return Window_<TSource>(source, timeSpan, count, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
        {
            return Window_<TSource>(source, timeSpan, count, scheduler);
        }

        /// <summary>Creates the Window operator for a time span / element count pair.</summary>
        private static IObservable<IObservable<TSource>> Window_<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
        {
            return new Window<TSource>(source, timeSpan, count, scheduler);
        }

        #endregion

        #endregion
    }
}