Throw.cs 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. namespace System.Reactive.Linq.ObservableImpl
  7. {
  8. internal sealed class Throw<TResult> : Producer<TResult, Throw<TResult>._>
  9. {
  10. private readonly Exception _exception;
  11. private readonly IScheduler _scheduler;
  12. public Throw(Exception exception, IScheduler scheduler)
  13. {
  14. _exception = exception;
  15. _scheduler = scheduler;
  16. }
  17. protected override _ CreateSink(IObserver<TResult> observer) => new _(_exception, observer);
  18. protected override void Run(_ sink) => sink.Run(_scheduler);
  19. internal sealed class _ : IdentitySink<TResult>
  20. {
  21. private readonly Exception _exception;
  22. public _(Exception exception, IObserver<TResult> observer)
  23. : base(observer)
  24. {
  25. _exception = exception;
  26. }
  27. public void Run(IScheduler scheduler)
  28. {
  29. SetUpstream(scheduler.ScheduleAction(this, static @this => @this.ForwardOnError(@this._exception)));
  30. }
  31. }
  32. }
  33. // There is no need for a full Producer/IdentitySink as there is no
  34. // way to stop a first task running on the immediate scheduler
  35. // as it is always synchronous.
  36. internal sealed class ThrowImmediate<TSource> : BasicProducer<TSource>
  37. {
  38. private readonly Exception _exception;
  39. public ThrowImmediate(Exception exception)
  40. {
  41. _exception = exception;
  42. }
  43. protected override IDisposable Run(IObserver<TSource> observer)
  44. {
  45. observer.OnError(_exception);
  46. return Disposable.Empty;
  47. }
  48. }
  49. }