GetEnumerator.cs 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Concurrent;
  5. using System.Collections.Generic;
  6. using System.Diagnostics;
  7. using System.Reactive.Disposables;
  8. using System.Threading;
  namespace System.Reactive.Linq.ObservableImpl
  {
      /// <summary>
      /// Bridges a push-based <see cref="IObservable{T}"/> to a pull-based, blocking
      /// <see cref="IEnumerator{T}"/>. The instance subscribes itself as the observer
      /// (see <see cref="Run"/>), buffers every received element in a queue, and lets
      /// <see cref="MoveNext"/> block on a semaphore until an element or a terminal
      /// signal is available.
      /// </summary>
      /// <typeparam name="TSource">Type of the elements produced by the source sequence.</typeparam>
      /// <remarks>
      /// NOTE(review): the permit accounting in <see cref="MoveNext"/> assumes a single
      /// consumer calling <see cref="MoveNext"/> at a time - confirm against callers.
      /// </remarks>
      internal sealed class GetEnumerator<TSource> : IEnumerator<TSource>, IObserver<TSource>
      {
          // Elements received through OnNext that MoveNext has not handed out yet.
          private readonly ConcurrentQueue<TSource> _queue;

          // Element most recently dequeued by MoveNext; exposed through Current.
          private TSource _current;

          // Error received through OnError, if any.
          private Exception _error;

          // Set once OnCompleted has been received.
          private bool _done;

          // Set once Dispose has been called.
          private bool _disposed;

          // Holds one permit per queued element, plus one permit for each terminal
          // signal (OnError, OnCompleted, Dispose). MoveNext takes a permit per call.
          private readonly SemaphoreSlim _gate;

          // Holds the subscription to the source so it can be released on termination or disposal.
          private readonly SingleAssignmentDisposable _subscription;

          /// <summary>
          /// Creates an enumerator with an empty buffer and no available permits, so the
          /// first <see cref="MoveNext"/> blocks until the source signals something.
          /// </summary>
          public GetEnumerator()
          {
              _queue = new ConcurrentQueue<TSource>();
              _gate = new SemaphoreSlim(0);
              _subscription = new SingleAssignmentDisposable();
          }

          /// <summary>
          /// Subscribes this instance to <paramref name="source"/> and returns it as the enumerator.
          /// </summary>
          /// <param name="source">Observable sequence to enumerate.</param>
          /// <returns>This instance, typed as <see cref="IEnumerator{T}"/>.</returns>
          public IEnumerator<TSource> Run(IObservable<TSource> source)
          {
              //
              // [OK] Use of unsafe Subscribe: non-pretentious exact mirror with the dual GetEnumerator method.
              //
              _subscription.Disposable = source.Subscribe/*Unsafe*/(this);
              return this;
          }

          /// <summary>
          /// Buffers <paramref name="value"/> and adds one permit so a <see cref="MoveNext"/> call can pick it up.
          /// </summary>
          /// <param name="value">Element produced by the source.</param>
          public void OnNext(TSource value)
          {
              // Enqueue before releasing: a consumer woken by this permit must find the element.
              _queue.Enqueue(value);
              _gate.Release();
          }

          /// <summary>
          /// Records the error, disposes the subscription and adds a permit so
          /// <see cref="MoveNext"/> can observe the failure.
          /// </summary>
          /// <param name="error">Error reported by the source.</param>
          public void OnError(Exception error)
          {
              _error = error;
              _subscription.Dispose();
              _gate.Release();
          }

          /// <summary>
          /// Records completion, disposes the subscription and adds a permit so
          /// <see cref="MoveNext"/> can observe the end of the sequence.
          /// </summary>
          public void OnCompleted()
          {
              _done = true;
              _subscription.Dispose();
              _gate.Release();
          }

          /// <summary>
          /// Blocks until an element or a terminal signal is available, then advances to the next element.
          /// </summary>
          /// <returns>
          /// <c>true</c> when an element was dequeued into <see cref="Current"/>;
          /// <c>false</c> when the buffer is empty and no error was recorded.
          /// </returns>
          /// <exception cref="ObjectDisposedException">The enumerator has been disposed.</exception>
          /// <remarks>
          /// When the buffer is empty and an error was recorded, the error is raised through
          /// the <c>ThrowIfNotNull</c> helper (declared elsewhere in the project). Every terminal
          /// path hands its permit back, so calling this method again after disposal, an error
          /// or completion reports the same outcome again instead of blocking.
          /// </remarks>
          public bool MoveNext()
          {
              _gate.Wait();

              if (_disposed)
              {
                  // Hand the permit back: Dispose only releases once, so without this a
                  // second call after disposal would wait forever instead of throwing.
                  _gate.Release();
                  throw new ObjectDisposedException("");
              }

              if (_queue.TryDequeue(out _current))
              {
                  return true;
              }

              // No element behind this permit, so it came from a terminal signal.
              // Hand it back before reporting, so that a repeated call (after an error
              // as well as after completion) doesn't block.
              _gate.Release();

              _error.ThrowIfNotNull();

              Debug.Assert(_done);

              return false;
          }

          /// <summary>
          /// Gets the element most recently dequeued by <see cref="MoveNext"/>.
          /// </summary>
          public TSource Current
          {
              get { return _current; }
          }

          /// <summary>
          /// Gets the element most recently dequeued by <see cref="MoveNext"/>, as an <see cref="object"/>.
          /// </summary>
          object Collections.IEnumerator.Current
          {
              get { return _current; }
          }

          /// <summary>
          /// Disposes the subscription, marks the enumerator as disposed and adds a permit
          /// so a blocked <see cref="MoveNext"/> call wakes up and observes the disposal.
          /// </summary>
          public void Dispose()
          {
              _subscription.Dispose();

              // Set the flag before releasing so the woken consumer sees it.
              _disposed = true;
              _gate.Release();
          }

          /// <summary>
          /// Not supported.
          /// </summary>
          /// <exception cref="NotSupportedException">Always thrown.</exception>
          public void Reset()
          {
              throw new NotSupportedException();
          }
      }
  }