// Observers.cs
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
  4. namespace System.Reactive
  5. {
  /// <summary>
  /// An observer that silently discards every notification it receives.
  /// </summary>
  /// <typeparam name="T">Type of the elements the observer accepts.</typeparam>
  internal sealed class NopObserver<T> : IObserver<T>
  {
      /// <summary>
      /// Shared instance; one exists per closed generic type.
      /// </summary>
      public static readonly IObserver<T> Instance = new NopObserver<T>();

      /// <summary>Ignores the element.</summary>
      /// <param name="value">Element that is discarded.</param>
      public void OnNext(T value)
      {
      }

      /// <summary>Ignores the error.</summary>
      /// <param name="error">Error that is discarded.</param>
      public void OnError(Exception error)
      {
      }

      /// <summary>Ignores the completion signal.</summary>
      public void OnCompleted()
      {
      }
  }
  /// <summary>
  /// An observer whose notification methods all do nothing, and which can
  /// additionally carry an <see cref="System.Exception"/> value.
  /// </summary>
  /// <typeparam name="T">Type of the elements the observer accepts.</typeparam>
  internal sealed class DoneObserver<T> : IObserver<T>
  {
      /// <summary>
      /// Shared instance whose <see cref="Exception"/> is left at its default (null).
      /// </summary>
      public static readonly IObserver<T> Completed = new DoneObserver<T>();

      /// <summary>
      /// Exception attached to this instance. None of the notification methods
      /// below read or write it.
      /// NOTE(review): presumably the error a sequence terminated with - confirm against callers.
      /// </summary>
      public Exception Exception { get; set; }

      /// <summary>Ignores the element.</summary>
      /// <param name="value">Element that is discarded.</param>
      public void OnNext(T value)
      {
      }

      /// <summary>Ignores the error; it is not stored in <see cref="Exception"/>.</summary>
      /// <param name="error">Error that is discarded.</param>
      public void OnError(Exception error)
      {
      }

      /// <summary>Ignores the completion signal.</summary>
      public void OnCompleted()
      {
      }
  }
  /// <summary>
  /// An observer that rejects every notification by throwing
  /// <see cref="ObjectDisposedException"/>.
  /// </summary>
  /// <typeparam name="T">Type of the elements the observer accepts.</typeparam>
  internal sealed class DisposedObserver<T> : IObserver<T>
  {
      /// <summary>
      /// Shared instance; one exists per closed generic type.
      /// </summary>
      public static readonly IObserver<T> Instance = new DisposedObserver<T>();

      /// <summary>Always throws.</summary>
      /// <param name="value">Unused.</param>
      /// <exception cref="ObjectDisposedException">Thrown on every call.</exception>
      public void OnNext(T value)
      {
          throw CreateDisposedException();
      }

      /// <summary>Always throws.</summary>
      /// <param name="error">Unused.</param>
      /// <exception cref="ObjectDisposedException">Thrown on every call.</exception>
      public void OnError(Exception error)
      {
          throw CreateDisposedException();
      }

      /// <summary>Always throws.</summary>
      /// <exception cref="ObjectDisposedException">Thrown on every call.</exception>
      public void OnCompleted()
      {
          throw CreateDisposedException();
      }

      // Single place that builds the exception; the object name is intentionally
      // the empty string, exactly as each call site passed it before.
      private static ObjectDisposedException CreateDisposedException()
      {
          return new ObjectDisposedException("");
      }
  }
  /// <summary>
  /// An observer that forwards every notification to each observer held in
  /// an <see cref="ImmutableList{T}"/>, in the order of the list's backing array.
  /// </summary>
  /// <typeparam name="T">Type of the elements the observer accepts.</typeparam>
  internal sealed class Observer<T> : IObserver<T>
  {
      private readonly ImmutableList<IObserver<T>> _observers;

      /// <summary>
      /// Wraps the given list of observers.
      /// </summary>
      /// <param name="observers">Observers that will receive forwarded notifications.</param>
      public Observer(ImmutableList<IObserver<T>> observers)
      {
          _observers = observers;
      }

      /// <summary>
      /// Forwards the element to every observer in turn. There is no exception
      /// handling here, so a throwing observer stops the fan-out.
      /// </summary>
      /// <param name="value">Element to forward.</param>
      public void OnNext(T value)
      {
          var targets = _observers.Data;

          for (var index = 0; index < targets.Length; index++)
          {
              targets[index].OnNext(value);
          }
      }

      /// <summary>
      /// Forwards the error to every observer in turn. There is no exception
      /// handling here, so a throwing observer stops the fan-out.
      /// </summary>
      /// <param name="error">Error to forward.</param>
      public void OnError(Exception error)
      {
          var targets = _observers.Data;

          for (var index = 0; index < targets.Length; index++)
          {
              targets[index].OnError(error);
          }
      }

      /// <summary>
      /// Forwards the completion signal to every observer in turn. There is no
      /// exception handling here, so a throwing observer stops the fan-out.
      /// </summary>
      public void OnCompleted()
      {
          var targets = _observers.Data;

          for (var index = 0; index < targets.Length; index++)
          {
              targets[index].OnCompleted();
          }
      }

      /// <summary>
      /// Builds a new <see cref="Observer{T}"/> around the list returned by
      /// adding <paramref name="observer"/> to the current list.
      /// NOTE(review): presumably leaves this instance untouched, given the list
      /// type is named immutable - confirm against ImmutableList.Add.
      /// </summary>
      /// <param name="observer">Observer to add.</param>
      /// <returns>A new fan-out observer wrapping the extended list.</returns>
      internal IObserver<T> Add(IObserver<T> observer)
      {
          var extended = _observers.Add(observer);
          return new Observer<T>(extended);
      }

      /// <summary>
      /// Produces the observer that results from removing <paramref name="observer"/>.
      /// </summary>
      /// <param name="observer">Observer to remove.</param>
      /// <returns>
      /// This instance when <paramref name="observer"/> is not in the list; the
      /// single remaining observer itself (not wrapped) when the list holds exactly
      /// two entries; otherwise a new <see cref="Observer{T}"/> around the list
      /// returned by removing <paramref name="observer"/>.
      /// </returns>
      internal IObserver<T> Remove(IObserver<T> observer)
      {
          var current = _observers.Data;
          var position = Array.IndexOf(current, observer);

          // Not subscribed here: nothing changes.
          if (position < 0)
          {
              return this;
          }

          // Any size other than two still needs a fan-out wrapper.
          if (current.Length != 2)
          {
              return new Observer<T>(_observers.Remove(observer));
          }

          // Exactly two entries: position is 0 or 1, so 1 - position is the other one.
          return current[1 - position];
      }
  }
  83. }