1
0

TimeInterval.cs 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. #if !NO_PERF
  5. using System;
  6. using System.Diagnostics;
  7. using System.Reactive.Concurrency;
  8. using System.Reactive.Disposables;
  9. using System.Threading;
  10. namespace System.Reactive.Linq.ObservableImpl
  11. {
  12. class TimeInterval<TSource> : Producer<System.Reactive.TimeInterval<TSource>>
  13. {
  14. private readonly IObservable<TSource> _source;
  15. private readonly IScheduler _scheduler;
  16. public TimeInterval(IObservable<TSource> source, IScheduler scheduler)
  17. {
  18. _source = source;
  19. _scheduler = scheduler;
  20. }
  21. protected override IDisposable Run(IObserver<System.Reactive.TimeInterval<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
  22. {
  23. var sink = new _(this, observer, cancel);
  24. setSink(sink);
  25. return sink.Run();
  26. }
  27. class _ : Sink<System.Reactive.TimeInterval<TSource>>, IObserver<TSource>
  28. {
  29. private readonly TimeInterval<TSource> _parent;
  30. public _(TimeInterval<TSource> parent, IObserver<System.Reactive.TimeInterval<TSource>> observer, IDisposable cancel)
  31. : base(observer, cancel)
  32. {
  33. _parent = parent;
  34. }
  35. private IStopwatch _watch;
  36. private TimeSpan _last;
  37. public IDisposable Run()
  38. {
  39. _watch = _parent._scheduler.StartStopwatch();
  40. _last = TimeSpan.Zero;
  41. return _parent._source.Subscribe(this);
  42. }
  43. public void OnNext(TSource value)
  44. {
  45. var now = _watch.Elapsed;
  46. var span = now.Subtract(_last);
  47. _last = now;
  48. base._observer.OnNext(new System.Reactive.TimeInterval<TSource>(value, span));
  49. }
  50. public void OnError(Exception error)
  51. {
  52. base._observer.OnError(error);
  53. Dispose();
  54. }
  55. public void OnCompleted()
  56. {
  57. base._observer.OnCompleted();
  58. Dispose();
  59. }
  60. }
  61. }
  62. }
  63. #endif