RefCount.cs 1.6 KB

  // Licensed to the .NET Foundation under one or more agreements.
  // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Disposables;
  5. using System.Reactive.Subjects;
  6. using System.Threading;
  namespace System.Reactive.Linq
  {
      partial class AsyncObservable
      {
          /// <summary>
          /// Returns an observable sequence that connects to <paramref name="source"/> when the number of
          /// active subscriptions made through it goes from zero to one, and disposes that connection when
          /// the number of active subscriptions drops back to zero.
          /// </summary>
          /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
          /// <param name="source">The connectable sequence whose connection is reference counted.</param>
          /// <returns>A sequence whose subscriptions share a single connection to <paramref name="source"/>.</returns>
          /// <exception cref="ArgumentNullException"><paramref name="source"/> is <c>null</c>.</exception>
          public static IAsyncObservable<TSource> RefCount<TSource>(this IConnectableAsyncObservable<TSource> source)
          {
              if (source == null)
              {
                  throw new ArgumentNullException(nameof(source));
              }

              // State shared by every subscription created through the returned sequence;
              // 'count' and 'connectable' are only touched while 'gate' is held.
              var gate = new AsyncLock();
              var count = 0;
              var connectable = default(IAsyncDisposable);

              return Create<TSource>(async observer =>
              {
                  // Subscribe before connecting so the observer is already attached when the source is connected.
                  var subscription = await source.SubscribeSafeAsync(observer).ConfigureAwait(false);

                  try
                  {
                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          if (++count == 1)
                          {
                              try
                              {
                                  connectable = await source.ConnectAsync().ConfigureAwait(false);
                              }
                              catch
                              {
                                  // Undo the increment: otherwise the count would stay above zero with no
                                  // live connection and no later subscriber would ever trigger a connect.
                                  count--;
                                  throw;
                              }
                          }
                      }
                  }
                  catch
                  {
                      // The caller never receives a disposable on this path, so release the
                      // subscription here instead of leaking it, then surface the original failure.
                      await subscription.DisposeAsync().ConfigureAwait(false);
                      throw;
                  }

                  return AsyncDisposable.Create(async () =>
                  {
                      await subscription.DisposeAsync().ConfigureAwait(false);

                      using (await gate.LockAsync().ConfigureAwait(false))
                      {
                          if (--count == 0)
                          {
                              // Drop the shared reference before disposing so a finished connection
                              // is not kept alive until the next connect overwrites it.
                              var connection = connectable;
                              connectable = null;

                              if (connection != null)
                              {
                                  await connection.DisposeAsync().ConfigureAwait(false);
                              }
                          }
                      }
                  });
              });
          }
      }
  }