// OnErrorResumeNext.cs (2.0 KB)
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. using System.Collections.Generic;
  5. namespace System.Reactive.Linq.ObservableImpl
  6. {
  /// <summary>
  /// Producer built over a sequence of observable sources. Its sink forwards every element
  /// to the downstream observer and, when the current source either fails or completes,
  /// calls the base sink's recursion hook instead of forwarding that notification.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequences.</typeparam>
  class OnErrorResumeNext<TSource> : Producer<TSource>
  {
      // The sources handed to the sink when a subscription is run.
      private readonly IEnumerable<IObservable<TSource>> _sources;

      /// <summary>
      /// Stores the given sequence of sources; no enumeration or validation happens here.
      /// </summary>
      /// <param name="sources">Observable sequences to hand to the sink on subscription.</param>
      public OnErrorResumeNext(IEnumerable<IObservable<TSource>> sources)
      {
          _sources = sources;
      }

      /// <summary>
      /// Creates the sink for one subscription, publishes it through <paramref name="setSink"/>,
      /// and then starts it on the stored sources.
      /// </summary>
      /// <param name="observer">Downstream observer that receives the elements.</param>
      /// <param name="cancel">Disposable passed through to the sink's base constructor.</param>
      /// <param name="setSink">Callback that receives the newly created sink.</param>
      /// <returns>Whatever the sink's <c>Run</c> returns for the stored sources.</returns>
      protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var tailSink = new _(observer, cancel);

          // The sink is registered before it is started, as in the original ordering.
          setSink(tailSink);

          return tailSink.Run(_sources);
      }

      /// <summary>
      /// Sink that passes values through and treats error and completion identically.
      /// </summary>
      class _ : TailRecursiveSink<TSource>
      {
          /// <summary>
          /// Passes both arguments straight to the base sink.
          /// </summary>
          public _(IObserver<TSource> observer, IDisposable cancel)
              : base(observer, cancel)
          {
          }

          /// <summary>
          /// Exposes the inner sources of a nested <see cref="OnErrorResumeNext{TSource}"/>.
          /// </summary>
          /// <param name="source">Source to inspect.</param>
          /// <returns>
          /// The nested operator's sources when <paramref name="source"/> is an
          /// <see cref="OnErrorResumeNext{TSource}"/>; otherwise <c>null</c>.
          /// NOTE(review): presumably the base sink uses a non-null result to flatten nested
          /// operators - confirm against TailRecursiveSink.
          /// </returns>
          protected override IEnumerable<IObservable<TSource>> Extract(IObservable<TSource> source)
          {
              var nested = source as OnErrorResumeNext<TSource>;
              return nested != null ? nested._sources : null;
          }

          /// <summary>
          /// Forwards the element to the downstream observer unchanged.
          /// </summary>
          public override void OnNext(TSource value)
          {
              base._observer.OnNext(value);
          }

          /// <summary>
          /// Drops the error and asks the base sink to recurse; the error is not forwarded downstream.
          /// </summary>
          public override void OnError(Exception error)
          {
              _recurse();
          }

          /// <summary>
          /// Asks the base sink to recurse; the completion is not forwarded downstream here.
          /// </summary>
          public override void OnCompleted()
          {
              _recurse();
          }

          /// <summary>
          /// Routes a failure reported by the base sink through <see cref="OnError"/>.
          /// </summary>
          /// <param name="error">The failure being reported.</param>
          /// <returns>
          /// Always <c>true</c>.
          /// NOTE(review): presumably tells the base sink the failure was handled - confirm
          /// against TailRecursiveSink.
          /// </returns>
          protected override bool Fail(Exception error)
          {
              //
              // OnError invokes _recurse, which causes the next MoveNext operation to be
              // enqueued, so control still returns to the caller immediately.
              //
              OnError(error);

              return true;
          }
      }
  }
  57. }
  58. #endif