OrderedProducer.cs 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Diagnostics;
  5. using System.Linq;
  6. using System.Reactive.Disposables;
  7. using System.Reactive.Linq;
  8. using System.Reactive.Subjects;
  9. namespace System.Reactive
  10. {
  /// <summary>
  /// One link in a chain of <see cref="OrderedProducer{TSource}"/> instances. Every link builds its
  /// sink through <c>Sort</c>; <c>Run</c> walks the chain through the previous-link reference, nests
  /// the sinks inside each other and finally hands the source sequence to the last sink created.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence and of the observers.</typeparam>
  abstract class OrderedProducer<TSource> : Producer<TSource>
  {
      /// <summary>The sequence that <c>Run</c> passes to the last sink in the chain.</summary>
      internal readonly IObservable<TSource> _source;

      /// <summary>The preceding link of the chain, or <c>null</c> when this link is the last one visited.</summary>
      private readonly OrderedProducer<TSource> _previous;

      /// <summary>Stores the source sequence and the preceding link of the chain.</summary>
      /// <param name="source">Sequence later passed to the sink's <c>Run</c> method.</param>
      /// <param name="previous">Preceding link, or <c>null</c> if there is none.</param>
      protected OrderedProducer(IObservable<TSource> source, OrderedProducer<TSource> previous)
      {
          _previous = previous;
          _source = source;
      }

      /// <summary>Creates the sink of this link, wrapping the given observer.</summary>
      /// <param name="observer">Observer the new sink is constructed around.</param>
      /// <param name="cancel">Disposable handed to the new sink.</param>
      /// <returns>The sink for this link; it may be the very instance passed as <paramref name="observer"/>.</returns>
      protected abstract SortSink Sort(IObserver<TSource> observer, IDisposable cancel);

      /// <summary>
      /// Builds the sink chain, initializes each distinct sink exactly once and starts the last sink
      /// on the source sequence.
      /// </summary>
      /// <param name="observer">Observer wrapped by this link's own sink.</param>
      /// <param name="cancel">Disposable passed to this link's own sink.</param>
      /// <param name="setSink">Callback that receives this link's own sink.</param>
      /// <returns>
      /// The single initialization disposable when no previous link exists; otherwise a composite of
      /// all initialization disposables in the reverse of the order in which they were obtained.
      /// </returns>
      protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var current = Sort(observer, cancel);
          setSink(current);

          var initializations = new CompositeDisposable();

          for (var link = _previous; link != null; link = link._previous)
          {
              // A link's Sort is allowed to hand back the sink it was given, so the flag check
              // guarantees that no sink is initialized a second time.
              if (!current._initialized)
              {
                  initializations.Add(current.InitializeAndSet());
              }

              current = link.Sort(current, Disposable.Empty);
          }

          if (initializations.Count != 0)
          {
              // The sink returned by the final Sort call may still be untouched.
              if (!current._initialized)
              {
                  initializations.Add(current.InitializeAndSet());
              }

              current.Run(_source);
              return new CompositeDisposable(initializations.Reverse());
          }

          // Nothing was collected, so the loop never ran and the only sink is still uninitialized.
          Debug.Assert(!current._initialized);

          var initialization = current.InitializeAndSet();
          current.Run(_source);
          return initialization;
      }

      /// <summary>
      /// Base class of the sinks produced by <c>Sort</c>. Tracks whether initialization has already
      /// been requested so that the producer can avoid initializing the same sink twice.
      /// </summary>
      protected abstract class SortSink : Sink<TSource>, IObserver<TSource>
      {
          /// <summary>Set to <c>true</c> by <see cref="InitializeAndSet"/> before <see cref="Initialize"/> is invoked.</summary>
          internal bool _initialized;

          /// <summary>Forwards both arguments to the base sink constructor.</summary>
          protected SortSink(IObserver<TSource> observer, IDisposable cancel)
              : base(observer, cancel)
          {
          }

          /// <summary>Marks this sink as initialized and then runs <see cref="Initialize"/>.</summary>
          /// <returns>Whatever <see cref="Initialize"/> returns.</returns>
          internal IDisposable InitializeAndSet()
          {
              // The flag is raised first, so it stays set even if Initialize throws.
              _initialized = true;
              return Initialize();
          }

          /// <summary>Performs the sink-specific initialization and returns the associated disposable.</summary>
          public abstract IDisposable Initialize();

          /// <summary>Starts the sink on the given source sequence.</summary>
          public abstract void Run(IObservable<TSource> source);

          /// <summary>Receives the next element.</summary>
          public abstract void OnNext(TSource value);

          /// <summary>Receives an error notification.</summary>
          public abstract void OnError(Exception error);

          /// <summary>Receives the completion notification.</summary>
          public abstract void OnCompleted();
      }
  }
  69. }