// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
  4. using System.Reactive.Disposables;
  5. using System.Reactive.Linq;
  6. namespace System.Reactive.Subjects
  7. {
  /// <summary>
  /// Observable wrapper pairing a source sequence with a subject: observers subscribe to the subject, and the subject
  /// is attached to or detached from the source on demand through <see cref="Connect"/>.
  /// </summary>
  /// <typeparam name="TSource">Element type of the underlying source sequence.</typeparam>
  /// <typeparam name="TResult">Element type delivered to observers, i.e. the output type of the subject.</typeparam>
  internal class ConnectableObservable<TSource, TResult> : IConnectableObservable<TResult>
  {
      // Lock object shared by Connect and Connection.Dispose; guards _connection and the connection's subscription.
      private readonly object _gate = new object();
      private readonly IObservable<TSource> _source;
      private readonly ISubject<TSource, TResult> _subject;

      // The currently established connection, or null while disconnected.
      private Connection _connection;

      /// <summary>
      /// Initializes a connectable observable over the given source and subject. No subscription to the source is
      /// made until <see cref="Connect"/> is called.
      /// </summary>
      /// <param name="source">Underlying sequence the subject is subscribed to when a connection is established.</param>
      /// <param name="subject">Subject that observers subscribe to and that receives the source's notifications while connected.</param>
      public ConnectableObservable(IObservable<TSource> source, ISubject<TSource, TResult> subject)
      {
          // Going through AsObservable() provides auto-detach behavior; without it that behavior would have to be
          // hand-rolled here, including trampoline installation.
          _source = source.AsObservable();
          _subject = subject;
      }

      /// <summary>
      /// Establishes the connection between the subject and the source. When a connection already exists, that same
      /// connection is handed back and the source is not subscribed to again.
      /// </summary>
      /// <returns>
      /// Disposable representing the connection. Disposing it disposes the subscription to the source and clears the
      /// stored connection, so that a later call to this method subscribes to the source anew.
      /// </returns>
      public IDisposable Connect()
      {
          lock (_gate)
          {
              if (_connection != null)
              {
                  return _connection;
              }

              var upstream = _source.SubscribeSafe(_subject);
              _connection = new Connection(this, upstream);
              return _connection;
          }
      }

      /// <summary>
      /// Subscribes the given observer to the subject. Whether the subject is currently attached to the source is
      /// controlled separately through <see cref="Connect"/>.
      /// </summary>
      /// <param name="observer">Observer to subscribe to the subject.</param>
      /// <returns>Disposable returned by the subscription to the subject.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
      public IDisposable Subscribe(IObserver<TResult> observer)
      {
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          return _subject.SubscribeSafe(observer);
      }

      /// <summary>
      /// Handle for one established connection; owns the subscription to the source and resets the parent's
      /// connection state when disposed.
      /// </summary>
      private sealed class Connection : IDisposable
      {
          private readonly ConnectableObservable<TSource, TResult> _parent;

          // Subscription to the source; set to null once this connection has been disposed.
          private IDisposable _subscription;

          public Connection(ConnectableObservable<TSource, TResult> parent, IDisposable subscription)
          {
              _parent = parent;
              _subscription = subscription;
          }

          /// <summary>
          /// Disposes the subscription to the source and clears the parent's stored connection. Calls made after the
          /// subscription has been released do nothing.
          /// </summary>
          public void Dispose()
          {
              lock (_parent._gate)
              {
                  var active = _subscription;
                  if (active == null)
                  {
                      return;
                  }

                  // Dispose first, then clear: the fields are only reset once the subscription's Dispose has returned.
                  active.Dispose();
                  _subscription = null;
                  _parent._connection = null;
              }
          }
      }
  }
  80. }