Amb.cs 3.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Disposables;
  5. using System.Threading;
  6. namespace System.Reactive.Linq
  7. {
  8. partial class AsyncObservable
  9. {
  10. public static IAsyncObservable<TSource> Amb<TSource>(this IAsyncObservable<TSource> first, IAsyncObservable<TSource> second)
  11. {
  12. if (first == null)
  13. throw new ArgumentNullException(nameof(first));
  14. if (second == null)
  15. throw new ArgumentNullException(nameof(second));
  16. return Create<TSource>(async observer =>
  17. {
  18. IAsyncDisposable firstSubscription = null;
  19. IAsyncDisposable secondSubscription = null;
  20. var (firstObserver, secondObserver) = AsyncObserver.Amb(observer, firstSubscription, secondSubscription);
  21. var firstTask = first.SubscribeAsync(firstObserver);
  22. var secondTask = second.SubscribeAsync(secondObserver);
  23. var d1 = await firstTask.ConfigureAwait(false);
  24. var d2 = await secondTask.ConfigureAwait(false);
  25. return StableCompositeAsyncDisposable.Create(d1, d2);
  26. });
  27. }
  28. }
  29. partial class AsyncObserver
  30. {
  31. public static (IAsyncObserver<TSource>, IAsyncObserver<TSource>) Amb<TSource>(IAsyncObserver<TSource> observer, IAsyncDisposable first, IAsyncDisposable second)
  32. {
  33. if (observer == null)
  34. throw new ArgumentNullException(nameof(observer));
  35. if (first == null)
  36. throw new ArgumentNullException(nameof(first));
  37. if (second == null)
  38. throw new ArgumentNullException(nameof(second));
  39. var gate = new AsyncLock();
  40. return
  41. (
  42. Create<TSource>(
  43. async x =>
  44. {
  45. using (await gate.LockAsync().ConfigureAwait(false))
  46. {
  47. }
  48. },
  49. async ex =>
  50. {
  51. using (await gate.LockAsync().ConfigureAwait(false))
  52. {
  53. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  54. }
  55. },
  56. async () =>
  57. {
  58. using (await gate.LockAsync().ConfigureAwait(false))
  59. {
  60. }
  61. }
  62. ),
  63. Create<TSource>(
  64. async x =>
  65. {
  66. using (await gate.LockAsync().ConfigureAwait(false))
  67. {
  68. }
  69. },
  70. async ex =>
  71. {
  72. using (await gate.LockAsync().ConfigureAwait(false))
  73. {
  74. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  75. }
  76. },
  77. async () =>
  78. {
  79. using (await gate.LockAsync().ConfigureAwait(false))
  80. {
  81. }
  82. }
  83. )
  84. );
  85. }
  86. }
  87. }