If.cs 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Disposables;
  5. namespace System.Reactive.Linq.ObservableImpl
  6. {
  /// <summary>
  /// Observable that chooses between two candidate sources by invoking a
  /// condition delegate, and relays the chosen source through its sink.
  /// </summary>
  /// <typeparam name="TResult">Element type shared by both candidate sources.</typeparam>
  internal sealed class If<TResult> : Producer<TResult, If<TResult>._>, IEvaluatableObservable<TResult>
  {
      private readonly Func<bool> _condition;
      private readonly IObservable<TResult> _thenSource;
      private readonly IObservable<TResult> _elseSource;

      /// <summary>
      /// Stores the condition and the two candidate sources; nothing is evaluated here.
      /// </summary>
      /// <param name="condition">Delegate invoked by <c>Eval</c> to pick a source.</param>
      /// <param name="thenSource">Source returned when the condition yields <c>true</c>.</param>
      /// <param name="elseSource">Source returned when the condition yields <c>false</c>.</param>
      public If(Func<bool> condition, IObservable<TResult> thenSource, IObservable<TResult> elseSource)
      {
          _condition = condition;
          _thenSource = thenSource;
          _elseSource = elseSource;
      }

      /// <summary>
      /// Invokes the condition and returns the matching source.
      /// </summary>
      /// <returns>
      /// The "then" source if the condition returned <c>true</c>; otherwise the "else" source.
      /// </returns>
      /// <remarks>Any exception thrown by the condition delegate propagates to the caller.</remarks>
      public IObservable<TResult> Eval()
      {
          if (_condition())
          {
              return _thenSource;
          }

          return _elseSource;
      }

      /// <summary>Creates the sink that wraps the given observer.</summary>
      protected override _ CreateSink(IObserver<TResult> observer)
      {
          return new _(this, observer);
      }

      /// <summary>Starts the given sink.</summary>
      protected override void Run(_ sink)
      {
          sink.Run();
      }

      /// <summary>
      /// Sink that evaluates the owning <c>If</c> and subscribes itself to the chosen source.
      /// </summary>
      internal sealed class _ : IdentitySink<TResult>
      {
          private readonly If<TResult> _owner;

          /// <summary>Captures the owning operator and passes the observer to the base sink.</summary>
          public _(If<TResult> parent, IObserver<TResult> observer)
              : base(observer)
          {
              _owner = parent;
          }

          /// <summary>
          /// Picks the source via <c>Eval</c> and subscribes this sink to it. If picking
          /// the source throws, the exception is handed to <c>ForwardOnError</c> and no
          /// subscription is made.
          /// </summary>
          public void Run()
          {
              IObservable<TResult> chosen;

              try
              {
                  chosen = _owner.Eval();
              }
              catch (Exception error)
              {
                  // The condition delegate is user code; route its failure through the
                  // sink instead of letting it escape from Run.
                  // NOTE(review): ForwardOnError is defined on the base sink (not visible
                  // here); presumably it signals OnError downstream - confirm.
                  ForwardOnError(error);
                  return;
              }

              // The resulting subscription is handed to SetUpstream; presumably the base
              // sink keeps it so it can be disposed later - confirm against the base class.
              SetUpstream(chosen.SubscribeSafe(this));
          }
      }
  }
  45. }