// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Threading;

namespace System.Reactive.Linq
{
    using ObservableImpl;

    /// <summary>
    /// Blocking operators of the query language: each member either blocks the calling thread
    /// until the source signals, or adapts the source to a pull-based enumerable/enumerator.
    /// </summary>
    internal partial class QueryLanguage
    {
        #region - Chunkify -

        /// <summary>
        /// Exposes <paramref name="source"/> as an enumerable of lists by calling the
        /// <c>Collect</c> extension on it with list-based collector functions.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Observable sequence to collect from.</param>
        /// <returns>The enumerable returned by <c>Collect</c>, yielding lists of source elements.</returns>
        public virtual IEnumerable<IList<TSource>> Chunkify<TSource>(IObservable<TSource> source)
        {
            // Initial collector: empty list. Merge: append the element and keep the same list.
            // Replacement collector: a fresh empty list (the previous collector is ignored).
            return source.Collect<TSource, IList<TSource>>(() => new List<TSource>(), (lst, x) => { lst.Add(x); return lst; }, _ => new List<TSource>());
        }

        #endregion

        #region + Collect +

        /// <summary>
        /// Collects elements of <paramref name="source"/> using <paramref name="newCollector"/> both for the
        /// initial collector and for every replacement collector.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TResult">Collector type.</typeparam>
        /// <param name="source">Observable sequence to collect from.</param>
        /// <param name="newCollector">Factory for a collector; also invoked (ignoring the previous collector) when a new one is needed.</param>
        /// <param name="merge">Function combining the current collector with a source element.</param>
        /// <returns>The enumerable produced by the underlying <c>Collect</c> implementation.</returns>
        public virtual IEnumerable<TResult> Collect<TSource, TResult>(IObservable<TSource> source, Func<TResult> newCollector, Func<TResult, TSource, TResult> merge)
        {
            return Collect_(source, newCollector, merge, _ => newCollector());
        }

        /// <summary>
        /// Collects elements of <paramref name="source"/> with separate functions for the initial collector
        /// and for replacement collectors.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <typeparam name="TResult">Collector type.</typeparam>
        /// <param name="source">Observable sequence to collect from.</param>
        /// <param name="getInitialCollector">Factory for the first collector.</param>
        /// <param name="merge">Function combining the current collector with a source element.</param>
        /// <param name="getNewCollector">Factory for a replacement collector, given the current one.</param>
        /// <returns>The enumerable produced by the underlying <c>Collect</c> implementation.</returns>
        public virtual IEnumerable<TResult> Collect<TSource, TResult>(IObservable<TSource> source, Func<TResult> getInitialCollector, Func<TResult, TSource, TResult> merge, Func<TResult, TResult> getNewCollector)
        {
            return Collect_(source, getInitialCollector, merge, getNewCollector);
        }

        /// <summary>
        /// Shared implementation of both <c>Collect</c> overloads: wraps the arguments in a
        /// <c>Collect&lt;TSource, TResult&gt;</c> object (declared outside this file).
        /// </summary>
        private static IEnumerable<TResult> Collect_<TSource, TResult>(IObservable<TSource> source, Func<TResult> getInitialCollector, Func<TResult, TSource, TResult> merge, Func<TResult, TResult> getNewCollector)
        {
            return new Collect<TSource, TResult>(source, getInitialCollector, merge, getNewCollector);
        }

        #endregion

        #region First

        /// <summary>
        /// Waits for the first element of <paramref name="source"/> and returns it.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Observable sequence to read from.</param>
        /// <returns>The value captured by the blocking consumer.</returns>
        /// <exception cref="InvalidOperationException">The consumer observed no value.</exception>
        public virtual TSource First<TSource>(IObservable<TSource> source)
        {
            return FirstOrDefaultInternal(source, true);
        }

        /// <summary>
        /// Applies <paramref name="predicate"/> through <c>Where</c> and returns the first element of the filtered sequence.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Observable sequence to read from.</param>
        /// <param name="predicate">Filter passed to <c>Where</c>.</param>
        /// <returns>The first element of the filtered sequence.</returns>
        /// <exception cref="InvalidOperationException">The filtered sequence produced no value.</exception>
        public virtual TSource First<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
        {
            return First(Where(source, predicate));
        }

        #endregion

        #region FirstOrDefault

        /// <summary>
        /// Waits for the first element of <paramref name="source"/>; does not throw when no value was observed.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Observable sequence to read from.</param>
        /// <returns>The consumer's value field (presumably <c>default(TSource)</c> when nothing was observed).</returns>
        public virtual TSource FirstOrDefault<TSource>(IObservable<TSource> source)
        {
            return FirstOrDefaultInternal(source, false);
        }

        /// <summary>
        /// Applies <paramref name="predicate"/> through <c>Where</c> and forwards to the unfiltered
        /// <c>FirstOrDefault</c> overload.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Observable sequence to read from.</param>
        /// <param name="predicate">Filter passed to <c>Where</c>.</param>
        /// <returns>The result of <c>FirstOrDefault</c> on the filtered sequence.</returns>
        public virtual TSource FirstOrDefault<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
        {
            return FirstOrDefault(Where(source, predicate));
        }

        /// <summary>
        /// Subscribes a <c>FirstBlocking</c> consumer to <paramref name="source"/>, waits on it, and
        /// returns the value it captured.
        /// </summary>
        /// <param name="source">Observable sequence to read from.</param>
        /// <param name="throwOnEmpty">When true, throw if the consumer reports that it has no value.</param>
        /// <exception cref="InvalidOperationException">
        /// <paramref name="throwOnEmpty"/> is true and the consumer has no value.
        /// </exception>
        private static TSource FirstOrDefaultInternal<TSource>(IObservable<TSource> source, bool throwOnEmpty)
        {
            using (var consumer = new FirstBlocking<TSource>())
            {
                // The subscription is disposed as soon as the wait returns.
                using (source.Subscribe(consumer))
                {
                    consumer.Wait();
                }

                // Error recorded by the consumer takes precedence over the empty check.
                // (ThrowIfNotNull is defined elsewhere; presumably rethrows when non-null.)
                consumer._error.ThrowIfNotNull();

                if (throwOnEmpty && !consumer._hasValue)
                {
                    throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
                }

                return consumer._value;
            }
        }

        #endregion

        #region + ForEach +

        /// <summary>
        /// Invokes <paramref name="onNext"/> through a <c>ForEach&lt;TSource&gt;.Observer</c> sink and blocks
        /// until that sink signals that it is done.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Observable sequence to consume.</param>
        /// <param name="onNext">Action handed to the sink.</param>
        public virtual void ForEach<TSource>(IObservable<TSource> source, Action<TSource> onNext)
        {
            using (var evt = new WaitAndSetOnce())
            {
                // The sink receives a callback that releases the waiting thread.
                var sink = new ForEach<TSource>.Observer(onNext, () => evt.Set());

                using (source.SubscribeSafe(sink))
                {
                    evt.WaitOne();
                }

                // Surface any error the sink recorded, after the subscription has been disposed.
                sink.Error.ThrowIfNotNull();
            }
        }

        /// <summary>
        /// Same as the other <c>ForEach</c> overload, but uses a <c>ForEach&lt;TSource&gt;.ObserverIndexed</c>
        /// sink so that <paramref name="onNext"/> takes a second, integer argument.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Observable sequence to consume.</param>
        /// <param name="onNext">Action handed to the sink (presumably element and its index).</param>
        public virtual void ForEach<TSource>(IObservable<TSource> source, Action<TSource, int> onNext)
        {
            using (var evt = new WaitAndSetOnce())
            {
                var sink = new ForEach<TSource>.ObserverIndexed(onNext, () => evt.Set());

                using (source.SubscribeSafe(sink))
                {
                    evt.WaitOne();
                }

                sink.Error.ThrowIfNotNull();
            }
        }

        #endregion

        #region + GetEnumerator +

        /// <summary>
        /// Creates a <c>GetEnumerator&lt;TSource&gt;</c> object and returns the result of running it on <paramref name="source"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Observable sequence to enumerate.</param>
        /// <returns>The enumerator returned by <c>Run</c>.</returns>
        public virtual IEnumerator<TSource> GetEnumerator<TSource>(IObservable<TSource> source)
        {
            var e = new GetEnumerator<TSource>();
            return e.Run(source);
        }

        #endregion

        #region Last

        /// <summary>
        /// Waits for <paramref name="source"/> to finish and returns the value captured by the <c>LastBlocking</c> consumer.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Observable sequence to read from.</param>
        /// <returns>The value captured by the blocking consumer.</returns>
        /// <exception cref="InvalidOperationException">The consumer observed no value.</exception>
        public virtual TSource Last<TSource>(IObservable<TSource> source)
        {
            return LastOrDefaultInternal(source, true);
        }

        /// <summary>
        /// Applies <paramref name="predicate"/> through <c>Where</c> and forwards to the unfiltered <c>Last</c> overload.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Observable sequence to read from.</param>
        /// <param name="predicate">Filter passed to <c>Where</c>.</param>
        /// <returns>The result of <c>Last</c> on the filtered sequence.</returns>
        /// <exception cref="InvalidOperationException">The filtered sequence produced no value.</exception>
        public virtual TSource Last<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
        {
            return Last(Where(source, predicate));
        }

        #endregion

        #region LastOrDefault

        /// <summary>
        /// Like <c>Last</c>, but does not throw when no value was observed.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Observable sequence to read from.</param>
        /// <returns>The consumer's value field (presumably <c>default(TSource)</c> when nothing was observed).</returns>
        public virtual TSource LastOrDefault<TSource>(IObservable<TSource> source)
        {
            return LastOrDefaultInternal(source, false);
        }

        /// <summary>
        /// Applies <paramref name="predicate"/> through <c>Where</c> and forwards to the unfiltered
        /// <c>LastOrDefault</c> overload.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Observable sequence to read from.</param>
        /// <param name="predicate">Filter passed to <c>Where</c>.</param>
        /// <returns>The result of <c>LastOrDefault</c> on the filtered sequence.</returns>
        public virtual TSource LastOrDefault<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
        {
            return LastOrDefault(Where(source, predicate));
        }

        /// <summary>
        /// Subscribes a <c>LastBlocking</c> consumer to <paramref name="source"/>, waits on it, and
        /// returns the value it captured.
        /// </summary>
        /// <param name="source">Observable sequence to read from.</param>
        /// <param name="throwOnEmpty">When true, throw if the consumer reports that it has no value.</param>
        /// <exception cref="InvalidOperationException">
        /// <paramref name="throwOnEmpty"/> is true and the consumer has no value.
        /// </exception>
        private static TSource LastOrDefaultInternal<TSource>(IObservable<TSource> source, bool throwOnEmpty)
        {
            using (var consumer = new LastBlocking<TSource>())
            {
                // The subscription is disposed as soon as the wait returns.
                using (source.Subscribe(consumer))
                {
                    consumer.Wait();
                }

                // Error recorded by the consumer takes precedence over the empty check.
                consumer._error.ThrowIfNotNull();

                if (throwOnEmpty && !consumer._hasValue)
                {
                    throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
                }

                return consumer._value;
            }
        }

        #endregion

        #region + Latest +

        /// <summary>
        /// Wraps <paramref name="source"/> in a <c>Latest&lt;TSource&gt;</c> enumerable (declared outside this file).
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Observable sequence to wrap.</param>
        /// <returns>The newly created enumerable.</returns>
        public virtual IEnumerable<TSource> Latest<TSource>(IObservable<TSource> source)
        {
            return new Latest<TSource>(source);
        }

        #endregion

        #region + MostRecent +

        /// <summary>
        /// Wraps <paramref name="source"/> and <paramref name="initialValue"/> in a
        /// <c>MostRecent&lt;TSource&gt;</c> enumerable (declared outside this file).
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Observable sequence to wrap.</param>
        /// <param name="initialValue">Value passed to the enumerable (presumably yielded before any element arrives).</param>
        /// <returns>The newly created enumerable.</returns>
        public virtual IEnumerable<TSource> MostRecent<TSource>(IObservable<TSource> source, TSource initialValue)
        {
            return new MostRecent<TSource>(source, initialValue);
        }

        #endregion

        #region + Next +

        /// <summary>
        /// Wraps <paramref name="source"/> in a <c>Next&lt;TSource&gt;</c> enumerable (declared outside this file).
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Observable sequence to wrap.</param>
        /// <returns>The newly created enumerable.</returns>
        public virtual IEnumerable<TSource> Next<TSource>(IObservable<TSource> source)
        {
            return new Next<TSource>(source);
        }

        #endregion

        #region Single

        /// <summary>
        /// Blocks until <paramref name="source"/> terminates (or a second element arrives) and returns its only element.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Observable sequence to read from.</param>
        /// <returns>The single element of the sequence.</returns>
        /// <exception cref="InvalidOperationException">The sequence had no elements or more than one element.</exception>
        public virtual TSource Single<TSource>(IObservable<TSource> source)
        {
            return SingleOrDefaultInternal(source, true);
        }

        /// <summary>
        /// Applies <paramref name="predicate"/> through <c>Where</c> and forwards to the unfiltered <c>Single</c> overload.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Observable sequence to read from.</param>
        /// <param name="predicate">Filter passed to <c>Where</c>.</param>
        /// <returns>The single element of the filtered sequence.</returns>
        /// <exception cref="InvalidOperationException">The filtered sequence had no elements or more than one element.</exception>
        public virtual TSource Single<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
        {
            return Single(Where(source, predicate));
        }

        #endregion

        #region SingleOrDefault

        /// <summary>
        /// Like <c>Single</c>, but returns <c>default(TSource)</c> instead of throwing when the sequence is empty.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Observable sequence to read from.</param>
        /// <returns>The single element, or <c>default(TSource)</c> when there was none.</returns>
        /// <exception cref="InvalidOperationException">The sequence had more than one element.</exception>
        public virtual TSource SingleOrDefault<TSource>(IObservable<TSource> source)
        {
            return SingleOrDefaultInternal(source, false);
        }

        /// <summary>
        /// Applies <paramref name="predicate"/> through <c>Where</c> and forwards to the unfiltered
        /// <c>SingleOrDefault</c> overload.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Observable sequence to read from.</param>
        /// <param name="predicate">Filter passed to <c>Where</c>.</param>
        /// <returns>The result of <c>SingleOrDefault</c> on the filtered sequence.</returns>
        /// <exception cref="InvalidOperationException">The filtered sequence had more than one element.</exception>
        public virtual TSource SingleOrDefault<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)
        {
            return SingleOrDefault(Where(source, predicate));
        }

        /// <summary>
        /// Subscribes an anonymous observer that records the latest value, whether any value was seen,
        /// whether a second value arrived, and any error; then blocks until the observer signals.
        /// </summary>
        /// <param name="source">Observable sequence to read from.</param>
        /// <param name="throwOnEmpty">When true, throw if no element was seen.</param>
        /// <exception cref="InvalidOperationException">
        /// More than one element was seen, or <paramref name="throwOnEmpty"/> is true and none was seen.
        /// </exception>
        private static TSource SingleOrDefaultInternal<TSource>(IObservable<TSource> source, bool throwOnEmpty)
        {
            var value = default(TSource);
            var seenValue = false;
            var moreThanOneElement = false;
            var ex = default(Exception);

            using (var evt = new WaitAndSetOnce())
            {
                //
                // [OK] Use of unsafe Subscribe: fine to throw to our caller, behavior indistinguishable from going through the sink.
                //
                using (source.Subscribe/*Unsafe*/(new AnonymousObserver<TSource>(
                    v =>
                    {
                        // A second element releases the waiter immediately; no need to wait for termination.
                        if (seenValue)
                        {
                            moreThanOneElement = true;
                            evt.Set();
                        }

                        value = v;
                        seenValue = true;
                    },
                    e =>
                    {
                        ex = e;
                        evt.Set();
                    },
                    () =>
                    {
                        evt.Set();
                    })))
                {
                    evt.WaitOne();
                }
            }

            // Check order: source error first, then "too many", then "none".
            ex.ThrowIfNotNull();

            if (moreThanOneElement)
            {
                throw new InvalidOperationException(Strings_Linq.MORE_THAN_ONE_ELEMENT);
            }

            if (throwOnEmpty && !seenValue)
            {
                throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
            }

            return value;
        }

        #endregion

        #region Wait

        /// <summary>
        /// Blocks until <paramref name="source"/> terminates and returns its last element;
        /// implemented as <c>LastOrDefaultInternal(source, true)</c>, i.e. identical to <c>Last</c>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
        /// <param name="source">Observable sequence to wait on.</param>
        /// <returns>The value captured by the blocking consumer.</returns>
        /// <exception cref="InvalidOperationException">The consumer observed no value.</exception>
        public virtual TSource Wait<TSource>(IObservable<TSource> source)
        {
            return LastOrDefaultInternal(source, true);
        }

        #endregion

        #region |> Helpers <|

        /// <summary>
        /// Wrapper around a <see cref="ManualResetEvent"/> whose <see cref="Set"/> signals the event
        /// only on the first call; later calls are ignored.
        /// </summary>
        private class WaitAndSetOnce : IDisposable
        {
            private readonly ManualResetEvent _evt;

            // 0 until Set has been called once, 1 afterwards.
            private int _hasSet;

            /// <summary>Creates the wrapper with the underlying event in the non-signaled state.</summary>
            public WaitAndSetOnce()
            {
                _evt = new ManualResetEvent(false);
            }

            /// <summary>Signals the event if, and only if, this is the first call to <see cref="Set"/>.</summary>
            public void Set()
            {
                // Atomic swap guarantees that exactly one caller observes the 0 -> 1 transition.
                if (Interlocked.Exchange(ref _hasSet, 1) == 0)
                {
                    _evt.Set();
                }
            }

            /// <summary>Blocks the calling thread until the event has been signaled.</summary>
            public void WaitOne()
            {
                _evt.WaitOne();
            }

            /// <summary>Disposes the underlying event.</summary>
            public void Dispose()
            {
                // Some target platforms only expose Dispose through the explicit IDisposable implementation.
#if HAS_MREEXPLICITDISPOSABLE
                ((IDisposable)_evt).Dispose();
#else
                _evt.Dispose();
#endif
            }
        }

        #endregion
    }
}