Dematerialize.cs 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. namespace System.Reactive.Linq.Observαble
  5. {
  /// <summary>
  /// Producer over a sequence of <see cref="Notification{TSource}"/> values that turns each
  /// notification back into the matching call on the downstream observer.
  /// </summary>
  /// <typeparam name="TSource">Element type carried by the notifications.</typeparam>
  class Dematerialize<TSource> : Producer<TSource>
  {
      private readonly IObservable<Notification<TSource>> _source;

      /// <summary>
      /// Remembers the notification sequence that <c>Run</c> will subscribe to.
      /// </summary>
      /// <param name="source">Sequence of notifications to unwrap.</param>
      public Dematerialize(IObservable<Notification<TSource>> source)
      {
          _source = source;
      }

      /// <summary>
      /// Builds the sink, registers it through <paramref name="setSink"/>, and then
      /// subscribes it to the stored source.
      /// </summary>
      /// <param name="observer">Downstream observer handed to the sink.</param>
      /// <param name="cancel">Disposable handed to the sink's base constructor.</param>
      /// <param name="setSink">Callback that receives the freshly created sink.</param>
      /// <returns>Whatever <c>SubscribeSafe</c> returns for the source subscription.</returns>
      protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var unwrapper = new _(observer, cancel);

          // The sink is published before subscribing, same order as before.
          setSink(unwrapper);

          return _source.SubscribeSafe(unwrapper);
      }

      /// <summary>
      /// Sink that observes notifications and forwards their content to the downstream observer.
      /// </summary>
      class _ : Sink<TSource>, IObserver<Notification<TSource>>
      {
          public _(IObserver<TSource> observer, IDisposable cancel)
              : base(observer, cancel)
          {
          }

          /// <summary>
          /// Dispatches on the notification kind: a value is forwarded as-is, while an error or
          /// completion notification is handled exactly like the corresponding terminal call
          /// on this sink (forward, then dispose).
          /// </summary>
          /// <param name="value">Notification to unwrap.</param>
          public void OnNext(Notification<TSource> value)
          {
              var kind = value.Kind;

              if (kind == NotificationKind.OnNext)
              {
                  _observer.OnNext(value.Value);
              }
              else if (kind == NotificationKind.OnError)
              {
                  // Same two steps as a real error from the source.
                  OnError(value.Exception);
              }
              else if (kind == NotificationKind.OnCompleted)
              {
                  // Same two steps as a real completion from the source.
                  OnCompleted();
              }
          }

          /// <summary>
          /// Forwards the error downstream, then disposes this sink.
          /// </summary>
          /// <param name="error">Error to forward.</param>
          public void OnError(Exception error)
          {
              _observer.OnError(error);
              Dispose();
          }

          /// <summary>
          /// Forwards completion downstream, then disposes this sink.
          /// </summary>
          public void OnCompleted()
          {
              _observer.OnCompleted();
              Dispose();
          }
      }
  }
  54. }
  55. #endif