Dematerialize.cs 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. namespace System.Reactive.Linq.ObservableImpl
  6. {
  7. class Dematerialize<TSource> : Producer<TSource>
  8. {
  9. private readonly IObservable<Notification<TSource>> _source;
  10. public Dematerialize(IObservable<Notification<TSource>> source)
  11. {
  12. _source = source;
  13. }
  14. protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
  15. {
  16. var sink = new _(observer, cancel);
  17. setSink(sink);
  18. return _source.SubscribeSafe(sink);
  19. }
  20. class _ : Sink<TSource>, IObserver<Notification<TSource>>
  21. {
  22. public _(IObserver<TSource> observer, IDisposable cancel)
  23. : base(observer, cancel)
  24. {
  25. }
  26. public void OnNext(Notification<TSource> value)
  27. {
  28. switch (value.Kind)
  29. {
  30. case NotificationKind.OnNext:
  31. base._observer.OnNext(value.Value);
  32. break;
  33. case NotificationKind.OnError:
  34. base._observer.OnError(value.Exception);
  35. base.Dispose();
  36. break;
  37. case NotificationKind.OnCompleted:
  38. base._observer.OnCompleted();
  39. base.Dispose();
  40. break;
  41. }
  42. }
  43. public void OnError(Exception error)
  44. {
  45. base._observer.OnError(error);
  46. base.Dispose();
  47. }
  48. public void OnCompleted()
  49. {
  50. base._observer.OnCompleted();
  51. base.Dispose();
  52. }
  53. }
  54. }
  55. }