Contains.cs 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. namespace System.Reactive.Linq
  6. {
  7. public partial class AsyncObservable
  8. {
  9. public static IAsyncObservable<bool> Contains<TSource>(this IAsyncObservable<TSource> source, TSource element)
  10. {
  11. if (source == null)
  12. throw new ArgumentNullException(nameof(source));
  13. return Create(
  14. source,
  15. element,
  16. default(bool),
  17. (source, element, observer) => source.SubscribeSafeAsync(AsyncObserver.Contains(observer, element)));
  18. }
  19. public static IAsyncObservable<bool> Contains<TSource>(this IAsyncObservable<TSource> source, TSource element, IEqualityComparer<TSource> comparer)
  20. {
  21. if (source == null)
  22. throw new ArgumentNullException(nameof(source));
  23. if (comparer == null)
  24. throw new ArgumentNullException(nameof(comparer));
  25. return Create(
  26. source,
  27. (element, comparer),
  28. default(bool),
  29. (source, state, observer) => source.SubscribeSafeAsync(AsyncObserver.Contains(observer, state.element, state.comparer)));
  30. }
  31. }
  32. public partial class AsyncObserver
  33. {
  34. public static IAsyncObserver<TSource> Contains<TSource>(IAsyncObserver<bool> observer, TSource element)
  35. {
  36. if (observer == null)
  37. throw new ArgumentNullException(nameof(observer));
  38. return Contains(observer, element, EqualityComparer<TSource>.Default);
  39. }
  40. public static IAsyncObserver<TSource> Contains<TSource>(IAsyncObserver<bool> observer, TSource element, IEqualityComparer<TSource> comparer)
  41. {
  42. if (observer == null)
  43. throw new ArgumentNullException(nameof(observer));
  44. if (comparer == null)
  45. throw new ArgumentNullException(nameof(comparer));
  46. return Create<TSource>(
  47. async x =>
  48. {
  49. var equals = false;
  50. try
  51. {
  52. equals = comparer.Equals(x, element);
  53. }
  54. catch (Exception ex)
  55. {
  56. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  57. return;
  58. }
  59. if (equals)
  60. {
  61. await observer.OnNextAsync(true).ConfigureAwait(false);
  62. await observer.OnCompletedAsync().ConfigureAwait(false);
  63. }
  64. },
  65. observer.OnErrorAsync,
  66. async () =>
  67. {
  68. await observer.OnNextAsync(false).ConfigureAwait(false);
  69. await observer.OnCompletedAsync().ConfigureAwait(false);
  70. }
  71. );
  72. }
  73. }
  74. }