GetEnumerator.cs 2.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Concurrent;
  5. using System.Collections.Generic;
  6. using System.Diagnostics;
  7. using System.Reactive.Disposables;
  8. using System.Threading;
  9. namespace System.Reactive.Linq.ObservableImpl
  10. {
  11. internal sealed class GetEnumerator<TSource> : IEnumerator<TSource>, IObserver<TSource>
  12. {
  13. private readonly ConcurrentQueue<TSource> _queue;
  14. private TSource? _current;
  15. private Exception? _error;
  16. private bool _done;
  17. private bool _disposed;
  18. private SingleAssignmentDisposableValue _subscription;
  19. private readonly SemaphoreSlim _gate;
  20. public GetEnumerator()
  21. {
  22. _queue = new ConcurrentQueue<TSource>();
  23. _gate = new SemaphoreSlim(0);
  24. }
  25. public IEnumerator<TSource> Run(IObservable<TSource> source)
  26. {
  27. //
  28. // [OK] Use of unsafe Subscribe: non-pretentious exact mirror with the dual GetEnumerator method.
  29. //
  30. _subscription.Disposable = source.Subscribe/*Unsafe*/(this);
  31. return this;
  32. }
  33. public void OnNext(TSource value)
  34. {
  35. _queue.Enqueue(value);
  36. _gate.Release();
  37. }
  38. public void OnError(Exception error)
  39. {
  40. _error = error;
  41. _subscription.Dispose();
  42. _gate.Release();
  43. }
  44. public void OnCompleted()
  45. {
  46. _done = true;
  47. _subscription.Dispose();
  48. _gate.Release();
  49. }
  50. public bool MoveNext()
  51. {
  52. _gate.Wait();
  53. if (_disposed)
  54. {
  55. throw new ObjectDisposedException("");
  56. }
  57. if (_queue.TryDequeue(out _current))
  58. {
  59. return true;
  60. }
  61. _error?.Throw();
  62. Debug.Assert(_done);
  63. _gate.Release(); // In the (rare) case the user calls MoveNext again we shouldn't block!
  64. return false;
  65. }
  66. public TSource Current => _current!; // NB: Only called after MoveNext returns true and assigns a value.
  67. object Collections.IEnumerator.Current => _current!;
  68. public void Dispose()
  69. {
  70. _subscription.Dispose();
  71. _disposed = true;
  72. _gate.Release();
  73. }
  74. public void Reset()
  75. {
  76. throw new NotSupportedException();
  77. }
  78. }
  79. }