// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;

namespace System.Reactive.Linq
{
    /// <summary>
    /// Implementation of <see cref="IQueryLanguageEx"/>: Create (iterator based), Expand,
    /// ForkJoin, Let, ManySelect and ToListObservable.
    /// </summary>
    internal class QueryLanguageEx : IQueryLanguageEx
    {
        #region Create

        /// <summary>
        /// Builds an observable whose subscription runs <paramref name="iteratorMethod"/> with the
        /// subscribing observer and then subscribes to the concatenation of the sequences it yields.
        /// Only the termination (completion or error) of that concatenation is forwarded.
        /// </summary>
        /// <param name="iteratorMethod">Function that receives the observer and returns the sequences to run one after another.</param>
        /// <returns>The observable wrapping <paramref name="iteratorMethod"/>.</returns>
        public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, IEnumerable<IObservable<object>>> iteratorMethod)
        {
            return new CreateWithEnumerableObservable<TResult>(iteratorMethod);
        }

        /// <summary>
        /// Observable backing <c>Create</c> for iterator methods that receive the observer.
        /// </summary>
        private sealed class CreateWithEnumerableObservable<TResult> : ObservableBase<TResult>
        {
            private readonly Func<IObserver<TResult>, IEnumerable<IObservable<object>>> _iteratorMethod;

            public CreateWithEnumerableObservable(Func<IObserver<TResult>, IEnumerable<IObservable<object>>> iteratorMethod)
            {
                _iteratorMethod = iteratorMethod;
            }

            protected override IDisposable SubscribeCore(IObserver<TResult> observer)
            {
                // The iterator gets the observer itself; the concatenated inner
                // sequences only contribute their terminal notification.
                return _iteratorMethod(observer)
                    .Concat()
                    .Subscribe(new TerminalOnlyObserver<TResult>(observer));
            }
        }

        /// <summary>
        /// Observer that relays OnCompleted and OnError to the wrapped observer and drops every OnNext.
        /// </summary>
        private sealed class TerminalOnlyObserver<TResult> : IObserver<object>
        {
            private readonly IObserver<TResult> _observer;

            public TerminalOnlyObserver(IObserver<TResult> observer)
            {
                _observer = observer;
            }

            public void OnCompleted()
            {
                _observer.OnCompleted();
            }

            public void OnError(Exception error)
            {
                _observer.OnError(error);
            }

            public void OnNext(object value)
            {
                // deliberately ignored
            }
        }

        /// <summary>
        /// Builds an observable whose subscription runs <paramref name="iteratorMethod"/> and subscribes
        /// to the concatenation of the sequences it yields, forwarding only completion or error.
        /// </summary>
        /// <param name="iteratorMethod">Function returning the sequences to run one after another.</param>
        /// <returns>The observable wrapping <paramref name="iteratorMethod"/>.</returns>
        public virtual IObservable<Unit> Create(Func<IEnumerable<IObservable<object>>> iteratorMethod)
        {
            return new CreateWithOnlyEnumerableObservable<Unit>(iteratorMethod);
        }

        /// <summary>
        /// Observable backing <c>Create</c> for iterator methods that take no observer.
        /// </summary>
        private sealed class CreateWithOnlyEnumerableObservable<TResult> : ObservableBase<TResult>
        {
            private readonly Func<IEnumerable<IObservable<object>>> _iteratorMethod;

            public CreateWithOnlyEnumerableObservable(Func<IEnumerable<IObservable<object>>> iteratorMethod)
            {
                _iteratorMethod = iteratorMethod;
            }

            protected override IDisposable SubscribeCore(IObserver<TResult> observer)
            {
                return _iteratorMethod()
                    .Concat()
                    .Subscribe(new TerminalOnlyObserver<TResult>(observer));
            }
        }

        #endregion

        #region Expand

        /// <summary>
        /// Forwards every element of <paramref name="source"/>, and of every sequence obtained by
        /// applying <paramref name="selector"/> to a forwarded element, using
        /// <paramref name="scheduler"/> to subscribe to the pending sequences.
        /// </summary>
        /// <param name="source">Initial sequence.</param>
        /// <param name="selector">Function producing a further sequence from each forwarded element.</param>
        /// <param name="scheduler">Scheduler on which queued sequences are dequeued and subscribed.</param>
        /// <returns>The expanded sequence.</returns>
        public virtual IObservable<TSource> Expand<TSource>(IObservable<TSource> source, Func<TSource, IObservable<TSource>> selector, IScheduler scheduler)
        {
            return new ExpandObservable<TSource>(source, selector, scheduler);
        }

        /// <summary>
        /// Observable backing <c>Expand</c>.
        /// </summary>
        private sealed class ExpandObservable<TSource> : ObservableBase<TSource>
        {
            private readonly IObservable<TSource> _source;
            private readonly Func<TSource, IObservable<TSource>> _selector;
            private readonly IScheduler _scheduler;

            public ExpandObservable(IObservable<TSource> source, Func<TSource, IObservable<TSource>> selector, IScheduler scheduler)
            {
                _source = source;
                _selector = selector;
                _scheduler = scheduler;
            }

            protected override IDisposable SubscribeCore(IObserver<TSource> observer)
            {
                // outGate serializes calls into the observer; q (also used as its own lock)
                // holds sequences waiting to be subscribed and guards activeCount/isAcquired.
                var outGate = new object();
                var q = new Queue<IObservable<TSource>>();
                var m = new SerialDisposable();
                var d = new CompositeDisposable { m };
                var activeCount = 0;
                var isAcquired = false;

                // Starts the drain loop on the scheduler unless one is already running.
                void ensureActive()
                {
                    var isOwner = false;

                    lock (q)
                    {
                        if (q.Count > 0)
                        {
                            isOwner = !isAcquired;
                            isAcquired = true;
                        }
                    }

                    if (isOwner)
                    {
                        m.Disposable = _scheduler.Schedule(self =>
                        {
                            var work = default(IObservable<TSource>);

                            lock (q)
                            {
                                if (q.Count > 0)
                                {
                                    work = q.Dequeue();
                                }
                                else
                                {
                                    // Queue drained: release ownership so a later enqueue restarts the loop.
                                    isAcquired = false;
                                    return;
                                }
                            }

                            var m1 = new SingleAssignmentDisposable();
                            d.Add(m1);
                            m1.Disposable = work.Subscribe(
                                x =>
                                {
                                    lock (outGate)
                                    {
                                        observer.OnNext(x);
                                    }

                                    IObservable<TSource> result;
                                    try
                                    {
                                        result = _selector(x);
                                    }
                                    catch (Exception exception)
                                    {
                                        lock (outGate)
                                        {
                                            observer.OnError(exception);
                                        }

                                        // There is no sequence to enqueue after a selector failure;
                                        // falling through would queue a null reference that the drain
                                        // loop would later try to subscribe to.
                                        return;
                                    }

                                    lock (q)
                                    {
                                        q.Enqueue(result);
                                        activeCount++;
                                    }

                                    ensureActive();
                                },
                                exception =>
                                {
                                    lock (outGate)
                                    {
                                        observer.OnError(exception);
                                    }
                                },
                                () =>
                                {
                                    d.Remove(m1);

                                    var done = false;
                                    lock (q)
                                    {
                                        activeCount--;
                                        if (activeCount == 0)
                                        {
                                            done = true;
                                        }
                                    }

                                    // The last outstanding sequence finished: the expansion is complete.
                                    if (done)
                                    {
                                        lock (outGate)
                                        {
                                            observer.OnCompleted();
                                        }
                                    }
                                });

                            // Reschedule to pick up the next queued sequence.
                            self();
                        });
                    }
                }

                lock (q)
                {
                    q.Enqueue(_source);
                    activeCount++;
                }

                ensureActive();

                return d;
            }
        }

        /// <summary>
        /// Same as the scheduler-taking overload, using <c>SchedulerDefaults.Iteration</c>.
        /// </summary>
        /// <param name="source">Initial sequence.</param>
        /// <param name="selector">Function producing a further sequence from each forwarded element.</param>
        /// <returns>The expanded sequence.</returns>
        public virtual IObservable<TSource> Expand<TSource>(IObservable<TSource> source, Func<TSource, IObservable<TSource>> selector)
        {
            return source.Expand(selector, SchedulerDefaults.Iteration);
        }

        #endregion

        #region ForkJoin

        /// <summary>
        /// Subscribes to both sequences and, once both have completed, emits
        /// <paramref name="resultSelector"/> applied to the last element of each and completes.
        /// If either sequence completed without an element, only completion is signalled.
        /// An error from either sequence disposes the other subscription and is forwarded.
        /// </summary>
        /// <param name="first">First sequence.</param>
        /// <param name="second">Second sequence.</param>
        /// <param name="resultSelector">Function combining the last element of each sequence.</param>
        /// <returns>Sequence with at most one combined element.</returns>
        public virtual IObservable<TResult> ForkJoin<TFirst, TSecond, TResult>(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
        {
            return Combine<TFirst, TSecond, TResult>(first, second, (observer, leftSubscription, rightSubscription) =>
            {
                var leftStopped = false;
                var rightStopped = false;

                var hasLeft = false;
                var hasRight = false;

                var lastLeft = default(TFirst);
                var lastRight = default(TSecond);

                // Shared tail of both completion handlers: does nothing until both sides have completed.
                void finishIfBothStopped()
                {
                    if (!leftStopped || !rightStopped)
                    {
                        return;
                    }

                    if (!hasLeft || !hasRight)
                    {
                        observer.OnCompleted();
                        return;
                    }

                    TResult result;
                    try
                    {
                        result = resultSelector(lastLeft, lastRight);
                    }
                    catch (Exception exception)
                    {
                        observer.OnError(exception);
                        return;
                    }

                    observer.OnNext(result);
                    observer.OnCompleted();
                }

                return new BinaryObserver<TFirst, TSecond>(
                    left =>
                    {
                        switch (left.Kind)
                        {
                            case NotificationKind.OnNext:
                                hasLeft = true;
                                lastLeft = left.Value;
                                break;
                            case NotificationKind.OnError:
                                rightSubscription.Dispose();
                                observer.OnError(left.Exception);
                                break;
                            case NotificationKind.OnCompleted:
                                leftStopped = true;
                                finishIfBothStopped();
                                break;
                        }
                    },
                    right =>
                    {
                        switch (right.Kind)
                        {
                            case NotificationKind.OnNext:
                                hasRight = true;
                                lastRight = right.Value;
                                break;
                            case NotificationKind.OnError:
                                leftSubscription.Dispose();
                                observer.OnError(right.Exception);
                                break;
                            case NotificationKind.OnCompleted:
                                rightStopped = true;
                                finishIfBothStopped();
                                break;
                        }
                    });
            });
        }

        /// <summary>
        /// Array overload; delegates to the <c>ForkJoin</c> operator taking an enumerable of sequences.
        /// </summary>
        /// <param name="sources">Sequences to join.</param>
        /// <returns>Sequence with at most one array of last elements.</returns>
        public virtual IObservable<TSource[]> ForkJoin<TSource>(params IObservable<TSource>[] sources)
        {
            return sources.ForkJoin();
        }

        /// <summary>
        /// Subscribes to all sequences and, once every one has completed with at least one element,
        /// emits an array holding the last element of each (in source order) and completes.
        /// </summary>
        /// <param name="sources">Sequences to join.</param>
        /// <returns>Sequence with at most one array of last elements.</returns>
        public virtual IObservable<TSource[]> ForkJoin<TSource>(IEnumerable<IObservable<TSource>> sources)
        {
            return new ForkJoinObservable<TSource>(sources);
        }

        /// <summary>
        /// Observable backing the n-ary <c>ForkJoin</c>.
        /// </summary>
        private sealed class ForkJoinObservable<TSource> : ObservableBase<TSource[]>
        {
            private readonly IEnumerable<IObservable<TSource>> _sources;

            public ForkJoinObservable(IEnumerable<IObservable<TSource>> sources)
            {
                _sources = sources;
            }

            protected override IDisposable SubscribeCore(IObserver<TSource[]> observer)
            {
                var allSources = _sources.ToArray();
                var count = allSources.Length;

                // Nothing to wait for: complete immediately.
                if (count == 0)
                {
                    observer.OnCompleted();
                    return Disposable.Empty;
                }

                var group = new CompositeDisposable(allSources.Length);
                var gate = new object();

                var finished = false;
                var hasResults = new bool[count];
                var hasCompleted = new bool[count];
                var results = new List<TSource>(count);

                // The gate is held while subscribing so that callbacks arriving on other
                // threads wait until every source has its slot in 'results'.
                lock (gate)
                {
                    for (var index = 0; index < count; index++)
                    {
                        var currentIndex = index;
                        var source = allSources[index];
                        results.Add(default);
                        group.Add(source.Subscribe(
                            value =>
                            {
                                lock (gate)
                                {
                                    if (!finished)
                                    {
                                        hasResults[currentIndex] = true;
                                        results[currentIndex] = value;
                                    }
                                }
                            },
                            error =>
                            {
                                lock (gate)
                                {
                                    finished = true;
                                    observer.OnError(error);
                                    group.Dispose();
                                }
                            },
                            () =>
                            {
                                lock (gate)
                                {
                                    if (!finished)
                                    {
                                        if (!hasResults[currentIndex])
                                        {
                                            // A source without any element ends the join. Record that
                                            // so later completions of other sources do not signal
                                            // the observer a second time.
                                            finished = true;
                                            observer.OnCompleted();
                                            return;
                                        }

                                        hasCompleted[currentIndex] = true;
                                        foreach (var completed in hasCompleted)
                                        {
                                            if (!completed)
                                            {
                                                return;
                                            }
                                        }

                                        finished = true;
                                        observer.OnNext(results.ToArray());
                                        observer.OnCompleted();
                                    }
                                }
                            }));
                    }
                }

                return group;
            }
        }

        #endregion

        #region Let

        /// <summary>
        /// Applies <paramref name="function"/> to <paramref name="source"/> and returns its result.
        /// </summary>
        /// <param name="source">Sequence handed to <paramref name="function"/>.</param>
        /// <param name="function">Function to apply.</param>
        /// <returns>Whatever <paramref name="function"/> returns.</returns>
        public virtual IObservable<TResult> Let<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, IObservable<TResult>> function)
        {
            return function(source);
        }

        #endregion

        #region ManySelect

        /// <summary>
        /// Same as the scheduler-taking overload, using <c>DefaultScheduler.Instance</c>.
        /// </summary>
        /// <param name="source">Source sequence.</param>
        /// <param name="selector">Function applied to the chained sequence created for each element.</param>
        /// <returns>Sequence of selector results.</returns>
        public virtual IObservable<TResult> ManySelect<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, TResult> selector)
        {
            return ManySelect(source, selector, DefaultScheduler.Instance);
        }

        /// <summary>
        /// For each element of <paramref name="source"/> creates a chained sequence that starts with
        /// that element and is linked to the chain created for the following element, then applies
        /// <paramref name="selector"/> to each chain after an <c>ObserveOn</c> hop onto <paramref name="scheduler"/>.
        /// </summary>
        /// <param name="source">Source sequence.</param>
        /// <param name="selector">Function applied to each chained sequence.</param>
        /// <param name="scheduler">Scheduler passed to <c>ObserveOn</c>.</param>
        /// <returns>Sequence of selector results.</returns>
        public virtual IObservable<TResult> ManySelect<TSource, TResult>(IObservable<TSource> source, Func<IObservable<TSource>, TResult> selector, IScheduler scheduler)
        {
            return Observable.Defer(() =>
            {
                // Most recently created link; per subscription because of Defer.
                var chain = default(ChainObservable<TSource>);

                return source
                    .Select(
                        x =>
                        {
                            var curr = new ChainObservable<TSource>(x);

                            // Hook the new link up as the tail of the previous one.
                            chain?.OnNext(curr);

                            chain = curr;

                            return (IObservable<TSource>)curr;
                        })
                    .Do(
                        _ => { },
                        exception => chain?.OnError(exception),
                        () => chain?.OnCompleted())
                    .ObserveOn(scheduler)
                    .Select(selector);
            });
        }

        /// <summary>
        /// One link of a ManySelect chain: a head value followed by a tail sequence that is
        /// supplied later through <see cref="OnNext"/>, <see cref="OnCompleted"/> or <see cref="OnError"/>.
        /// </summary>
        private class ChainObservable<T> : ISubject<IObservable<T>, T>
        {
            private readonly T _head;
            private readonly AsyncSubject<IObservable<T>> _tail = new AsyncSubject<IObservable<T>>();

            public ChainObservable(T head)
            {
                _head = head;
            }

            /// <summary>
            /// Schedules, on the current-thread scheduler, emission of the head value followed by
            /// a subscription of <paramref name="observer"/> to the merged tail.
            /// </summary>
            public IDisposable Subscribe(IObserver<T> observer)
            {
                var g = new CompositeDisposable();

                // State is passed as a tuple so the scheduled lambda does not capture locals.
                g.Add(CurrentThreadScheduler.Instance.ScheduleAction((observer, g, @this: this), state =>
                {
                    state.observer.OnNext(state.@this._head);
                    state.g.Add(state.@this._tail.Merge().Subscribe(state.observer));
                }));

                return g;
            }

            /// <summary>Sets an empty sequence as the tail.</summary>
            public void OnCompleted()
            {
                OnNext(Observable.Empty<T>());
            }

            /// <summary>Sets a sequence that throws <paramref name="error"/> as the tail.</summary>
            public void OnError(Exception error)
            {
                OnNext(Observable.Throw<T>(error));
            }

            /// <summary>Sets <paramref name="value"/> as the tail and completes the tail subject.</summary>
            public void OnNext(IObservable<T> value)
            {
                _tail.OnNext(value);
                _tail.OnCompleted();
            }
        }

        #endregion

        #region ToListObservable

        /// <summary>
        /// Wraps <paramref name="source"/> in a new <c>ListObservable</c>.
        /// </summary>
        /// <param name="source">Sequence to wrap.</param>
        /// <returns>The new <c>ListObservable</c>.</returns>
        public virtual ListObservable<TSource> ToListObservable<TSource>(IObservable<TSource> source)
        {
            return new ListObservable<TSource>(source);
        }

        #endregion

        #region |> Helpers <|

        /// <summary>
        /// Creates an observable that feeds the materialized notifications of both sources into
        /// the observer produced by <paramref name="combinerSelector"/>.
        /// </summary>
        /// <param name="leftSource">Left sequence.</param>
        /// <param name="rightSource">Right sequence.</param>
        /// <param name="combinerSelector">
        /// Receives the downstream observer and the left and right subscription handles, and returns
        /// the observer that consumes the tagged notifications.
        /// </param>
        /// <returns>The combined observable.</returns>
        private static IObservable<TResult> Combine<TLeft, TRight, TResult>(IObservable<TLeft> leftSource, IObservable<TRight> rightSource, Func<IObserver<TResult>, IDisposable, IDisposable, IObserver<Either<Notification<TLeft>, Notification<TRight>>>> combinerSelector)
        {
            return new CombineObservable<TLeft, TRight, TResult>(leftSource, rightSource, combinerSelector);
        }

        /// <summary>
        /// Observable backing <c>Combine</c>.
        /// </summary>
        private sealed class CombineObservable<TLeft, TRight, TResult> : ObservableBase<TResult>
        {
            private readonly IObservable<TLeft> _leftSource;
            private readonly IObservable<TRight> _rightSource;
            private readonly Func<IObserver<TResult>, IDisposable, IDisposable, IObserver<Either<Notification<TLeft>, Notification<TRight>>>> _combinerSelector;

            public CombineObservable(IObservable<TLeft> leftSource, IObservable<TRight> rightSource, Func<IObserver<TResult>, IDisposable, IDisposable, IObserver<Either<Notification<TLeft>, Notification<TRight>>>> combinerSelector)
            {
                _leftSource = leftSource;
                _rightSource = rightSource;
                _combinerSelector = combinerSelector;
            }

            protected override IDisposable SubscribeCore(IObserver<TResult> observer)
            {
                // The handles are created first so the combiner can dispose either side
                // before the corresponding subscription has been assigned.
                var leftSubscription = new SingleAssignmentDisposable();
                var rightSubscription = new SingleAssignmentDisposable();

                var combiner = _combinerSelector(observer, leftSubscription, rightSubscription);

                // Both streams go through Synchronize with the same gate object.
                var gate = new object();

                leftSubscription.Disposable = _leftSource
                    .Materialize()
                    .Select(x => Either<Notification<TLeft>, Notification<TRight>>.CreateLeft(x))
                    .Synchronize(gate)
                    .Subscribe(combiner);

                rightSubscription.Disposable = _rightSource
                    .Materialize()
                    .Select(x => Either<Notification<TLeft>, Notification<TRight>>.CreateRight(x))
                    .Synchronize(gate)
                    .Subscribe(combiner);

                return StableCompositeDisposable.Create(leftSubscription, rightSubscription);
            }
        }

        #endregion
    }
}