1
0

ForEach.cs 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Threading;
  5. namespace System.Reactive.Linq.ObservableImpl
  6. {
  /// <summary>
  /// Container for the observers that run a per-element callback and signal a
  /// <c>done</c> callback when the observed sequence terminates.
  /// </summary>
  /// <typeparam name="TSource">Type of the elements passed to the per-element callback.</typeparam>
  internal sealed class ForEach<TSource>
  {
      /// <summary>
      /// Observer that hands every element to an <see cref="Action{T}"/> and invokes
      /// a <c>done</c> callback on the first terminal notification.
      /// </summary>
      public sealed class Observer : IObserver<TSource>
      {
          private readonly Action _done;
          private readonly Action<TSource> _onNext;

          // 0 while notifications are still accepted, 1 once a terminal notification has been claimed.
          private int _stopped;
          private Exception? _exception;

          /// <summary>
          /// Creates the observer.
          /// </summary>
          /// <param name="onNext">Callback invoked for each element while the observer is not stopped.</param>
          /// <param name="done">Callback invoked by the first terminal notification (error or completion).</param>
          public Observer(Action<TSource> onNext, Action done)
          {
              _done = done;
              _onNext = onNext;
          }

          /// <summary>
          /// Gets the exception recorded by <see cref="OnError"/>, or <c>null</c> when none was recorded.
          /// </summary>
          public Exception? Error
          {
              get { return _exception; }
          }

          /// <summary>
          /// Passes <paramref name="value"/> to the element callback unless the observer has stopped.
          /// An exception thrown by the callback is redirected to <see cref="OnError"/>.
          /// </summary>
          /// <param name="value">The element to process.</param>
          public void OnNext(TSource value)
          {
              if (_stopped != 0)
              {
                  return;
              }

              try
              {
                  _onNext(value);
              }
              catch (Exception failure)
              {
                  OnError(failure);
              }
          }

          /// <summary>
          /// Records <paramref name="error"/> and invokes the <c>done</c> callback, provided no
          /// terminal notification was claimed before.
          /// </summary>
          /// <param name="error">The exception to expose through <see cref="Error"/>.</param>
          public void OnError(Exception error)
          {
              if (!TryStop())
              {
                  return;
              }

              _exception = error;
              _done();
          }

          /// <summary>
          /// Invokes the <c>done</c> callback, provided no terminal notification was claimed before.
          /// </summary>
          public void OnCompleted()
          {
              if (!TryStop())
              {
                  return;
              }

              _done();
          }

          // Atomically sets the stop flag; true only for the caller that flipped it from 0 to 1.
          private bool TryStop()
          {
              var previous = Interlocked.Exchange(ref _stopped, 1);
              return previous == 0;
          }
      }

      /// <summary>
      /// Observer that hands every element together with its zero-based position to an
      /// <see cref="Action{T1, T2}"/> and invokes a <c>done</c> callback on the first
      /// terminal notification.
      /// </summary>
      public sealed class ObserverIndexed : IObserver<TSource>
      {
          private readonly Action _done;
          private readonly Action<TSource, int> _onNext;

          // 0 while notifications are still accepted, 1 once a terminal notification has been claimed.
          private int _stopped;
          private Exception? _exception;

          // Position passed to the next callback invocation; starts at 0.
          private int _index;

          /// <summary>
          /// Creates the observer.
          /// </summary>
          /// <param name="onNext">Callback invoked with each element and its index while the observer is not stopped.</param>
          /// <param name="done">Callback invoked by the first terminal notification (error or completion).</param>
          public ObserverIndexed(Action<TSource, int> onNext, Action done)
          {
              _done = done;
              _onNext = onNext;
          }

          /// <summary>
          /// Gets the exception recorded by <see cref="OnError"/>, or <c>null</c> when none was recorded.
          /// </summary>
          public Exception? Error
          {
              get { return _exception; }
          }

          /// <summary>
          /// Passes <paramref name="value"/> and the current index to the element callback unless
          /// the observer has stopped, then advances the index. An exception thrown by the callback,
          /// or by the overflow-checked index increment, is redirected to <see cref="OnError"/>.
          /// </summary>
          /// <param name="value">The element to process.</param>
          public void OnNext(TSource value)
          {
              if (_stopped != 0)
              {
                  return;
              }

              try
              {
                  int position;

                  // The increment stays inside the try so an overflow is reported through OnError.
                  checked
                  {
                      position = _index++;
                  }

                  _onNext(value, position);
              }
              catch (Exception failure)
              {
                  OnError(failure);
              }
          }

          /// <summary>
          /// Records <paramref name="error"/> and invokes the <c>done</c> callback, provided no
          /// terminal notification was claimed before.
          /// </summary>
          /// <param name="error">The exception to expose through <see cref="Error"/>.</param>
          public void OnError(Exception error)
          {
              if (!TryStop())
              {
                  return;
              }

              _exception = error;
              _done();
          }

          /// <summary>
          /// Invokes the <c>done</c> callback, provided no terminal notification was claimed before.
          /// </summary>
          public void OnCompleted()
          {
              if (!TryStop())
              {
                  return;
              }

              _done();
          }

          // Atomically sets the stop flag; true only for the caller that flipped it from 0 to 1.
          private bool TryStop()
          {
              var previous = Interlocked.Exchange(ref _stopped, 1);
              return previous == 0;
          }
      }
  }
  95. }