PushToPullAdapter.cs 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System;
  3. using System.Collections;
  4. using System.Collections.Generic;
  5. using System.Reactive.Disposables;
  6. namespace System.Reactive.Linq.Observαble
  7. {
  /// <summary>
  /// Base class that exposes an observable (push-based) source as an enumerable (pull-based) sequence.
  /// Every call to <see cref="GetEnumerator"/> asks the derived class for a new sink, subscribes that
  /// sink to the source, and returns the sink as the enumerator.
  /// </summary>
  /// <typeparam name="TSource">Element type of the observable source.</typeparam>
  /// <typeparam name="TResult">Element type yielded by the resulting enumerator.</typeparam>
  abstract class PushToPullAdapter<TSource, TResult> : IEnumerable<TResult>
  {
    private readonly IObservable<TSource> _source;

    /// <summary>
    /// Stores the observable that enumerators created by this adapter will subscribe to.
    /// </summary>
    /// <param name="source">Observable source; it is not subscribed to until enumeration is requested.</param>
    public PushToPullAdapter(IObservable<TSource> source)
    {
      _source = source;
    }

    /// <summary>
    /// Creates the sink that observes the source and doubles as the enumerator.
    /// </summary>
    /// <param name="subscription">
    /// Disposable handed to the sink. <see cref="GetEnumerator"/> assigns the source subscription to it
    /// only after this method has returned.
    /// </param>
    /// <returns>The sink to subscribe to the source and to return to the caller.</returns>
    protected abstract PushToPullSink<TSource, TResult> Run(IDisposable subscription);

    /// <summary>
    /// Builds a sink through <see cref="Run"/>, subscribes it to the source, and returns it as the enumerator.
    /// </summary>
    /// <returns>The newly created sink.</returns>
    public IEnumerator<TResult> GetEnumerator()
    {
      // The sink needs a disposable handle before the subscription exists, so an empty holder is
      // passed to Run first and filled in once the subscription has been made.
      SingleAssignmentDisposable subscriptionHolder = new SingleAssignmentDisposable();
      PushToPullSink<TSource, TResult> sink = Run(subscriptionHolder);

      subscriptionHolder.Disposable = _source.SubscribeSafe(sink);
      return sink;
    }

    /// <summary>
    /// Non-generic entry point; forwards to the generic <see cref="GetEnumerator"/>.
    /// </summary>
    IEnumerator IEnumerable.GetEnumerator()
    {
      return GetEnumerator();
    }
  }
  /// <summary>
  /// Base class for objects that act both as the observer of a source and as the enumerator handed
  /// to the consumer. Derived classes implement the observer callbacks and <see cref="TryMoveNext"/>;
  /// this class supplies the enumerator surface on top of them.
  /// </summary>
  /// <typeparam name="TSource">Element type received through the observer callbacks.</typeparam>
  /// <typeparam name="TResult">Element type exposed through <see cref="Current"/>.</typeparam>
  abstract class PushToPullSink<TSource, TResult> : IObserver<TSource>, IEnumerator<TResult>, IDisposable
  {
    private readonly IDisposable _subscription;

    // Set once TryMoveNext has reported that no further element is available.
    private bool _done;

    /// <summary>
    /// Stores the disposable that is disposed when enumeration ends or the enumerator is disposed.
    /// </summary>
    /// <param name="subscription">Disposable to release on completion or on <see cref="Dispose"/>.</param>
    public PushToPullSink(IDisposable subscription)
    {
      _subscription = subscription;
    }

    /// <summary>
    /// The value produced by the most recent successful <see cref="MoveNext"/> call. It is not
    /// cleared when enumeration ends.
    /// </summary>
    public TResult Current { get; private set; }

    /// <summary>
    /// Non-generic view of <see cref="Current"/>.
    /// </summary>
    object IEnumerator.Current
    {
      get { return Current; }
    }

    public abstract void OnNext(TSource value);

    public abstract void OnError(Exception error);

    public abstract void OnCompleted();

    /// <summary>
    /// Attempts to obtain the next element for the consumer.
    /// </summary>
    /// <param name="current">Receives the next element when the method returns true.</param>
    /// <returns>True if an element was produced; false to end the enumeration.</returns>
    public abstract bool TryMoveNext(out TResult current);

    /// <summary>
    /// Advances the enumerator by delegating to <see cref="TryMoveNext"/>. The first time that method
    /// returns false, the subscription is disposed and every later call returns false without
    /// invoking <see cref="TryMoveNext"/> again.
    /// </summary>
    /// <returns>True if <see cref="Current"/> was updated with a new element; otherwise false.</returns>
    public bool MoveNext()
    {
      if (_done)
      {
        return false;
      }

      TResult next;
      if (!TryMoveNext(out next))
      {
        // End of sequence: remember it and release the subscription right away instead of
        // waiting for the consumer to call Dispose.
        _done = true;
        _subscription.Dispose();
        return false;
      }

      Current = next;
      return true;
    }

    /// <summary>
    /// Not supported; always throws.
    /// </summary>
    /// <exception cref="NotSupportedException">Thrown on every call.</exception>
    public void Reset()
    {
      throw new NotSupportedException();
    }

    /// <summary>
    /// Disposes the subscription supplied at construction. The completion flag is left untouched.
    /// </summary>
    public void Dispose()
    {
      _subscription.Dispose();
    }
  }
  76. }