ConnectableAsyncObservable.cs 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Linq;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. namespace System.Reactive.Subjects
  8. {
  /// <summary>
  /// Connectable asynchronous observable that routes a source sequence through a subject.
  /// Observers subscribe to the subject; the source is subscribed to the subject only
  /// once <see cref="ConnectAsync"/> has been called.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence fed into the subject.</typeparam>
  /// <typeparam name="TResult">Element type the subject exposes to observers.</typeparam>
  internal sealed class ConnectableAsyncObservable<TSource, TResult> : IConnectableAsyncObservable<TResult>
  {
      private readonly IAsyncSubject<TSource, TResult> _subject;
      private readonly IAsyncObservable<TSource> _source;

      // Gate acquired around every read and write of _connection (connect and disconnect paths).
      private readonly AsyncGate _gate = new();

      // The active connection, or null while disconnected. Only touched while holding _gate.
      private Connection _connection;

      /// <summary>
      /// Creates a connectable observable over <paramref name="source"/> using <paramref name="subject"/>.
      /// </summary>
      /// <param name="source">Sequence that is subscribed to the subject on connect.</param>
      /// <param name="subject">Subject that receives the source's elements and that observers subscribe to.</param>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/> or <paramref name="subject"/> is null.
      /// </exception>
      public ConnectableAsyncObservable(IAsyncObservable<TSource> source, IAsyncSubject<TSource, TResult> subject)
      {
          // Fail fast: a null subject would otherwise only surface later as a
          // NullReferenceException inside ConnectAsync or SubscribeAsync.
          if (source == null)
          {
              throw new ArgumentNullException(nameof(source));
          }

          if (subject == null)
          {
              throw new ArgumentNullException(nameof(subject));
          }

          _subject = subject;

          // NOTE(review): AsAsyncObservable presumably wraps the source to hide its concrete
          // type; its implementation is not visible here - confirm.
          _source = source.AsAsyncObservable();
      }

      /// <summary>
      /// Connects the source to the subject if no connection is currently active.
      /// </summary>
      /// <returns>
      /// The active connection. While a connection exists, repeated calls return that same
      /// object; disposing it disconnects the source from the subject.
      /// </returns>
      public async ValueTask<IAsyncDisposable> ConnectAsync()
      {
          using (await _gate.LockAsync().ConfigureAwait(false))
          {
              if (_connection == null)
              {
                  var subscription = await _source.SubscribeAsync(_subject).ConfigureAwait(false);
                  _connection = new Connection(this, subscription);
              }

              return _connection;
          }
      }

      /// <summary>
      /// Handle for one active source-to-subject subscription. Disposing it tears the
      /// subscription down and marks the parent as disconnected.
      /// </summary>
      private sealed class Connection : IAsyncDisposable
      {
          private readonly ConnectableAsyncObservable<TSource, TResult> _parent;

          // Set to null once disposed so that later DisposeAsync calls do nothing.
          private IAsyncDisposable _subscription;

          public Connection(ConnectableAsyncObservable<TSource, TResult> parent, IAsyncDisposable subscription)
          {
              _parent = parent;
              _subscription = subscription;
          }

          /// <summary>
          /// Disposes the underlying subscription and clears the parent's connection.
          /// Calls made after a completed disposal are no-ops.
          /// </summary>
          public async ValueTask DisposeAsync()
          {
              using (await _parent._gate.LockAsync().ConfigureAwait(false))
              {
                  if (_subscription != null)
                  {
                      await _subscription.DisposeAsync().ConfigureAwait(false);

                      // Both fields are cleared only after disposal completes. A connection that
                      // was already disposed never reaches this point, so it cannot clear a
                      // newer connection created by a later ConnectAsync call.
                      _subscription = null;
                      _parent._connection = null;
                  }
              }
          }
      }

      /// <summary>
      /// Subscribes <paramref name="observer"/> to the subject.
      /// </summary>
      /// <param name="observer">Observer to register with the subject.</param>
      /// <returns>The subscription handle returned by the subject.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
      public ValueTask<IAsyncDisposable> SubscribeAsync(IAsyncObserver<TResult> observer)
      {
          if (observer == null)
          {
              throw new ArgumentNullException(nameof(observer));
          }

          return _subject.SubscribeAsync(observer);
      }
  }
  61. }