1
0

DelaySubscription.cs 2.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. using System.Reactive.Concurrency;
  5. namespace System.Reactive.Linq.Observαble
  6. {
  /// <summary>
  /// Producer that schedules the subscription to a source sequence on a scheduler,
  /// for a due time given either as an absolute <see cref="DateTimeOffset"/> or as
  /// a relative <see cref="TimeSpan"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  class DelaySubscription<TSource> : Producer<TSource>
  {
      private readonly IObservable<TSource> _source;

      // Exactly one of the two due times is assigned, depending on which
      // constructor was used; the other one stays null.
      private readonly DateTimeOffset? _dueTimeA;
      private readonly TimeSpan? _dueTimeR;

      private readonly IScheduler _scheduler;

      /// <summary>
      /// Creates a producer whose subscription is scheduled for an absolute due time.
      /// </summary>
      /// <param name="source">Sequence to subscribe to when the scheduled work runs.</param>
      /// <param name="dueTime">Absolute time passed to the scheduler.</param>
      /// <param name="scheduler">Scheduler used to schedule the subscription.</param>
      public DelaySubscription(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
      {
          _source = source;
          _dueTimeA = dueTime;
          _scheduler = scheduler;
      }

      /// <summary>
      /// Creates a producer whose subscription is scheduled for a relative due time.
      /// </summary>
      /// <param name="source">Sequence to subscribe to when the scheduled work runs.</param>
      /// <param name="dueTime">Relative time passed to the scheduler.</param>
      /// <param name="scheduler">Scheduler used to schedule the subscription.</param>
      public DelaySubscription(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
      {
          _source = source;
          _dueTimeR = dueTime;
          _scheduler = scheduler;
      }

      /// <summary>
      /// Builds the forwarding sink, publishes it through <paramref name="setSink"/>,
      /// and schedules <see cref="Subscribe"/> with the sink as state.
      /// </summary>
      /// <param name="observer">Observer that receives the forwarded notifications.</param>
      /// <param name="cancel">Disposable handed to the sink's base constructor.</param>
      /// <param name="setSink">Callback invoked with the newly created sink.</param>
      /// <returns>The disposable returned by the scheduler for the scheduled work.</returns>
      protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var forwarder = new _(observer, cancel);
          setSink(forwarder);

          // The absolute due time wins when present; otherwise the relative one
          // must have been supplied by the TimeSpan constructor.
          return _dueTimeA.HasValue
              ? _scheduler.Schedule(forwarder, _dueTimeA.Value, Subscribe)
              : _scheduler.Schedule(forwarder, _dueTimeR.Value, Subscribe);
      }

      /// <summary>
      /// Scheduled action: subscribes the given sink to the source sequence.
      /// </summary>
      /// <param name="scheduler">Scheduler running the action; not used here.</param>
      /// <param name="target">Sink that becomes the observer of the source.</param>
      /// <returns>The subscription returned by <c>SubscribeSafe</c>.</returns>
      private IDisposable Subscribe(IScheduler scheduler, _ target)
      {
          return _source.SubscribeSafe(target);
      }

      /// <summary>
      /// Sink that passes every notification straight through to the downstream
      /// observer and disposes itself after a terminal notification.
      /// </summary>
      class _ : Sink<TSource>, IObserver<TSource>
      {
          public _(IObserver<TSource> observer, IDisposable cancel)
              : base(observer, cancel)
          {
          }

          /// <summary>Forwards an element to the downstream observer.</summary>
          public void OnNext(TSource value)
          {
              _observer.OnNext(value);
          }

          /// <summary>Forwards the error downstream, then disposes the sink.</summary>
          public void OnError(Exception error)
          {
              _observer.OnError(error);
              base.Dispose();
          }

          /// <summary>Forwards completion downstream, then disposes the sink.</summary>
          public void OnCompleted()
          {
              _observer.OnCompleted();
              base.Dispose();
          }
      }
  }
  64. }
  65. #endif