GetEnumerator.cs 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. #if !NO_PERF && !NO_CDS
  5. using System;
  6. using System.Collections.Concurrent;
  7. using System.Collections.Generic;
  8. using System.Diagnostics;
  9. using System.Reactive.Disposables;
  10. using System.Threading;
  11. namespace System.Reactive.Linq.ObservableImpl
  12. {
  13. class GetEnumerator<TSource> : IEnumerator<TSource>, IObserver<TSource>
  14. {
  15. private readonly ConcurrentQueue<TSource> _queue;
  16. private TSource _current;
  17. private Exception _error;
  18. private bool _done;
  19. private bool _disposed;
  20. private readonly SemaphoreSlim _gate;
  21. private readonly SingleAssignmentDisposable _subscription;
  22. public GetEnumerator()
  23. {
  24. _queue = new ConcurrentQueue<TSource>();
  25. _gate = new SemaphoreSlim(0);
  26. _subscription = new SingleAssignmentDisposable();
  27. }
  28. public IEnumerator<TSource> Run(IObservable<TSource> source)
  29. {
  30. //
  31. // [OK] Use of unsafe Subscribe: non-pretentious exact mirror with the dual GetEnumerator method.
  32. //
  33. _subscription.Disposable = source.Subscribe/*Unsafe*/(this);
  34. return this;
  35. }
  36. public void OnNext(TSource value)
  37. {
  38. _queue.Enqueue(value);
  39. _gate.Release();
  40. }
  41. public void OnError(Exception error)
  42. {
  43. _error = error;
  44. _subscription.Dispose();
  45. _gate.Release();
  46. }
  47. public void OnCompleted()
  48. {
  49. _done = true;
  50. _subscription.Dispose();
  51. _gate.Release();
  52. }
  53. public bool MoveNext()
  54. {
  55. _gate.Wait();
  56. if (_disposed)
  57. throw new ObjectDisposedException("");
  58. if (_queue.TryDequeue(out _current))
  59. return true;
  60. _error.ThrowIfNotNull();
  61. Debug.Assert(_done);
  62. _gate.Release(); // In the (rare) case the user calls MoveNext again we shouldn't block!
  63. return false;
  64. }
  65. public TSource Current
  66. {
  67. get { return _current; }
  68. }
  69. object Collections.IEnumerator.Current
  70. {
  71. get { return _current; }
  72. }
  73. public void Dispose()
  74. {
  75. _subscription.Dispose();
  76. _disposed = true;
  77. _gate.Release();
  78. }
  79. public void Reset()
  80. {
  81. throw new NotSupportedException();
  82. }
  83. }
  84. }
  85. #endif