OnErrorResumeNext.cs 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. namespace System.Reactive.Linq.ObservableImpl
  6. {
  7. internal sealed class OnErrorResumeNext<TSource> : Producer<TSource>
  8. {
  9. private readonly IEnumerable<IObservable<TSource>> _sources;
  10. public OnErrorResumeNext(IEnumerable<IObservable<TSource>> sources)
  11. {
  12. _sources = sources;
  13. }
  14. protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
  15. {
  16. var sink = new _(observer, cancel);
  17. setSink(sink);
  18. return sink.Run(_sources);
  19. }
  20. private sealed class _ : TailRecursiveSink<TSource>
  21. {
  22. public _(IObserver<TSource> observer, IDisposable cancel)
  23. : base(observer, cancel)
  24. {
  25. }
  26. protected override IEnumerable<IObservable<TSource>> Extract(IObservable<TSource> source)
  27. {
  28. var oern = source as OnErrorResumeNext<TSource>;
  29. if (oern != null)
  30. return oern._sources;
  31. return null;
  32. }
  33. public override void OnNext(TSource value)
  34. {
  35. base._observer.OnNext(value);
  36. }
  37. public override void OnError(Exception error)
  38. {
  39. _recurse();
  40. }
  41. public override void OnCompleted()
  42. {
  43. _recurse();
  44. }
  45. protected override bool Fail(Exception error)
  46. {
  47. //
  48. // Note that the invocation of _recurse in OnError will
  49. // cause the next MoveNext operation to be enqueued, so
  50. // we will still return to the caller immediately.
  51. //
  52. OnError(error);
  53. return true;
  54. }
  55. }
  56. }
  57. }