// Count.cs
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Threading.Tasks;
  5. namespace System.Reactive.Linq
  6. {
  /// <summary>
  /// <c>Count</c> operators for <see cref="IAsyncObservable{T}"/> sequences.
  /// </summary>
  public partial class AsyncObservable
  {
      /// <summary>
      /// Builds a sequence of <see cref="int"/> by subscribing the counting observer
      /// produced by <c>AsyncObserver.Count&lt;TSource&gt;(observer)</c> to <paramref name="source"/>.
      /// That observer reports the number of received elements when the source completes.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <param name="source">Sequence whose elements are counted.</param>
      /// <returns>The sequence created by <c>Create</c> for the counting subscription.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="source"/> is <c>null</c>.</exception>
      public static IAsyncObservable<int> Count<TSource>(this IAsyncObservable<TSource> source)
      {
          if (source == null)
              throw new ArgumentNullException(nameof(source));

          return Create<TSource, int>(
              source,
              static (source, observer) => source.SubscribeSafeAsync(AsyncObserver.Count<TSource>(observer)));
      }

      /// <summary>
      /// Builds a sequence of <see cref="int"/> by subscribing the observer produced by
      /// <c>AsyncObserver.Count(observer, predicate)</c> to <paramref name="source"/>.
      /// Filtering is delegated to that observer; presumably only elements for which
      /// <paramref name="predicate"/> returns <c>true</c> are counted (confirm against <c>AsyncObserver.Where</c>).
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <param name="source">Sequence whose elements are counted.</param>
      /// <param name="predicate">Synchronous test applied to each element.</param>
      /// <returns>The sequence created by <c>Create</c> for the counting subscription.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/> or <paramref name="predicate"/> is <c>null</c>.
      /// </exception>
      public static IAsyncObservable<int> Count<TSource>(this IAsyncObservable<TSource> source, Func<TSource, bool> predicate)
      {
          if (source == null)
              throw new ArgumentNullException(nameof(source));
          if (predicate == null)
              throw new ArgumentNullException(nameof(predicate));

          // NOTE(review): default(int) looks like a dummy argument that only steers inference
          // of the result type; confirm against the Create overload defined elsewhere.
          // The lambda is static: it uses only its own parameters, so nothing may be captured.
          return Create(
              source,
              predicate,
              default(int),
              static (source, predicate, observer) => source.SubscribeSafeAsync(AsyncObserver.Count(observer, predicate)));
      }

      /// <summary>
      /// Builds a sequence of <see cref="int"/> by subscribing the observer produced by
      /// <c>AsyncObserver.Count(observer, predicate)</c> to <paramref name="source"/>.
      /// Filtering is delegated to that observer; presumably only elements for which the
      /// asynchronous <paramref name="predicate"/> yields <c>true</c> are counted
      /// (confirm against <c>AsyncObserver.Where</c>).
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
      /// <param name="source">Sequence whose elements are counted.</param>
      /// <param name="predicate">Asynchronous test applied to each element.</param>
      /// <returns>The sequence created by <c>Create</c> for the counting subscription.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="source"/> or <paramref name="predicate"/> is <c>null</c>.
      /// </exception>
      public static IAsyncObservable<int> Count<TSource>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<bool>> predicate)
      {
          if (source == null)
              throw new ArgumentNullException(nameof(source));
          if (predicate == null)
              throw new ArgumentNullException(nameof(predicate));

          // NOTE(review): default(int) looks like a dummy argument that only steers inference
          // of the result type; confirm against the Create overload defined elsewhere.
          // The lambda is static: it uses only its own parameters, so nothing may be captured.
          return Create(
              source,
              predicate,
              default(int),
              static (source, predicate, observer) => source.SubscribeSafeAsync(AsyncObserver.Count(observer, predicate)));
      }
  }
  /// <summary>
  /// Observer-side building blocks for the <c>Count</c> operators.
  /// </summary>
  public partial class AsyncObserver
  {
      /// <summary>
      /// Creates an observer that keeps a running element count and reports it to
      /// <paramref name="observer"/>. The three delegates passed to <c>Create</c> are taken to be
      /// the element, error and completion handlers, in that order (<c>Create</c> is defined elsewhere).
      /// </summary>
      /// <remarks>
      /// The element handler increments the count in a <c>checked</c> context; if the increment
      /// overflows, the <see cref="OverflowException"/> is passed to <c>observer.OnErrorAsync</c>.
      /// Errors are handed straight to <c>observer.OnErrorAsync</c>. On completion the current count
      /// is sent through <c>observer.OnNextAsync</c>, followed by <c>observer.OnCompletedAsync</c>.
      /// </remarks>
      /// <typeparam name="TSource">Element type accepted by the returned observer.</typeparam>
      /// <param name="observer">Observer that receives the final count.</param>
      /// <returns>The observer built by <c>Create</c> from the three handlers.</returns>
      /// <exception cref="ArgumentNullException"><paramref name="observer"/> is <c>null</c>.</exception>
      public static IAsyncObserver<TSource> Count<TSource>(IAsyncObserver<int> observer)
      {
          if (observer == null)
              throw new ArgumentNullException(nameof(observer));

          var count = 0;

          return Create<TSource>(
              async _ =>
              {
                  try
                  {
                      checked
                      {
                          count++;
                      }
                  }
                  catch (OverflowException ex)
                  {
                      // A checked increment can only fail with OverflowException, so catch exactly that.
                      await observer.OnErrorAsync(ex).ConfigureAwait(false);
                  }
              },
              observer.OnErrorAsync,
              async () =>
              {
                  await observer.OnNextAsync(count).ConfigureAwait(false);
                  await observer.OnCompletedAsync().ConfigureAwait(false);
              }
          );
      }

      /// <summary>
      /// Creates a counting observer (see the parameterless-predicate overload) and wraps it with
      /// <c>Where(..., predicate)</c>. <c>Where</c> is defined elsewhere; presumably only elements
      /// for which <paramref name="predicate"/> returns <c>true</c> reach the counter.
      /// </summary>
      /// <typeparam name="TSource">Element type accepted by the returned observer.</typeparam>
      /// <param name="observer">Observer that receives the final count.</param>
      /// <param name="predicate">Synchronous test applied to each element.</param>
      /// <returns>The observer returned by <c>Where</c>.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="observer"/> or <paramref name="predicate"/> is <c>null</c>.
      /// </exception>
      public static IAsyncObserver<TSource> Count<TSource>(IAsyncObserver<int> observer, Func<TSource, bool> predicate)
      {
          if (observer == null)
              throw new ArgumentNullException(nameof(observer));
          if (predicate == null)
              throw new ArgumentNullException(nameof(predicate));

          return Where(Count<TSource>(observer), predicate);
      }

      /// <summary>
      /// Creates a counting observer (see the parameterless-predicate overload) and wraps it with
      /// <c>Where(..., predicate)</c>. <c>Where</c> is defined elsewhere; presumably only elements
      /// for which the asynchronous <paramref name="predicate"/> yields <c>true</c> reach the counter.
      /// </summary>
      /// <typeparam name="TSource">Element type accepted by the returned observer.</typeparam>
      /// <param name="observer">Observer that receives the final count.</param>
      /// <param name="predicate">Asynchronous test applied to each element.</param>
      /// <returns>The observer returned by <c>Where</c>.</returns>
      /// <exception cref="ArgumentNullException">
      /// <paramref name="observer"/> or <paramref name="predicate"/> is <c>null</c>.
      /// </exception>
      public static IAsyncObserver<TSource> Count<TSource>(IAsyncObserver<int> observer, Func<TSource, ValueTask<bool>> predicate)
      {
          if (observer == null)
              throw new ArgumentNullException(nameof(observer));
          if (predicate == null)
              throw new ArgumentNullException(nameof(predicate));

          return Where(Count<TSource>(observer), predicate);
      }
  }
  88. }