// Scan.cs (6.2 KB)
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Threading.Tasks;
  7. namespace System.Linq
  8. {
  9. public static partial class AsyncEnumerable
  10. {
  /// <summary>
  /// Builds an async sequence of running accumulations over <paramref name="source"/>, starting
  /// from <paramref name="seed"/>. Each source element is folded into the previous accumulated
  /// value with <paramref name="accumulator"/>, and every intermediate result is emitted.
  /// The seed itself is not emitted.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  /// <typeparam name="TAccumulate">Type of the accumulated (and emitted) values.</typeparam>
  /// <param name="source">Sequence whose elements are accumulated.</param>
  /// <param name="seed">Initial accumulated value handed to the first accumulator call.</param>
  /// <param name="accumulator">Function combining the previous accumulated value with the next element.</param>
  /// <returns>An iterator object wrapping the source; this call only validates and constructs it.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> or <paramref name="accumulator"/> is <c>null</c>.
  /// </exception>
  public static IAsyncEnumerable<TAccumulate> Scan<TSource, TAccumulate>(
      this IAsyncEnumerable<TSource> source,
      TAccumulate seed,
      Func<TAccumulate, TSource, TAccumulate> accumulator)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (accumulator == null)
      {
          throw new ArgumentNullException(nameof(accumulator));
      }

      var scan = new ScanAsyncEnumerable<TSource, TAccumulate>(source, seed, accumulator);
      return scan;
  }
  /// <summary>
  /// Builds an async sequence of running accumulations over <paramref name="source"/> without an
  /// explicit seed. The first source element becomes the initial accumulated value and is not
  /// emitted; each later element is folded in with <paramref name="accumulator"/> and the result
  /// is emitted.
  /// </summary>
  /// <typeparam name="TSource">Element type of both the source and the resulting sequence.</typeparam>
  /// <param name="source">Sequence whose elements are accumulated.</param>
  /// <param name="accumulator">Function combining the previous accumulated value with the next element.</param>
  /// <returns>An iterator object wrapping the source; this call only validates and constructs it.</returns>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="source"/> or <paramref name="accumulator"/> is <c>null</c>.
  /// </exception>
  public static IAsyncEnumerable<TSource> Scan<TSource>(
      this IAsyncEnumerable<TSource> source,
      Func<TSource, TSource, TSource> accumulator)
  {
      if (source == null)
      {
          throw new ArgumentNullException(nameof(source));
      }

      if (accumulator == null)
      {
          throw new ArgumentNullException(nameof(accumulator));
      }

      var scan = new ScanAsyncEnumerable<TSource>(source, accumulator);
      return scan;
  }
  /// <summary>
  /// Iterator behind the seeded <c>Scan</c> operator. Starting from a seed, it folds each source
  /// element into the running value and exposes every intermediate result as the current element.
  /// </summary>
  private sealed class ScanAsyncEnumerable<TSource, TAccumulate> : AsyncIterator<TAccumulate>
  {
      private readonly IAsyncEnumerable<TSource> _source;
      private readonly TAccumulate _seed;
      private readonly Func<TAccumulate, TSource, TAccumulate> _accumulator;

      // Per-enumeration state; populated when iteration starts and cleared on dispose.
      private IAsyncEnumerator<TSource> _enumerator;
      private TAccumulate _accumulated;

      /// <summary>
      /// Captures the source, seed and accumulator. Nothing is enumerated here.
      /// </summary>
      public ScanAsyncEnumerable(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator)
      {
          Debug.Assert(source != null);
          Debug.Assert(accumulator != null);

          _source = source;
          _seed = seed;
          _accumulator = accumulator;
      }

      /// <summary>
      /// Creates a fresh iterator over the same source, seed and accumulator.
      /// </summary>
      public override AsyncIterator<TAccumulate> Clone() =>
          new ScanAsyncEnumerable<TSource, TAccumulate>(_source, _seed, _accumulator);

      /// <summary>
      /// Disposes the source enumerator if one was obtained, resets the running value,
      /// and then performs the base class disposal.
      /// </summary>
      public override async Task DisposeAsync()
      {
          if (_enumerator != null)
          {
              await _enumerator.DisposeAsync().ConfigureAwait(false);
              _enumerator = null;
              _accumulated = default(TAccumulate);
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances the scan by one source element.
      /// </summary>
      /// <returns>
      /// <c>true</c> when a new accumulated value has been stored as the current element;
      /// <c>false</c> (after disposing) when the source is exhausted or the iterator is in
      /// any state other than Allocated or Iterating.
      /// </returns>
      protected override async Task<bool> MoveNextCore()
      {
          switch (state)
          {
              case AsyncIteratorState.Allocated:
                  // Set-up: obtain the source enumerator and start from the seed.
                  _enumerator = _source.GetAsyncEnumerator();
                  _accumulated = _seed;
                  state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  var advanced = await _enumerator.MoveNextAsync().ConfigureAwait(false);
                  if (!advanced)
                  {
                      // Source exhausted: leave the switch and dispose below.
                      break;
                  }

                  var element = _enumerator.Current;
                  _accumulated = _accumulator(_accumulated, element);
                  current = _accumulated;
                  return true;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  /// <summary>
  /// Iterator behind the seedless <c>Scan</c> operator. The first source element is taken as the
  /// initial running value without being emitted; each later element is folded into the running
  /// value and the result is exposed as the current element.
  /// </summary>
  private sealed class ScanAsyncEnumerable<TSource> : AsyncIterator<TSource>
  {
      private readonly IAsyncEnumerable<TSource> _source;
      private readonly Func<TSource, TSource, TSource> _accumulator;

      // Per-enumeration state; populated when iteration starts.
      private IAsyncEnumerator<TSource> _enumerator;
      private TSource _accumulated;
      private bool _hasSeed;

      /// <summary>
      /// Captures the source and accumulator. Nothing is enumerated here.
      /// </summary>
      public ScanAsyncEnumerable(IAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator)
      {
          Debug.Assert(source != null);
          Debug.Assert(accumulator != null);

          _source = source;
          _accumulator = accumulator;
      }

      /// <summary>
      /// Creates a fresh iterator over the same source and accumulator.
      /// </summary>
      public override AsyncIterator<TSource> Clone() =>
          new ScanAsyncEnumerable<TSource>(_source, _accumulator);

      /// <summary>
      /// Disposes the source enumerator if one was obtained, resets the running value,
      /// and then performs the base class disposal.
      /// </summary>
      public override async Task DisposeAsync()
      {
          if (_enumerator != null)
          {
              await _enumerator.DisposeAsync().ConfigureAwait(false);
              _enumerator = null;
              _accumulated = default(TSource);
          }

          await base.DisposeAsync().ConfigureAwait(false);
      }

      /// <summary>
      /// Advances the scan until the next accumulated value is available.
      /// </summary>
      /// <returns>
      /// <c>true</c> when a new accumulated value has been stored as the current element;
      /// <c>false</c> (after disposing) when the source is exhausted or the iterator is in
      /// any state other than Allocated or Iterating.
      /// </returns>
      protected override async Task<bool> MoveNextCore()
      {
          switch (state)
          {
              case AsyncIteratorState.Allocated:
                  // Set-up: obtain the source enumerator and mark that no seed has been seen yet.
                  _enumerator = _source.GetAsyncEnumerator();
                  _hasSeed = false;
                  _accumulated = default(TSource);
                  state = AsyncIteratorState.Iterating;
                  goto case AsyncIteratorState.Iterating;

              case AsyncIteratorState.Iterating:
                  while (await _enumerator.MoveNextAsync().ConfigureAwait(false))
                  {
                      var element = _enumerator.Current;

                      if (_hasSeed)
                      {
                          _accumulated = _accumulator(_accumulated, element);
                          current = _accumulated;
                          return true;
                      }

                      // First element: adopt it as the seed and keep pulling without emitting.
                      _accumulated = element;
                      _hasSeed = true;
                  }

                  // Source exhausted: leave the switch and dispose below.
                  break;
          }

          await DisposeAsync().ConfigureAwait(false);
          return false;
      }
  }
  138. }
  139. }