// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;
using System.Threading.Tasks;

namespace System.Reactive.Linq
{
    using ObservableImpl;

    internal partial class QueryLanguage
    {
        #region ForEachAsync

        /// <summary>
        /// Subscribes to <paramref name="source"/> and invokes <paramref name="onNext"/> for each element.
        /// Forwards to the shared implementation with <see cref="CancellationToken.None"/>.
        /// </summary>
        /// <param name="source">Sequence to subscribe to.</param>
        /// <param name="onNext">Action invoked for each element.</param>
        /// <returns>The task produced by the shared implementation.</returns>
        public virtual Task ForEachAsync<TSource>(IObservable<TSource> source, Action<TSource> onNext)
        {
            return ForEachAsync_(source, onNext, CancellationToken.None);
        }

        /// <summary>
        /// Subscribes to <paramref name="source"/> and invokes <paramref name="onNext"/> for each element,
        /// observing <paramref name="cancellationToken"/>.
        /// </summary>
        /// <param name="source">Sequence to subscribe to.</param>
        /// <param name="onNext">Action invoked for each element.</param>
        /// <param name="cancellationToken">Token whose cancellation cancels the task and disposes the subscription.</param>
        /// <returns>The task produced by the shared implementation.</returns>
        public virtual Task ForEachAsync<TSource>(IObservable<TSource> source, Action<TSource> onNext, CancellationToken cancellationToken)
        {
            return ForEachAsync_(source, onNext, cancellationToken);
        }

        /// <summary>
        /// Indexed variant: <paramref name="onNext"/> additionally receives a zero-based element index.
        /// Forwards to the shared implementation with <see cref="CancellationToken.None"/>.
        /// </summary>
        /// <param name="source">Sequence to subscribe to.</param>
        /// <param name="onNext">Action invoked with each element and its index.</param>
        /// <returns>The task produced by the shared implementation.</returns>
        public virtual Task ForEachAsync<TSource>(IObservable<TSource> source, Action<TSource, int> onNext)
        {
            // The counter is captured by the lambda; "checked" makes the increment throw on
            // overflow instead of wrapping around to a negative index.
            var i = 0;
            return ForEachAsync_(source, x => onNext(x, checked(i++)), CancellationToken.None);
        }

        /// <summary>
        /// Indexed variant: <paramref name="onNext"/> additionally receives a zero-based element index,
        /// observing <paramref name="cancellationToken"/>.
        /// </summary>
        /// <param name="source">Sequence to subscribe to.</param>
        /// <param name="onNext">Action invoked with each element and its index.</param>
        /// <param name="cancellationToken">Token whose cancellation cancels the task and disposes the subscription.</param>
        /// <returns>The task produced by the shared implementation.</returns>
        public virtual Task ForEachAsync<TSource>(IObservable<TSource> source, Action<TSource, int> onNext, CancellationToken cancellationToken)
        {
            // The counter is captured by the lambda; "checked" makes the increment throw on
            // overflow instead of wrapping around to a negative index.
            var i = 0;
            return ForEachAsync_(source, x => onNext(x, checked(i++)), cancellationToken);
        }

        /// <summary>
        /// Shared implementation of the ForEachAsync overloads. Bridges a subscription to
        /// <paramref name="source"/> onto a <see cref="Task"/>: the task is faulted when
        /// <paramref name="onNext"/>, the source, the subscribe call or disposal throws/reports an error,
        /// canceled when <paramref name="cancellationToken"/> fires, and completed when the source completes.
        /// </summary>
        /// <param name="source">Sequence to subscribe to.</param>
        /// <param name="onNext">Action invoked for each element while the subscription is not disposed.</param>
        /// <param name="cancellationToken">Token to observe; the source is not subscribed to if it is already canceled.</param>
        /// <returns>The task backed by the internal completion source.</returns>
        private static Task ForEachAsync_<TSource>(IObservable<TSource> source, Action<TSource> onNext, CancellationToken cancellationToken)
        {
            var tcs = new TaskCompletionSource<object>();
            var subscription = new SingleAssignmentDisposable();

            var ctr = default(CancellationTokenRegistration);

            if (cancellationToken.CanBeCanceled)
            {
                // On cancellation: mark the task canceled first, then tear down the subscription.
                ctr = cancellationToken.Register(() =>
                {
                    tcs.TrySetCanceled(cancellationToken);
                    subscription.Dispose();
                });
            }

            if (!cancellationToken.IsCancellationRequested)
            {
                // Making sure we always complete, even if disposing throws.
                var dispose = new Action<Action>(action =>
                {
                    try
                    {
                        ctr.Dispose(); // no null-check needed (struct)
                        subscription.Dispose();
                    }
                    catch (Exception ex)
                    {
                        // Disposal failed: report that failure and skip the requested completion action.
                        tcs.TrySetException(ex);
                        return;
                    }

                    action();
                });

                var taskCompletionObserver = new AnonymousObserver<TSource>(
                    x =>
                    {
                        // Elements arriving after disposal are ignored.
                        if (!subscription.IsDisposed)
                        {
                            try
                            {
                                onNext(x);
                            }
                            catch (Exception exception)
                            {
                                // A throwing callback ends the subscription and faults the task.
                                dispose(() => tcs.TrySetException(exception));
                            }
                        }
                    },
                    exception =>
                    {
                        dispose(() => tcs.TrySetException(exception));
                    },
                    () =>
                    {
                        dispose(() => tcs.TrySetResult(null));
                    }
                );

                //
                // Subtle race condition: if the source completes before we reach the line below, the SingleAssignmentDisposable
                // will already have been disposed. Upon assignment, the disposable resource being set will be disposed on the
                // spot, which may throw an exception. (See TFS 487142)
                //
                try
                {
                    //
                    // [OK] Use of unsafe Subscribe: we're catching the exception here to set the TaskCompletionSource.
                    //
                    // Notice we could use a safe subscription to route errors through OnError, but we still need the
                    // exception handling logic here for the reason explained above. We cannot afford to throw here
                    // and as a result never set the TaskCompletionSource, so we tunnel everything through here.
                    //
                    subscription.Disposable = source.Subscribe/*Unsafe*/(taskCompletionObserver);
                }
                catch (Exception ex)
                {
                    tcs.TrySetException(ex);
                }
            }

            return tcs.Task;
        }

        #endregion

        #region + Case +

        /// <summary>
        /// Case overload without an explicit default source; forwards with <c>Empty&lt;TResult&gt;()</c> as the default.
        /// </summary>
        public virtual IObservable<TResult> Case<TValue, TResult>(Func<TValue> selector, IDictionary<TValue, IObservable<TResult>> sources)
        {
            return Case(selector, sources, Empty<TResult>());
        }

        /// <summary>
        /// Case overload taking a scheduler; forwards with <c>Empty&lt;TResult&gt;(scheduler)</c> as the default source.
        /// </summary>
        public virtual IObservable<TResult> Case<TValue, TResult>(Func<TValue> selector, IDictionary<TValue, IObservable<TResult>> sources, IScheduler scheduler)
        {
            return Case(selector, sources, Empty<TResult>(scheduler));
        }

        /// <summary>
        /// Wraps the selector, the source map and the default source in the <c>Case</c> operator implementation.
        /// </summary>
        public virtual IObservable<TResult> Case<TValue, TResult>(Func<TValue> selector, IDictionary<TValue, IObservable<TResult>> sources, IObservable<TResult> defaultSource)
        {
            return new Case<TValue, TResult>(selector, sources, defaultSource);
        }

        #endregion

        #region + DoWhile +

        /// <summary>
        /// Wraps the source and the condition in the <c>DoWhile</c> operator implementation.
        /// </summary>
        public virtual IObservable<TSource> DoWhile<TSource>(IObservable<TSource> source, Func<bool> condition)
        {
            return new DoWhile<TSource>(source, condition);
        }

        #endregion

        #region + For +

        /// <summary>
        /// Wraps the enumerable source and the per-element result selector in the <c>For</c> operator implementation.
        /// </summary>
        public virtual IObservable<TResult> For<TSource, TResult>(IEnumerable<TSource> source, Func<TSource, IObservable<TResult>> resultSelector)
        {
            return new For<TSource, TResult>(source, resultSelector);
        }

        #endregion

        #region + If +

        /// <summary>
        /// If overload without an else-branch; forwards with <c>Empty&lt;TResult&gt;()</c> as the else source.
        /// </summary>
        public virtual IObservable<TResult> If<TResult>(Func<bool> condition, IObservable<TResult> thenSource)
        {
            return If(condition, thenSource, Empty<TResult>());
        }

        /// <summary>
        /// If overload taking a scheduler; forwards with <c>Empty&lt;TResult&gt;(scheduler)</c> as the else source.
        /// </summary>
        public virtual IObservable<TResult> If<TResult>(Func<bool> condition, IObservable<TResult> thenSource, IScheduler scheduler)
        {
            return If(condition, thenSource, Empty<TResult>(scheduler));
        }

        /// <summary>
        /// Wraps the condition and both branch sources in the <c>If</c> operator implementation.
        /// </summary>
        public virtual IObservable<TResult> If<TResult>(Func<bool> condition, IObservable<TResult> thenSource, IObservable<TResult> elseSource)
        {
            return new If<TResult>(condition, thenSource, elseSource);
        }

        #endregion

        #region + While +

        /// <summary>
        /// Wraps the condition and the source in the <c>While</c> operator implementation.
        /// </summary>
        public virtual IObservable<TSource> While<TSource>(Func<bool> condition, IObservable<TSource> source)
        {
            return new While<TSource>(condition, source);
        }

        #endregion
    }
}