1
0

DefaultIfEmpty.cs 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. namespace System.Reactive.Linq.ObservableImpl
  5. {
  /// <summary>
  /// Producer that relays every element of a source sequence and, if the source
  /// completes without having delivered any element, emits a fallback value
  /// immediately before signalling completion.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source and of the resulting sequence.</typeparam>
  class DefaultIfEmpty<TSource> : Producer<TSource>
  {
      private readonly IObservable<TSource> _source;
      private readonly TSource _defaultValue;

      /// <summary>
      /// Captures the sequence to observe and the value to emit when it turns out to be empty.
      /// </summary>
      /// <param name="source">Sequence whose elements are forwarded.</param>
      /// <param name="defaultValue">Value emitted when <paramref name="source"/> completes without elements.</param>
      public DefaultIfEmpty(IObservable<TSource> source, TSource defaultValue)
      {
          _source = source;
          _defaultValue = defaultValue;
      }

      /// <summary>
      /// Creates the forwarding sink, hands it to <paramref name="setSink"/>, and subscribes it to the source.
      /// </summary>
      /// <param name="observer">Downstream observer that receives the forwarded notifications.</param>
      /// <param name="cancel">Disposable passed through to the sink's base constructor.</param>
      /// <param name="setSink">Callback that receives the newly created sink before the subscription is made.</param>
      /// <returns>The disposable returned by subscribing the sink to the source.</returns>
      protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var relay = new _(this, observer, cancel);

          // The sink is registered before subscribing, so it is already known to the
          // caller by the time the source starts delivering notifications.
          setSink(relay);

          var subscription = _source.SubscribeSafe(relay);
          return subscription;
      }

      /// <summary>
      /// Sink that tracks whether the source produced at least one element and
      /// substitutes the parent's default value on an empty completion.
      /// </summary>
      class _ : Sink<TSource>, IObserver<TSource>
      {
          private readonly DefaultIfEmpty<TSource> _parent;

          // Becomes true on the first OnNext; a bool field starts out false.
          private bool _sawElement;

          /// <summary>
          /// Binds the sink to its parent operator and forwards the observer and cancel handle to the base sink.
          /// </summary>
          /// <param name="parent">Operator instance that supplies the default value.</param>
          /// <param name="observer">Downstream observer.</param>
          /// <param name="cancel">Disposable handed to the base sink.</param>
          public _(DefaultIfEmpty<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _parent = parent;
          }

          /// <summary>
          /// Records that the source is non-empty, then forwards the element downstream.
          /// </summary>
          /// <param name="value">Element received from the source.</param>
          public void OnNext(TSource value)
          {
              // The flag is set before forwarding, matching the order in which the
              // downstream observer is invoked.
              _sawElement = true;
              _observer.OnNext(value);
          }

          /// <summary>
          /// Forwards the error downstream and then disposes the sink.
          /// </summary>
          /// <param name="error">Exception reported by the source.</param>
          public void OnError(Exception error)
          {
              _observer.OnError(error);
              Dispose();
          }

          /// <summary>
          /// Emits the default value if no element was ever observed, then forwards
          /// completion downstream and disposes the sink.
          /// </summary>
          public void OnCompleted()
          {
              if (_sawElement == false)
              {
                  _observer.OnNext(_parent._defaultValue);
              }

              _observer.OnCompleted();
              Dispose();
          }
      }
  }
  50. }
  51. #endif