TakeUntilPredicate.cs 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. namespace System.Reactive.Linq.ObservableImpl
  5. {
  /// <summary>
  /// Producer that subscribes a <see cref="TakeUntilPredicateObserver"/> to the source and
  /// hands each source item to the downstream until the stop predicate returns <c>true</c>.
  /// </summary>
  /// <remarks>
  /// The predicate is evaluated after the item has been forwarded, so the item that makes
  /// the predicate return <c>true</c> is forwarded as well before completion is signalled.
  /// </remarks>
  /// <typeparam name="TSource">The element type of the sequence.</typeparam>
  internal sealed class TakeUntilPredicate<TSource> :
      Producer<TSource, TakeUntilPredicate<TSource>.TakeUntilPredicateObserver>
  {
      private readonly IObservable<TSource> _upstream;
      private readonly Func<TSource, bool> _shouldStop;

      /// <summary>
      /// Stores the source sequence and the stop predicate for later subscriptions.
      /// </summary>
      /// <param name="source">The sequence whose items are handed to the sink.</param>
      /// <param name="stopPredicate">Function invoked with each item; a <c>true</c> result ends the sequence.</param>
      public TakeUntilPredicate(IObservable<TSource> source, Func<TSource, bool> stopPredicate)
      {
          _upstream = source;
          _shouldStop = stopPredicate;
      }

      /// <summary>
      /// Builds the sink that wraps <paramref name="observer"/> together with the stop predicate.
      /// </summary>
      protected override TakeUntilPredicateObserver CreateSink(IObserver<TSource> observer)
      {
          return new TakeUntilPredicateObserver(observer, _shouldStop);
      }

      /// <summary>
      /// Starts <paramref name="sink"/> against the stored source sequence.
      /// </summary>
      protected override void Run(TakeUntilPredicateObserver sink)
      {
          sink.Run(_upstream);
      }

      /// <summary>
      /// Sink that forwards every notification and additionally signals completion
      /// once the stop predicate returns <c>true</c> for an item.
      /// </summary>
      internal sealed class TakeUntilPredicateObserver : IdentitySink<TSource>
      {
          private readonly Func<TSource, bool> _stopPredicate;

          /// <summary>
          /// Creates the sink for the given downstream observer.
          /// </summary>
          /// <param name="downstream">The observer passed on to the base sink.</param>
          /// <param name="predicate">Function invoked with each item after it has been forwarded.</param>
          public TakeUntilPredicateObserver(IObserver<TSource> downstream,
              Func<TSource, bool> predicate) : base(downstream)
          {
              _stopPredicate = predicate;
          }

          /// <summary>
          /// Passes the completion signal through unchanged.
          /// </summary>
          public override void OnCompleted() => ForwardOnCompleted();

          /// <summary>
          /// Passes the error signal through unchanged.
          /// </summary>
          /// <param name="error">The exception received from upstream.</param>
          public override void OnNext(TSource value)
          {
              // Forward first: the item that trips the predicate must still be emitted.
              ForwardOnNext(value);

              bool terminate;
              try
              {
                  terminate = _stopPredicate(value);
              }
              catch (Exception failure)
              {
                  // A throwing predicate is reported as an error instead of escaping OnNext.
                  ForwardOnError(failure);
                  return;
              }

              if (!terminate)
              {
                  return;
              }

              // Kept outside the try so that a failure while completing is not caught above.
              ForwardOnCompleted();
          }

          /// <summary>
          /// Passes the error signal through unchanged.
          /// </summary>
          /// <param name="error">The exception received from upstream.</param>
          public override void OnError(Exception error) => ForwardOnError(error);
      }
  }
  58. }