Synchronize.cs 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Threading;
  5. namespace System.Reactive.Linq
  6. {
  7. public partial class AsyncObservable
  8. {
  9. public static IAsyncObservable<TSource> Synchronize<TSource>(this IAsyncObservable<TSource> source)
  10. {
  11. if (source == null)
  12. throw new ArgumentNullException(nameof(source));
  13. return Create(source, static (source, observer) => source.SubscribeSafeAsync(AsyncObserver.Synchronize(observer)));
  14. }
  15. public static IAsyncObservable<TSource> Synchronize<TSource>(this IAsyncObservable<TSource> source, AsyncGate gate)
  16. {
  17. if (source == null)
  18. throw new ArgumentNullException(nameof(source));
  19. if (gate == null)
  20. throw new ArgumentNullException(nameof(gate));
  21. return CreateAsyncObservable<TSource>.From(
  22. source,
  23. gate,
  24. static (source, gate, observer) => source.SubscribeSafeAsync(AsyncObserver.Synchronize(observer, gate)));
  25. }
  26. }
  27. public partial class AsyncObserver
  28. {
  29. public static IAsyncObserver<TSource> Synchronize<TSource>(IAsyncObserver<TSource> observer)
  30. {
  31. if (observer == null)
  32. throw new ArgumentNullException(nameof(observer));
  33. return Synchronize(observer, new AsyncGate());
  34. }
  35. public static IAsyncObserver<TSource> Synchronize<TSource>(IAsyncObserver<TSource> observer, AsyncGate gate)
  36. {
  37. if (observer == null)
  38. throw new ArgumentNullException(nameof(observer));
  39. if (gate == null)
  40. throw new ArgumentNullException(nameof(gate));
  41. return Create<TSource>(
  42. async x =>
  43. {
  44. using (await gate.LockAsync().ConfigureAwait(false))
  45. {
  46. await observer.OnNextAsync(x).ConfigureAwait(false);
  47. }
  48. },
  49. async ex =>
  50. {
  51. using (await gate.LockAsync().ConfigureAwait(false))
  52. {
  53. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  54. }
  55. },
  56. async () =>
  57. {
  58. using (await gate.LockAsync().ConfigureAwait(false))
  59. {
  60. await observer.OnCompletedAsync().ConfigureAwait(false);
  61. }
  62. }
  63. );
  64. }
  65. }
  66. }