123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
- using System.Collections.Generic;
- using System.Reactive.Disposables;
- using System.Reactive.Joins;
- namespace System.Reactive.Linq
- {
- internal partial class QueryLanguage
- {
- #region And
- public virtual Pattern<TLeft, TRight> And<TLeft, TRight>(IObservable<TLeft> left, IObservable<TRight> right)
- {
- return new Pattern<TLeft, TRight>(left, right);
- }
- #endregion
- #region Then
- public virtual Plan<TResult> Then<TSource, TResult>(IObservable<TSource> source, Func<TSource, TResult> selector)
- {
- return new Pattern<TSource>(source).Then(selector);
- }
- #endregion
- #region When
- public virtual IObservable<TResult> When<TResult>(params Plan<TResult>[] plans)
- {
- return When((IEnumerable<Plan<TResult>>)plans);
- }
- public virtual IObservable<TResult> When<TResult>(IEnumerable<Plan<TResult>> plans)
- {
- return new AnonymousObservable<TResult>(observer =>
- {
- var externalSubscriptions = new Dictionary<object, IJoinObserver>();
- var gate = new object();
- var activePlans = new List<ActivePlan>();
- var outObserver = Observer.Create<TResult>(observer.OnNext,
- exception =>
- {
- foreach (var po in externalSubscriptions.Values)
- {
- po.Dispose();
- }
- observer.OnError(exception);
- },
- observer.OnCompleted);
- try
- {
- foreach (var plan in plans)
- activePlans.Add(plan.Activate(externalSubscriptions, outObserver,
- activePlan =>
- {
- activePlans.Remove(activePlan);
- if (activePlans.Count == 0)
- outObserver.OnCompleted();
- }));
- }
- catch (Exception e)
- {
- //
- // [OK] Use of unsafe Subscribe: we're calling into a known producer implementation.
- //
- return Throw<TResult>(e).Subscribe/*Unsafe*/(observer);
- }
- var group = new CompositeDisposable(externalSubscriptions.Values.Count);
- foreach (var joinObserver in externalSubscriptions.Values)
- {
- joinObserver.Subscribe(gate);
- group.Add(joinObserver);
- }
- return group;
- });
- }
- #endregion
- }
- }
|