1
0

All.cs 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. namespace System.Reactive.Linq.ObservableImpl
  5. {
  6. class All<TSource> : Producer<bool>
  7. {
  8. private readonly IObservable<TSource> _source;
  9. private readonly Func<TSource, bool> _predicate;
  10. public All(IObservable<TSource> source, Func<TSource, bool> predicate)
  11. {
  12. _source = source;
  13. _predicate = predicate;
  14. }
  15. protected override IDisposable Run(IObserver<bool> observer, IDisposable cancel, Action<IDisposable> setSink)
  16. {
  17. var sink = new _(this, observer, cancel);
  18. setSink(sink);
  19. return _source.SubscribeSafe(sink);
  20. }
  21. class _ : Sink<bool>, IObserver<TSource>
  22. {
  23. private readonly All<TSource> _parent;
  24. public _(All<TSource> parent, IObserver<bool> observer, IDisposable cancel)
  25. : base(observer, cancel)
  26. {
  27. _parent = parent;
  28. }
  29. public void OnNext(TSource value)
  30. {
  31. var res = false;
  32. try
  33. {
  34. res = _parent._predicate(value);
  35. }
  36. catch (Exception ex)
  37. {
  38. base._observer.OnError(ex);
  39. base.Dispose();
  40. return;
  41. }
  42. if (!res)
  43. {
  44. base._observer.OnNext(false);
  45. base._observer.OnCompleted();
  46. base.Dispose();
  47. }
  48. }
  49. public void OnError(Exception error)
  50. {
  51. base._observer.OnError(error);
  52. base.Dispose();
  53. }
  54. public void OnCompleted()
  55. {
  56. base._observer.OnNext(true);
  57. base._observer.OnCompleted();
  58. base.Dispose();
  59. }
  60. }
  61. }
  62. }
  63. #endif