1
0

Where.cs 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. namespace System.Reactive.Linq.ObservableImpl
  5. {
  6. class Where<TSource> : Producer<TSource>
  7. {
  8. private readonly IObservable<TSource> _source;
  9. private readonly Func<TSource, bool> _predicate;
  10. private readonly Func<TSource, int, bool> _predicateI;
  11. public Where(IObservable<TSource> source, Func<TSource, bool> predicate)
  12. {
  13. _source = source;
  14. _predicate = predicate;
  15. }
  16. public Where(IObservable<TSource> source, Func<TSource, int, bool> predicate)
  17. {
  18. _source = source;
  19. _predicateI = predicate;
  20. }
  21. public IObservable<TSource> Omega(Func<TSource, bool> predicate)
  22. {
  23. if (_predicate != null)
  24. return new Where<TSource>(_source, x => _predicate(x) && predicate(x));
  25. else
  26. return new Where<TSource>(this, predicate);
  27. }
  28. protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
  29. {
  30. if (_predicate != null)
  31. {
  32. var sink = new _(this, observer, cancel);
  33. setSink(sink);
  34. return _source.SubscribeSafe(sink);
  35. }
  36. else
  37. {
  38. var sink = new WhereImpl(this, observer, cancel);
  39. setSink(sink);
  40. return _source.SubscribeSafe(sink);
  41. }
  42. }
  43. class _ : Sink<TSource>, IObserver<TSource>
  44. {
  45. private readonly Where<TSource> _parent;
  46. public _(Where<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
  47. : base(observer, cancel)
  48. {
  49. _parent = parent;
  50. }
  51. public void OnNext(TSource value)
  52. {
  53. var shouldRun = default(bool);
  54. try
  55. {
  56. shouldRun = _parent._predicate(value);
  57. }
  58. catch (Exception exception)
  59. {
  60. base._observer.OnError(exception);
  61. base.Dispose();
  62. return;
  63. }
  64. if (shouldRun)
  65. base._observer.OnNext(value);
  66. }
  67. public void OnError(Exception error)
  68. {
  69. base._observer.OnError(error);
  70. base.Dispose();
  71. }
  72. public void OnCompleted()
  73. {
  74. base._observer.OnCompleted();
  75. base.Dispose();
  76. }
  77. }
  78. class WhereImpl : Sink<TSource>, IObserver<TSource>
  79. {
  80. private readonly Where<TSource> _parent;
  81. private int _index;
  82. public WhereImpl(Where<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
  83. : base(observer, cancel)
  84. {
  85. _parent = parent;
  86. _index = 0;
  87. }
  88. public void OnNext(TSource value)
  89. {
  90. var shouldRun = default(bool);
  91. try
  92. {
  93. shouldRun = _parent._predicateI(value, checked(_index++));
  94. }
  95. catch (Exception exception)
  96. {
  97. base._observer.OnError(exception);
  98. base.Dispose();
  99. return;
  100. }
  101. if (shouldRun)
  102. base._observer.OnNext(value);
  103. }
  104. public void OnError(Exception error)
  105. {
  106. base._observer.OnError(error);
  107. base.Dispose();
  108. }
  109. public void OnCompleted()
  110. {
  111. base._observer.OnCompleted();
  112. base.Dispose();
  113. }
  114. }
  115. }
  116. }
  117. #endif