Dematerialize.cs 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. namespace System.Reactive.Linq.ObservableImpl
  5. {
  6. internal sealed class Dematerialize<TSource> : Producer<TSource, Dematerialize<TSource>._>
  7. {
  8. private readonly IObservable<Notification<TSource>> _source;
  9. public Dematerialize(IObservable<Notification<TSource>> source)
  10. {
  11. _source = source;
  12. }
  13. protected override _ CreateSink(IObserver<TSource> observer) => new _(observer);
  14. protected override void Run(_ sink) => sink.Run(_source);
  15. internal sealed class _ : Sink<Notification<TSource>, TSource>
  16. {
  17. public _(IObserver<TSource> observer)
  18. : base(observer)
  19. {
  20. }
  21. public override void OnNext(Notification<TSource> value)
  22. {
  23. switch (value.Kind)
  24. {
  25. case NotificationKind.OnNext:
  26. ForwardOnNext(value.Value);
  27. break;
  28. case NotificationKind.OnError:
  29. ForwardOnError(value.Exception!);
  30. break;
  31. case NotificationKind.OnCompleted:
  32. ForwardOnCompleted();
  33. break;
  34. }
  35. }
  36. }
  37. }
  38. }