MostRecent.cs 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. #if !NO_PERF
  5. using System;
  6. using System.Collections.Generic;
  7. using System.Threading;
  8. namespace System.Reactive.Linq.ObservableImpl
  9. {
  /// <summary>
  /// Push-to-pull adapter that hands out the most recently observed element of the
  /// source sequence. Until the source produces an element, the initial value supplied
  /// at construction is handed out instead.
  /// </summary>
  /// <typeparam name="TSource">Type of the elements being sampled.</typeparam>
  class MostRecent<TSource> : PushToPullAdapter<TSource, TSource>
  {
      private readonly TSource _initialValue;

      /// <summary>
      /// Creates the adapter over <paramref name="source"/>.
      /// </summary>
      /// <param name="source">Sequence forwarded to the base adapter.</param>
      /// <param name="initialValue">Value handed out before the first element arrives.</param>
      public MostRecent(IObservable<TSource> source, TSource initialValue)
          : base(source)
      {
          _initialValue = initialValue;
      }

      /// <summary>
      /// Builds the sink for one enumeration, seeded with the initial value.
      /// </summary>
      /// <param name="subscription">Handed through to the sink's base constructor.</param>
      /// <returns>A fresh sink instance.</returns>
      protected override PushToPullSink<TSource, TSource> Run(IDisposable subscription)
      {
          var sink = new _(_initialValue, subscription);
          return sink;
      }

      /// <summary>
      /// Sink that remembers only the latest notification pushed at it and lets the
      /// pull side sample that notification through <see cref="TryMoveNext"/>.
      /// </summary>
      class _ : PushToPullSink<TSource, TSource>
      {
          // Kind of the latest notification. Volatile: the push side stores it after the
          // payload field, the pull side loads it before the payload field.
          private volatile NotificationKind _kind;

          // Payload of the latest OnNext (or the initial value).
          private TSource _value;

          // Payload of OnError; only assigned by OnError.
          private Exception _error;

          /// <summary>
          /// Starts the sink in the "has a value" state, holding <paramref name="initialValue"/>.
          /// </summary>
          public _(TSource initialValue, IDisposable subscription)
              : base(subscription)
          {
              _kind = NotificationKind.OnNext;
              _value = initialValue;
          }

          /// <summary>
          /// Records <paramref name="value"/> as the latest element.
          /// </summary>
          public override void OnNext(TSource value)
          {
              _value = value;

              // Kind goes last so a reader that observes it also observes the value above.
              _kind = NotificationKind.OnNext;
          }

          /// <summary>
          /// Disposes via the base class, then records <paramref name="error"/> as the
          /// terminal notification.
          /// </summary>
          public override void OnError(Exception error)
          {
              // NOTE(review): presumably this releases the source subscription - confirm
              // against PushToPullSink.
              base.Dispose();

              _error = error;

              // Kind goes last so a reader that observes OnError also observes _error.
              _kind = NotificationKind.OnError;
          }

          /// <summary>
          /// Disposes via the base class, then records completion as the terminal notification.
          /// </summary>
          public override void OnCompleted()
          {
              base.Dispose();

              // Single write; kept after the dispose call, as for OnError.
              _kind = NotificationKind.OnCompleted;
          }

          /// <summary>
          /// Samples the latest notification.
          /// </summary>
          /// <param name="current">
          /// The latest value when the result is <c>true</c>; <c>default(TSource)</c> otherwise.
          /// </param>
          /// <returns>
          /// <c>true</c> while the latest notification is an element; <c>false</c> once the
          /// sequence has completed.
          /// </returns>
          /// <remarks>
          /// When the latest notification is an error, the stored exception's <c>Throw()</c>
          /// helper is invoked (defined outside this file; presumably it rethrows).
          /// </remarks>
          public override bool TryMoveNext(out TSource current)
          {
              //
              // One volatile load of _kind, taken before touching the payload fields.
              //
              // A concurrent OnNext may leave us with a slightly stale value; that is
              // acceptable because this adapter is about sampling.
              //
              var observed = _kind;

              if (observed == NotificationKind.OnNext)
              {
                  current = _value;
                  return true;
              }

              if (observed == NotificationKind.OnError)
              {
                  _error.Throw();
              }

              // Completed (or Throw() returned): nothing to hand out.
              current = default(TSource);
              return false;
          }
      }
  }
  73. }
  74. #endif