Intersect.cs 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Threading.Tasks;
  7. namespace System.Linq
  8. {
  9. public static partial class AsyncEnumerable
  10. {
  /// <summary>
  /// Computes the intersection of two async-enumerable sequences by forwarding to the
  /// comparer-accepting overload with a <c>null</c> comparer.
  /// </summary>
  /// <typeparam name="TSource">Element type of both sequences.</typeparam>
  /// <param name="first">Sequence whose elements are candidates for the result.</param>
  /// <param name="second">Sequence whose elements decide which candidates are kept.</param>
  /// <returns>Whatever the comparer-accepting overload returns for these arguments.</returns>
  public static IAsyncEnumerable<TSource> Intersect<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second)
  {
      // Argument validation is left to the overload being called.
      return Intersect(first, second, comparer: null);
  }
  /// <summary>
  /// Produces the elements of <paramref name="first"/> that are also present in
  /// <paramref name="second"/>, using <paramref name="comparer"/> to build the lookup set.
  /// </summary>
  /// <typeparam name="TSource">Element type of both sequences.</typeparam>
  /// <param name="first">Sequence that is streamed; its matching elements are yielded in order.</param>
  /// <param name="second">Sequence that is fully read into a set before <paramref name="first"/> is enumerated.</param>
  /// <param name="comparer">Comparer handed to the set; may be <c>null</c> (the set decides how to treat that).</param>
  /// <returns>A lazily evaluated async-enumerable sequence; no source is touched until it is enumerated.</returns>
  /// <remarks>
  /// Throws the exception created by <c>Error.ArgumentNull</c> when <paramref name="first"/> or
  /// <paramref name="second"/> is <c>null</c>. A matched element is removed from the set, so a value
  /// repeated in <paramref name="first"/> is presumably yielded only once.
  /// </remarks>
  public static IAsyncEnumerable<TSource> Intersect<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second, IEqualityComparer<TSource> comparer)
  {
      if (first == null)
      {
          throw Error.ArgumentNull(nameof(first));
      }

      if (second == null)
      {
          throw Error.ArgumentNull(nameof(second));
      }

#if CSHARP8 && USE_ASYNC_ITERATOR
      return Create(Core);

      // The token type is fully qualified because this file has no `using System.Threading;`
      // directive; the bare name would not resolve from inside namespace System.Linq.
      async IAsyncEnumerator<TSource> Core(System.Threading.CancellationToken cancellationToken)
      {
          var set = new Set<TSource>(comparer);

          // Drain the second source completely before touching the first one.
          await foreach (var element in second.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              set.Add(element);
          }

          await foreach (var element in first.WithCancellation(cancellationToken).ConfigureAwait(false))
          {
              // Remove (rather than a membership test) consumes the match in the set.
              if (set.Remove(element))
              {
                  yield return element;
              }
          }
      }
#else
      return new IntersectAsyncIterator<TSource>(first, second, comparer);
#endif
  }
  40. #if !(CSHARP8 && USE_ASYNC_ITERATOR)
  /// <summary>
  /// Hand-written iterator behind <c>Intersect</c> for builds without the async-iterator path.
  /// On the first move it materializes the second source into a set, then streams the first
  /// source and surfaces each element that can be removed from that set.
  /// </summary>
  /// <typeparam name="TSource">Element type of both sources.</typeparam>
  private sealed class IntersectAsyncIterator<TSource> : AsyncIterator<TSource>
  {
      private readonly IEqualityComparer<TSource> _comparer;
      private readonly IAsyncEnumerable<TSource> _first;
      private readonly IAsyncEnumerable<TSource> _second;

      // Both are created lazily on the first MoveNextCore call and cleared by DisposeAsync.
      private IAsyncEnumerator<TSource> _firstEnumerator;
      private Set<TSource> _set;

      /// <summary>
      /// Captures the two sources and the comparer; no enumeration starts here.
      /// </summary>
      /// <param name="first">Source to stream; asserted non-null in debug builds.</param>
      /// <param name="second">Source to load into the set; asserted non-null in debug builds.</param>
      /// <param name="comparer">Comparer forwarded to the set construction; may be <c>null</c>.</param>
      public IntersectAsyncIterator(IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second, IEqualityComparer<TSource> comparer)
      {
          Debug.Assert(first != null);
          Debug.Assert(second != null);

          _comparer = comparer;
          _first = first;
          _second = second;
      }

      /// <summary>
      /// Creates a fresh iterator over the same sources and comparer, without any enumeration state.
      /// </summary>
      public override AsyncIteratorBase<TSource> Clone() => new IntersectAsyncIterator<TSource>(_first, _second, _comparer);

      /// <summary>
      /// Disposes the enumerator over the first source (if one was opened), drops the set,
      /// and then runs the base class disposal.
      /// </summary>
      public override async ValueTask DisposeAsync()
      {
          if (_firstEnumerator != null)
          {
              await _firstEnumerator.DisposeAsync().ConfigureAwait(false);
              _firstEnumerator = null;
          }

          _set = null;

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances to the next element of the first source that is found in (and removed from) the set.
      /// </summary>
      /// <returns><c>true</c> when <c>_current</c> holds a new element; <c>false</c> otherwise.</returns>
      protected override async ValueTask<bool> MoveNextCore()
      {
          // NB: Older versions of this operator built the set for the second source concurrently with
          //     the first MoveNextAsync call on the first source. That introduced concurrency callers
          //     did not expect and could hardly suppress or control, so the set is now built first.

          if (_state == AsyncIteratorState.Allocated)
          {
              _set = await AsyncEnumerableHelpers.ToSet(_second, _comparer, _cancellationToken).ConfigureAwait(false);
              _firstEnumerator = _first.GetAsyncEnumerator(_cancellationToken);

              _state = AsyncIteratorState.Iterating;
          }
          else if (_state != AsyncIteratorState.Iterating)
          {
              // Any other state produces no elements.
              return false;
          }

          while (await _firstEnumerator.MoveNextAsync().ConfigureAwait(false))
          {
              var candidate = _firstEnumerator.Current;

              // Remove consumes the match in the set.
              if (_set.Remove(candidate))
              {
                  _current = candidate;
                  return true;
              }
          }

          // First source is exhausted: release resources before reporting the end.
          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  104. #endif
  105. }
  106. }