// Count.cs (3.3 KB)
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. namespace System.Reactive.Linq.Observαble
  5. {
  /// <summary>
  /// Counts the elements pushed by a source sequence, optionally restricted to the
  /// elements accepted by a predicate, and emits the total as a single value once
  /// the source signals completion.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  class Count<TSource> : Producer<int>
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TSource, bool> _predicate;

      /// <summary>
      /// Creates a counter over every element of <paramref name="source"/>.
      /// </summary>
      /// <param name="source">Sequence whose elements are counted.</param>
      public Count(IObservable<TSource> source)
          : this(source, null)
      {
      }

      /// <summary>
      /// Creates a counter over the elements of <paramref name="source"/> for which
      /// <paramref name="predicate"/> returns true.
      /// </summary>
      /// <param name="source">Sequence whose elements are counted.</param>
      /// <param name="predicate">Filter applied to each element; when null, every element is counted.</param>
      public Count(IObservable<TSource> source, Func<TSource, bool> predicate)
      {
          _source = source;
          _predicate = predicate;
      }

      /// <summary>
      /// Builds the sink matching the configured mode (filtered or unfiltered), hands it
      /// to <paramref name="setSink"/>, and subscribes it to the source.
      /// </summary>
      /// <param name="observer">Observer that receives the final count.</param>
      /// <param name="cancel">Disposable passed through to the sink.</param>
      /// <param name="setSink">Callback that is given the newly created sink.</param>
      /// <returns>The subscription returned by subscribing the sink to the source.</returns>
      protected override IDisposable Run(IObserver<int> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          if (_predicate != null)
          {
              var filteredCounter = new π(this, observer, cancel);
              setSink(filteredCounter);
              return _source.SubscribeSafe(filteredCounter);
          }

          var counter = new _(observer, cancel);
          setSink(counter);
          return _source.SubscribeSafe(counter);
      }

      /// <summary>
      /// Sink used when no predicate is configured: every element increments the tally.
      /// </summary>
      class _ : Sink<int>, IObserver<TSource>
      {
          // Running tally; an int field starts at zero without explicit initialization.
          private int _count;

          public _(IObserver<int> observer, IDisposable cancel)
              : base(observer, cancel)
          {
          }

          /// <summary>
          /// Adds one to the tally. An arithmetic overflow is reported through the error path.
          /// </summary>
          public void OnNext(TSource value)
          {
              try
              {
                  _count = checked(_count + 1);
              }
              catch (Exception failure)
              {
                  // Same two steps as an upstream error: forward, then dispose.
                  OnError(failure);
              }
          }

          /// <summary>
          /// Forwards the error to the downstream observer and disposes this sink.
          /// </summary>
          public void OnError(Exception error)
          {
              _observer.OnError(error);
              Dispose();
          }

          /// <summary>
          /// Emits the tally, completes the downstream observer, and disposes this sink.
          /// </summary>
          public void OnCompleted()
          {
              _observer.OnNext(_count);
              _observer.OnCompleted();
              Dispose();
          }
      }

      /// <summary>
      /// Sink used when a predicate is configured: only accepted elements increment the tally.
      /// </summary>
      class π : Sink<int>, IObserver<TSource>
      {
          private readonly Count<TSource> _parent;

          // Running tally; an int field starts at zero without explicit initialization.
          private int _count;

          public π(Count<TSource> parent, IObserver<int> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _parent = parent;
          }

          /// <summary>
          /// Evaluates the parent's predicate and, when it accepts the element, adds one to the tally.
          /// An exception thrown by the predicate or an arithmetic overflow is reported through
          /// the error path.
          /// </summary>
          public void OnNext(TSource value)
          {
              try
              {
                  if (_parent._predicate(value))
                  {
                      _count = checked(_count + 1);
                  }
              }
              catch (Exception failure)
              {
                  // Same two steps as an upstream error: forward, then dispose.
                  OnError(failure);
              }
          }

          /// <summary>
          /// Forwards the error to the downstream observer and disposes this sink.
          /// </summary>
          public void OnError(Exception error)
          {
              _observer.OnError(error);
              Dispose();
          }

          /// <summary>
          /// Emits the tally, completes the downstream observer, and disposes this sink.
          /// </summary>
          public void OnCompleted()
          {
              _observer.OnNext(_count);
              _observer.OnCompleted();
              Dispose();
          }
      }
  }
  108. }
  109. #endif