Materialize.cs 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. namespace System.Reactive.Linq.ObservableImpl
  5. {
  6. internal sealed class Materialize<TSource> : Producer<Notification<TSource>, Materialize<TSource>._>
  7. {
  8. private readonly IObservable<TSource> _source;
  9. public Materialize(IObservable<TSource> source)
  10. {
  11. _source = source;
  12. }
  13. public IObservable<TSource> Dematerialize() => _source.AsObservable();
  14. protected override _ CreateSink(IObserver<Notification<TSource>> observer, IDisposable cancel) => new _(observer, cancel);
  15. protected override IDisposable Run(_ sink) => _source.SubscribeSafe(sink);
  16. internal sealed class _ : Sink<TSource, Notification<TSource>>
  17. {
  18. public _(IObserver<Notification<TSource>> observer, IDisposable cancel)
  19. : base(observer, cancel)
  20. {
  21. }
  22. public override void OnNext(TSource value)
  23. {
  24. ForwardOnNext(Notification.CreateOnNext<TSource>(value));
  25. }
  26. public override void OnError(Exception error)
  27. {
  28. ForwardOnNext(Notification.CreateOnError<TSource>(error));
  29. ForwardOnCompleted();
  30. }
  31. public override void OnCompleted()
  32. {
  33. ForwardOnNext(Notification.CreateOnCompleted<TSource>());
  34. ForwardOnCompleted();
  35. }
  36. }
  37. }
  38. }