MostRecent.cs 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  // Licensed to the .NET Foundation under one or more agreements.
  // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Threading;
  namespace System.Reactive.Linq.ObservableImpl
  {
      /// <summary>
      /// Push-to-pull adapter that reports the value most recently received from the source.
      /// Until the source pushes its first element, the supplied initial value is reported.
      /// </summary>
      /// <typeparam name="TSource">Element type of the source sequence and of the sampled values.</typeparam>
      internal class MostRecent<TSource> : PushToPullAdapter<TSource, TSource>
      {
          // Starting sample handed to every sink created by Run.
          private readonly TSource _initialValue;

          /// <summary>
          /// Creates the adapter over <paramref name="source"/>.
          /// </summary>
          /// <param name="source">Observable sequence forwarded to the base adapter.</param>
          /// <param name="initialValue">Value reported before the source has pushed anything.</param>
          public MostRecent(IObservable<TSource> source, TSource initialValue)
              : base(source)
          {
              _initialValue = initialValue;
          }

          /// <summary>
          /// Builds the sink that tracks the latest notification, seeded with the initial value.
          /// </summary>
          /// <param name="subscription">Disposable passed through to the sink's base class.</param>
          /// <returns>A new sink instance.</returns>
          protected override PushToPullSink<TSource, TSource> Run(IDisposable subscription)
          {
              var sink = new _(_initialValue, subscription);
              return sink;
          }

          /// <summary>
          /// Sink that remembers only the latest notification it has received.
          /// </summary>
          private class _ : PushToPullSink<TSource, TSource>
          {
              // Kind of the latest notification. Volatile: writers store it after the
              // payload fields, and TryMoveNext reads it before the payload fields.
              private volatile NotificationKind _kind;

              // Payload of the latest OnNext (or the initial value).
              private TSource _value;

              // Payload of OnError; only assigned by OnError.
              private Exception _error;

              /// <summary>
              /// Seeds the sink so that it looks as if <paramref name="initialValue"/> had just been pushed.
              /// </summary>
              public _(TSource initialValue, IDisposable subscription)
                  : base(subscription)
              {
                  _kind = NotificationKind.OnNext;
                  _value = initialValue;
              }

              /// <summary>
              /// Records <paramref name="value"/> as the latest sample.
              /// </summary>
              public override void OnNext(TSource value)
              {
                  // Payload first, then the volatile kind marker - keep this order.
                  _value = value;
                  _kind = NotificationKind.OnNext;
              }

              /// <summary>
              /// Disposes the sink and records <paramref name="error"/> as the terminal state.
              /// </summary>
              public override void OnError(Exception error)
              {
                  base.Dispose();

                  // Payload first, then the volatile kind marker - keep this order.
                  _error = error;
                  _kind = NotificationKind.OnError;
              }

              /// <summary>
              /// Disposes the sink and records completion as the terminal state.
              /// </summary>
              public override void OnCompleted()
              {
                  base.Dispose();

                  // No payload for completion; the kind marker is the only write.
                  _kind = NotificationKind.OnCompleted;
              }

              /// <summary>
              /// Samples the latest notification.
              /// </summary>
              /// <param name="current">
              /// The latest value when the last notification was an OnNext; otherwise the default of <typeparamref name="TSource"/>.
              /// </param>
              /// <returns><c>true</c> when a value was produced; <c>false</c> otherwise.</returns>
              public override bool TryMoveNext(out TSource current)
              {
                  // Single read of the volatile marker, taken before any payload field is read.
                  // A concurrent OnNext may leave us with a slightly stale value, which is
                  // acceptable because this adapter only samples the source.
                  var kind = _kind;

                  if (kind == NotificationKind.OnNext)
                  {
                      current = _value;
                      return true;
                  }

                  if (kind == NotificationKind.OnError)
                  {
                      // NOTE(review): Throw() is defined elsewhere; presumably it rethrows the
                      // stored exception and never returns - confirm against the helper.
                      _error.Throw();
                  }

                  // Completed (or Throw() returned): nothing to yield.
                  current = default(TSource);
                  return false;
              }
          }
      }
  }