DefaultIfEmpty.cs 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. namespace System.Reactive.Linq
  5. {
  6. public partial class AsyncObservable
  7. {
  8. public static IAsyncObservable<TSource> DefaultIfEmpty<TSource>(this IAsyncObservable<TSource> source)
  9. {
  10. if (source == null)
  11. throw new ArgumentNullException(nameof(source));
  12. return Create(source, static (source, observer) => source.SubscribeSafeAsync(AsyncObserver.DefaultIfEmpty(observer)));
  13. }
  14. public static IAsyncObservable<TSource> DefaultIfEmpty<TSource>(this IAsyncObservable<TSource> source, TSource defaultValue)
  15. {
  16. if (source == null)
  17. throw new ArgumentNullException(nameof(source));
  18. return Create(
  19. source,
  20. defaultValue,
  21. static (source, defaultValue, observer) => source.SubscribeSafeAsync(AsyncObserver.DefaultIfEmpty(observer, defaultValue)));
  22. }
  23. }
  24. public partial class AsyncObserver
  25. {
  26. public static IAsyncObserver<TSource> DefaultIfEmpty<TSource>(IAsyncObserver<TSource> observer)
  27. {
  28. if (observer == null)
  29. throw new ArgumentNullException(nameof(observer));
  30. return DefaultIfEmpty(observer, default);
  31. }
  32. public static IAsyncObserver<TSource> DefaultIfEmpty<TSource>(IAsyncObserver<TSource> observer, TSource defaultValue)
  33. {
  34. if (observer == null)
  35. throw new ArgumentNullException(nameof(observer));
  36. var hasValue = false;
  37. return Create<TSource>(
  38. x =>
  39. {
  40. hasValue = true;
  41. return observer.OnNextAsync(x);
  42. },
  43. observer.OnErrorAsync,
  44. async () =>
  45. {
  46. if (!hasValue)
  47. {
  48. await observer.OnNextAsync(defaultValue).ConfigureAwait(false);
  49. }
  50. await observer.OnCompletedAsync().ConfigureAwait(false);
  51. }
  52. );
  53. }
  54. }
  55. }