Case.cs 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. #if !NO_PERF
  5. using System;
  6. using System.Collections.Generic;
  7. using System.Reactive.Disposables;
  8. namespace System.Reactive.Linq.ObservableImpl
  9. {
  10. class Case<TValue, TResult> : Producer<TResult>, IEvaluatableObservable<TResult>
  11. {
  12. private readonly Func<TValue> _selector;
  13. private readonly IDictionary<TValue, IObservable<TResult>> _sources;
  14. private readonly IObservable<TResult> _defaultSource;
  15. public Case(Func<TValue> selector, IDictionary<TValue, IObservable<TResult>> sources, IObservable<TResult> defaultSource)
  16. {
  17. _selector = selector;
  18. _sources = sources;
  19. _defaultSource = defaultSource;
  20. }
  21. public IObservable<TResult> Eval()
  22. {
  23. var res = default(IObservable<TResult>);
  24. if (_sources.TryGetValue(_selector(), out res))
  25. return res;
  26. return _defaultSource;
  27. }
  28. protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
  29. {
  30. var sink = new _(this, observer, cancel);
  31. setSink(sink);
  32. return sink.Run();
  33. }
  34. class _ : Sink<TResult>, IObserver<TResult>
  35. {
  36. private readonly Case<TValue, TResult> _parent;
  37. public _(Case<TValue, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  38. : base(observer, cancel)
  39. {
  40. _parent = parent;
  41. }
  42. public IDisposable Run()
  43. {
  44. var result = default(IObservable<TResult>);
  45. try
  46. {
  47. result = _parent.Eval();
  48. }
  49. catch (Exception exception)
  50. {
  51. base._observer.OnError(exception);
  52. base.Dispose();
  53. return Disposable.Empty;
  54. }
  55. return result.SubscribeSafe(this);
  56. }
  57. public void OnNext(TResult value)
  58. {
  59. base._observer.OnNext(value);
  60. }
  61. public void OnError(Exception error)
  62. {
  63. base._observer.OnError(error);
  64. base.Dispose();
  65. }
  66. public void OnCompleted()
  67. {
  68. base._observer.OnCompleted();
  69. base.Dispose();
  70. }
  71. }
  72. }
  73. }
  74. #endif