Contains.cs 3.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. namespace System.Reactive.Linq
  6. {
  7. public partial class AsyncObservable
  8. {
  9. public static IAsyncObservable<bool> Contains<TSource>(this IAsyncObservable<TSource> source, TSource element)
  10. {
  11. if (source == null)
  12. throw new ArgumentNullException(nameof(source));
  13. return CreateAsyncObservable<bool>.From(
  14. source,
  15. element,
  16. static (source, element, observer) => source.SubscribeSafeAsync(AsyncObserver.Contains(observer, element)));
  17. }
  18. public static IAsyncObservable<bool> Contains<TSource>(this IAsyncObservable<TSource> source, TSource element, IEqualityComparer<TSource> comparer)
  19. {
  20. if (source == null)
  21. throw new ArgumentNullException(nameof(source));
  22. if (comparer == null)
  23. throw new ArgumentNullException(nameof(comparer));
  24. return CreateAsyncObservable<bool>.From(
  25. source,
  26. (element, comparer),
  27. static (source, state, observer) => source.SubscribeSafeAsync(AsyncObserver.Contains(observer, state.element, state.comparer)));
  28. }
  29. }
  30. public partial class AsyncObserver
  31. {
  32. public static IAsyncObserver<TSource> Contains<TSource>(IAsyncObserver<bool> observer, TSource element)
  33. {
  34. if (observer == null)
  35. throw new ArgumentNullException(nameof(observer));
  36. return Contains(observer, element, EqualityComparer<TSource>.Default);
  37. }
  38. public static IAsyncObserver<TSource> Contains<TSource>(IAsyncObserver<bool> observer, TSource element, IEqualityComparer<TSource> comparer)
  39. {
  40. if (observer == null)
  41. throw new ArgumentNullException(nameof(observer));
  42. if (comparer == null)
  43. throw new ArgumentNullException(nameof(comparer));
  44. return Create<TSource>(
  45. async x =>
  46. {
  47. var equals = false;
  48. try
  49. {
  50. equals = comparer.Equals(x, element);
  51. }
  52. catch (Exception ex)
  53. {
  54. await observer.OnErrorAsync(ex).ConfigureAwait(false);
  55. return;
  56. }
  57. if (equals)
  58. {
  59. await observer.OnNextAsync(true).ConfigureAwait(false);
  60. await observer.OnCompletedAsync().ConfigureAwait(false);
  61. }
  62. },
  63. observer.OnErrorAsync,
  64. async () =>
  65. {
  66. await observer.OnNextAsync(false).ConfigureAwait(false);
  67. await observer.OnCompletedAsync().ConfigureAwait(false);
  68. }
  69. );
  70. }
  71. }
  72. }