Do.cs 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. #if !NO_PERF
  5. using System;
  6. namespace System.Reactive.Linq.ObservableImpl
  7. {
  /// <summary>
  /// Producer that runs caller-supplied callbacks for every notification of a source
  /// sequence and then relays that notification to the subscribed observer.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  class Do<TSource> : Producer<TSource>
  {
      private readonly IObservable<TSource> _source;
      private readonly Action<TSource> _onNext;
      private readonly Action<Exception> _onError;
      private readonly Action _onCompleted;

      /// <summary>
      /// Captures the source sequence and the three callbacks. The arguments are stored
      /// as given; no validation is performed in this constructor.
      /// </summary>
      /// <param name="source">Sequence whose notifications are observed.</param>
      /// <param name="onNext">Callback invoked with each element before it is forwarded.</param>
      /// <param name="onError">Callback invoked with the source's error before it is forwarded.</param>
      /// <param name="onCompleted">Callback invoked before completion is forwarded.</param>
      public Do(IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted)
      {
          _source = source;
          _onNext = onNext;
          _onError = onError;
          _onCompleted = onCompleted;
      }

      /// <summary>
      /// Builds the sink for <paramref name="observer"/>, publishes it through
      /// <paramref name="setSink"/>, and subscribes it to the source.
      /// </summary>
      /// <param name="observer">Observer that receives the relayed notifications.</param>
      /// <param name="cancel">Disposable handed to the sink's base constructor.</param>
      /// <param name="setSink">Callback that receives the newly created sink.</param>
      /// <returns>The result of subscribing the sink to the source via <c>SubscribeSafe</c>.</returns>
      protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var relay = new _(this, observer, cancel);
          setSink(relay);

          var subscription = _source.SubscribeSafe(relay);
          return subscription;
      }

      /// <summary>
      /// Sink that invokes the owning operator's callbacks and forwards each
      /// notification to the downstream observer.
      /// </summary>
      class _ : Sink<TSource>, IObserver<TSource>
      {
          private readonly Do<TSource> _owner;

          public _(Do<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _owner = parent;
          }

          /// <summary>
          /// Runs the element callback and then forwards the element. If the callback
          /// throws, the exception is forwarded as an error instead, the sink is
          /// disposed, and the element itself is not forwarded.
          /// </summary>
          public void OnNext(TSource value)
          {
              try
              {
                  _owner._onNext(value);
              }
              catch (Exception callbackFailure)
              {
                  ForwardFailureAndDispose(callbackFailure);
                  return;
              }

              base._observer.OnNext(value);
          }

          /// <summary>
          /// Runs the error callback and then terminates downstream with an error.
          /// If the callback itself throws, the exception it threw is forwarded in
          /// place of <paramref name="error"/>. The sink is disposed in both cases.
          /// </summary>
          public void OnError(Exception error)
          {
              // Start from the source's error; swap in the callback's exception if it fails.
              var toForward = error;

              try
              {
                  _owner._onError(error);
              }
              catch (Exception callbackFailure)
              {
                  toForward = callbackFailure;
              }

              ForwardFailureAndDispose(toForward);
          }

          /// <summary>
          /// Runs the completion callback and then forwards completion and disposes
          /// the sink. If the callback throws, the exception is forwarded as an error
          /// instead of the completion signal, and the sink is disposed.
          /// </summary>
          public void OnCompleted()
          {
              try
              {
                  _owner._onCompleted();
              }
              catch (Exception callbackFailure)
              {
                  ForwardFailureAndDispose(callbackFailure);
                  return;
              }

              base._observer.OnCompleted();
              base.Dispose();
          }

          /// <summary>
          /// Sends <paramref name="failure"/> to the downstream observer's OnError and
          /// then disposes this sink, in that order.
          /// </summary>
          private void ForwardFailureAndDispose(Exception failure)
          {
              base._observer.OnError(failure);
              base.Dispose();
          }
      }
  }
  81. }
  82. #endif