PushToPullAdapter.cs 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections;
  5. using System.Collections.Generic;
  6. using System.Reactive.Disposables;
  7. namespace System.Reactive.Linq.ObservableImpl
  8. {
  /// <summary>
  /// Base class that presents an <see cref="IObservable{TSource}"/> as an
  /// <see cref="IEnumerable{TResult}"/>. Every call to <see cref="GetEnumerator"/>
  /// obtains a new sink from <see cref="Run"/>, subscribes that sink to the source
  /// and returns the sink itself as the enumerator.
  /// </summary>
  /// <typeparam name="TSource">Element type of the observable source.</typeparam>
  /// <typeparam name="TResult">Element type produced by the enumerator.</typeparam>
  internal abstract class PushToPullAdapter<TSource, TResult> : IEnumerable<TResult>
  {
      // Observable sequence that each enumerator subscribes to.
      private readonly IObservable<TSource> _source;

      /// <summary>
      /// Stores the observable that enumerators created by this adapter will subscribe to.
      /// </summary>
      /// <param name="source">The observable source sequence.</param>
      protected PushToPullAdapter(IObservable<TSource> source) => _source = source;

      /// <summary>
      /// Creates the sink that acts both as the observer of the source and as the
      /// enumerator returned to the caller.
      /// </summary>
      /// <returns>A sink instance supplied by the derived class.</returns>
      protected abstract PushToPullSink<TSource, TResult> Run();

      /// <summary>
      /// Creates a sink, subscribes it to the source through <c>SubscribeSafe</c>,
      /// hands the resulting subscription to the sink and returns the sink.
      /// </summary>
      /// <returns>The sink, typed as an enumerator over <typeparamref name="TResult"/>.</returns>
      public IEnumerator<TResult> GetEnumerator()
      {
          var sink = Run();

          // The sink keeps the subscription handle so it can release it later.
          var subscription = _source.SubscribeSafe(sink);
          sink.SetUpstream(subscription);

          return sink;
      }

      /// <summary>
      /// Non-generic entry point; forwards to the generic <see cref="GetEnumerator"/>.
      /// </summary>
      IEnumerator IEnumerable.GetEnumerator()
      {
          return GetEnumerator();
      }
  }
  /// <summary>
  /// Base class for a sink that is simultaneously an <see cref="IObserver{TSource}"/>
  /// (receiving pushed notifications) and an <see cref="IEnumerator{TResult}"/>
  /// (pulled by the consumer). Derived classes implement the observer callbacks and
  /// <see cref="TryMoveNext"/>; this class supplies the enumerator plumbing and holds
  /// the upstream subscription handle.
  /// </summary>
  /// <typeparam name="TSource">Element type received from the observable.</typeparam>
  /// <typeparam name="TResult">Element type exposed through <see cref="Current"/>.</typeparam>
  internal abstract class PushToPullSink<TSource, TResult> : IObserver<TSource>, IEnumerator<TResult>
  {
      // Subscription handle; written via Disposable.SetSingle, released via Disposable.TryDispose.
      private IDisposable _upstream;

      // Set once TryMoveNext has reported that no further element is available.
      private bool _done;

      /// <summary>
      /// The value most recently produced by a successful <see cref="MoveNext"/> call.
      /// </summary>
      public TResult Current { get; private set; }

      /// <summary>
      /// Non-generic view of <see cref="Current"/>.
      /// </summary>
      object IEnumerator.Current => Current;

      /// <summary>Receives the next element from the source.</summary>
      /// <param name="value">The element pushed by the source.</param>
      public abstract void OnNext(TSource value);

      /// <summary>Receives an error notification from the source.</summary>
      /// <param name="error">The exception reported by the source.</param>
      public abstract void OnError(Exception error);

      /// <summary>Receives the completion notification from the source.</summary>
      public abstract void OnCompleted();

      /// <summary>
      /// Attempts to obtain the next element for the enumerator.
      /// </summary>
      /// <param name="current">Receives the next element when the method returns <c>true</c>.</param>
      /// <returns><c>true</c> if an element was produced; <c>false</c> if enumeration is finished.</returns>
      public abstract bool TryMoveNext(out TResult current);

      /// <summary>
      /// Advances the enumerator by delegating to <see cref="TryMoveNext"/>. The first time
      /// that call returns <c>false</c> the sink is marked as done and disposed; every later
      /// call returns <c>false</c> without consulting <see cref="TryMoveNext"/> again.
      /// </summary>
      /// <returns><c>true</c> if <see cref="Current"/> was updated with a new element; otherwise <c>false</c>.</returns>
      public bool MoveNext()
      {
          if (_done)
          {
              return false;
          }

          if (TryMoveNext(out var next))
          {
              Current = next;
              return true;
          }

          // No more elements: remember that and release the upstream subscription.
          _done = true;
          Dispose();
          return false;
      }

      /// <summary>
      /// Not supported; always throws.
      /// </summary>
      /// <exception cref="NotSupportedException">Thrown on every call.</exception>
      public void Reset() => throw new NotSupportedException();

      /// <summary>
      /// Releases the upstream subscription by passing the field to <c>Disposable.TryDispose</c>.
      /// </summary>
      public void Dispose() => Disposable.TryDispose(ref _upstream);

      /// <summary>
      /// Records the upstream subscription by passing it to <c>Disposable.SetSingle</c>.
      /// </summary>
      /// <param name="d">The subscription handle obtained when subscribing to the source.</param>
      public void SetUpstream(IDisposable d) => Disposable.SetSingle(ref _upstream, d);
  }
  66. }