1
0

Take.cs 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. namespace System.Reactive.Linq.ObservableImpl
  7. {
  8. class Take<TSource> : Producer<TSource>
  9. {
  10. private readonly IObservable<TSource> _source;
  11. private readonly int _count;
  12. private readonly TimeSpan _duration;
  13. internal readonly IScheduler _scheduler;
  /// <summary>
  /// Creates a count-based take operator over <paramref name="source"/>.
  /// </summary>
  /// <param name="source">Sequence that the operator will subscribe to.</param>
  /// <param name="count">Element limit; the count-based sink uses it as its initial remaining counter.</param>
  /// <remarks>
  /// The duration and scheduler fields are not assigned here and keep their default values;
  /// Run selects the count-based sink when the scheduler field is null.
  /// </remarks>
  public Take(IObservable<TSource> source, int count)
  {
      _count = count;
      _source = source;
  }
  /// <summary>
  /// Creates a time-based take operator over <paramref name="source"/>.
  /// </summary>
  /// <param name="source">Sequence that the operator will subscribe to.</param>
  /// <param name="duration">Delay after which the timed sink's tick is scheduled to run.</param>
  /// <param name="scheduler">Scheduler on which that tick is scheduled.</param>
  /// <remarks>
  /// The count field is not assigned here and keeps its default value.
  /// </remarks>
  public Take(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
  {
      _scheduler = scheduler;
      _duration = duration;
      _source = source;
  }
  /// <summary>
  /// Combines a further count-based take with this operator instead of stacking a second operator on top.
  /// </summary>
  /// <param name="count">Element limit requested by the outer take.</param>
  /// <returns>
  /// This instance when its own limit is already less than or equal to <paramref name="count"/>;
  /// otherwise a new operator over the same source with the smaller limit.
  /// </returns>
  public IObservable<TSource> Omega(int count)
  {
      //
      // The smaller of the two limits wins:
      //
      //   xs                  --o--o--o--o--o--o--|    xs                  --o--o--o--o--o--o--|
      //   xs.Take(5)          --o--o--o--o--o|         xs.Take(3)          --o--o--o|
      //   xs.Take(5).Take(3)  --o--o--o|               xs.Take(3).Take(5)  --o--o--o|
      //
      // NOTE(review): only _count is consulted, so this presumably expects to be called on a
      // count-based instance (no scheduler) - confirm at the call sites.
      //
      return count < _count
          ? new Take<TSource>(_source, count)
          : this;
  }
  /// <summary>
  /// Combines a further time-based take with this operator instead of stacking a second operator on top.
  /// </summary>
  /// <param name="duration">Duration requested by the outer take.</param>
  /// <returns>
  /// This instance when its own duration is already less than or equal to <paramref name="duration"/>;
  /// otherwise a new operator over the same source and scheduler with the shorter duration.
  /// </returns>
  public IObservable<TSource> Omega(TimeSpan duration)
  {
      //
      // The shorter of the two durations wins:
      //
      //   t                     0--1--2--3--4--5--6--7->   t                     0--1--2--3--4--5--6--7->
      //
      //   xs                    --o--o--o--o--o--o--|      xs                    --o--o--o--o--o--o--|
      //   xs.Take(5s)           --o--o--o--o--o|           xs.Take(3s)           --o--o--o|
      //   xs.Take(5s).Take(3s)  --o--o--o|                 xs.Take(3s).Take(5s)  --o--o--o|
      //
      // NOTE(review): the new operator reuses this instance's scheduler, so this presumably expects
      // to be called on a time-based instance whose scheduler matches the caller's - confirm at the call sites.
      //
      if (duration < _duration)
      {
          return new Take<TSource>(_source, duration, _scheduler);
      }

      return this;
  }
  /// <summary>
  /// Creates the sink matching this operator's mode, registers it through <paramref name="setSink"/>, and starts it.
  /// </summary>
  /// <param name="observer">Downstream observer handed to the sink.</param>
  /// <param name="cancel">Disposable handed to the sink's base constructor.</param>
  /// <param name="setSink">Callback that receives the newly created sink.</param>
  /// <returns>
  /// For the time-based mode, whatever the timed sink's Run returns; for the count-based mode,
  /// the result of subscribing the counting sink to the source.
  /// </returns>
  protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
  {
      // A scheduler is only supplied by the duration-based constructor.
      if (_scheduler != null)
      {
          var timedSink = new TakeImpl(this, observer, cancel);
          setSink(timedSink);
          return timedSink.Run();
      }

      var countingSink = new _(this, observer, cancel);
      setSink(countingSink);
      return _source.SubscribeSafe(countingSink);
  }
  /// <summary>
  /// Sink for the count-based mode: forwards elements while the remaining counter is positive and
  /// completes the downstream observer as soon as the counter reaches zero.
  /// </summary>
  class _ : Sink<TSource>, IObserver<TSource>
  {
      private readonly Take<TSource> _parent;
      private int _remaining;

      /// <summary>
      /// Initializes the sink and seeds the remaining counter from the parent's count.
      /// </summary>
      public _(Take<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
          : base(observer, cancel)
      {
          _parent = parent;
          _remaining = parent._count;
      }

      /// <summary>
      /// Forwards <paramref name="value"/> if the limit has not been reached; elements arriving
      /// once the counter is no longer positive are ignored.
      /// </summary>
      public void OnNext(TSource value)
      {
          if (_remaining <= 0)
          {
              return;
          }

          // The counter is decremented before the element is handed downstream.
          _remaining--;
          base._observer.OnNext(value);

          if (_remaining == 0)
          {
              // Limit reached: complete downstream and dispose the sink.
              base._observer.OnCompleted();
              base.Dispose();
          }
      }

      /// <summary>
      /// Forwards the error downstream, then disposes the sink.
      /// </summary>
      public void OnError(Exception error)
      {
          base._observer.OnError(error);
          base.Dispose();
      }

      /// <summary>
      /// Forwards completion downstream, then disposes the sink.
      /// </summary>
      public void OnCompleted()
      {
          base._observer.OnCompleted();
          base.Dispose();
      }
  }
  /// <summary>
  /// Sink for the time-based mode: forwards source notifications and completes the downstream
  /// observer when the scheduled tick runs. Every notification path takes the same lock, so the
  /// tick and the source notifications are mutually exclusive.
  /// </summary>
  class TakeImpl : Sink<TSource>, IObserver<TSource>
  {
      private readonly Take<TSource> _parent;

      // Dedicated lock object, created with the sink and never reassigned, so every
      // notification path is guaranteed to lock on the same, non-null instance.
      private readonly object _gate = new object();

      /// <summary>
      /// Initializes the sink for the given parent operator.
      /// </summary>
      public TakeImpl(Take<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
          : base(observer, cancel)
      {
          _parent = parent;
      }

      /// <summary>
      /// Schedules the tick on the parent's scheduler after the parent's duration, then subscribes
      /// this sink to the parent's source.
      /// </summary>
      /// <returns>A composite disposable holding the scheduled tick and the source subscription.</returns>
      public IDisposable Run()
      {
          // The tick is scheduled before subscribing, in the same order as the two disposables below.
          var timer = _parent._scheduler.Schedule(_parent._duration, Tick);
          var subscription = _parent._source.SubscribeSafe(this);

          return new CompositeDisposable(timer, subscription);
      }

      /// <summary>
      /// Scheduled callback: performs the same steps as a completion coming from the source.
      /// </summary>
      private void Tick()
      {
          OnCompleted();
      }

      /// <summary>
      /// Forwards <paramref name="value"/> downstream under the lock.
      /// </summary>
      public void OnNext(TSource value)
      {
          lock (_gate)
          {
              base._observer.OnNext(value);
          }
      }

      /// <summary>
      /// Forwards the error downstream and disposes the sink, both under the lock.
      /// </summary>
      public void OnError(Exception error)
      {
          lock (_gate)
          {
              base._observer.OnError(error);
              base.Dispose();
          }
      }

      /// <summary>
      /// Forwards completion downstream and disposes the sink, both under the lock.
      /// </summary>
      public void OnCompleted()
      {
          lock (_gate)
          {
              base._observer.OnCompleted();
              base.Dispose();
          }
      }
  }
  152. }
  153. }
  154. #endif