Where.cs 3.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  namespace System.Linq
  {
      public static partial class AsyncEnumerable
      {
          /// <summary>
          /// Filters an async sequence, yielding only the elements for which
          /// <paramref name="predicate"/> returns <c>true</c>.
          /// </summary>
          /// <typeparam name="TSource">Element type of the sequence.</typeparam>
          /// <param name="source">The sequence to filter.</param>
          /// <param name="predicate">Test applied to each element pulled from <paramref name="source"/>.</param>
          /// <returns>
          /// The sequence produced by <c>CreateEnumerable</c>; the source enumerator is only
          /// obtained inside the factory passed to it, not when this method is called.
          /// </returns>
          /// <exception cref="ArgumentNullException">
          /// <paramref name="source"/> or <paramref name="predicate"/> is <c>null</c>.
          /// </exception>
          public static IAsyncEnumerable<TSource> Where<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate)
          {
              if (source == null)
                  throw new ArgumentNullException(nameof(source));
              if (predicate == null)
                  throw new ArgumentNullException(nameof(predicate));

              return CreateEnumerable(
                  () =>
                  {
                      var e = source.GetEnumerator();

                      // The token source and the inner enumerator are bundled so that the
                      // single Dispose callback handed to CreateEnumerator releases both.
                      var cts = new CancellationTokenDisposable();
                      var d = Disposable.Create(cts, e);

                      // Deliberately a loop rather than a self-recursive delegate: when the
                      // source completes MoveNext synchronously, recursing once per rejected
                      // element grows the stack without bound and can overflow on a long run
                      // of non-matching items.
                      Func<CancellationToken, Task<bool>> moveNext = async ct =>
                      {
                          while (await e.MoveNext(ct).ConfigureAwait(false))
                          {
                              if (predicate(e.Current))
                                  return true;
                          }

                          return false;
                      };

                      return CreateEnumerator(
                          // The incoming token is not forwarded; moves always run under cts.Token.
                          // NOTE(review): presumably CreateEnumerator ties the caller's token to
                          // the dispose callback (which disposes cts) - confirm against its definition.
                          ct => moveNext(cts.Token),
                          () => e.Current,
                          d.Dispose,
                          e
                      );
                  });
          }

          /// <summary>
          /// Filters an async sequence, yielding only the elements for which
          /// <paramref name="predicate"/> returns <c>true</c>. The predicate also receives the
          /// zero-based position of the element in <paramref name="source"/>.
          /// </summary>
          /// <typeparam name="TSource">Element type of the sequence.</typeparam>
          /// <param name="source">The sequence to filter.</param>
          /// <param name="predicate">
          /// Test applied to each element; the second argument is the element's index, counted
          /// over every element pulled from the source (not only the ones that pass).
          /// </param>
          /// <returns>
          /// The sequence produced by <c>CreateEnumerable</c>; the source enumerator is only
          /// obtained inside the factory passed to it, not when this method is called.
          /// </returns>
          /// <exception cref="ArgumentNullException">
          /// <paramref name="source"/> or <paramref name="predicate"/> is <c>null</c>.
          /// </exception>
          public static IAsyncEnumerable<TSource> Where<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, bool> predicate)
          {
              if (source == null)
                  throw new ArgumentNullException(nameof(source));
              if (predicate == null)
                  throw new ArgumentNullException(nameof(predicate));

              return CreateEnumerable(
                  () =>
                  {
                      var e = source.GetEnumerator();

                      // Created inside the factory, so each enumerator built here starts at 0.
                      var index = 0;

                      // The token source and the inner enumerator are bundled so that the
                      // single Dispose callback handed to CreateEnumerator releases both.
                      var cts = new CancellationTokenDisposable();
                      var d = Disposable.Create(cts, e);

                      // Deliberately a loop rather than a self-recursive delegate: when the
                      // source completes MoveNext synchronously, recursing once per rejected
                      // element grows the stack without bound and can overflow on a long run
                      // of non-matching items.
                      Func<CancellationToken, Task<bool>> moveNext = async ct =>
                      {
                          while (await e.MoveNext(ct).ConfigureAwait(false))
                          {
                              // checked: the increment throws OverflowException instead of
                              // silently wrapping past int.MaxValue.
                              if (predicate(e.Current, checked(index++)))
                                  return true;
                          }

                          return false;
                      };

                      return CreateEnumerator(
                          // The incoming token is not forwarded; moves always run under cts.Token.
                          // NOTE(review): presumably CreateEnumerator ties the caller's token to
                          // the dispose callback (which disposes cts) - confirm against its definition.
                          ct => moveNext(cts.Token),
                          () => e.Current,
                          d.Dispose,
                          e
                      );
                  });
          }
      }
  }