MostRecent.cs 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Threading;
  6. namespace System.Reactive.Linq.ObservableImpl
  7. {
  8. class MostRecent<TSource> : PushToPullAdapter<TSource, TSource>
  9. {
  10. private readonly TSource _initialValue;
  11. public MostRecent(IObservable<TSource> source, TSource initialValue)
  12. : base(source)
  13. {
  14. _initialValue = initialValue;
  15. }
  16. protected override PushToPullSink<TSource, TSource> Run(IDisposable subscription)
  17. {
  18. return new _(_initialValue, subscription);
  19. }
  20. class _ : PushToPullSink<TSource, TSource>
  21. {
  22. public _(TSource initialValue, IDisposable subscription)
  23. : base(subscription)
  24. {
  25. _kind = NotificationKind.OnNext;
  26. _value = initialValue;
  27. }
  28. private volatile NotificationKind _kind;
  29. private TSource _value;
  30. private Exception _error;
  31. public override void OnNext(TSource value)
  32. {
  33. _value = value;
  34. _kind = NotificationKind.OnNext; // Write last!
  35. }
  36. public override void OnError(Exception error)
  37. {
  38. base.Dispose();
  39. _error = error;
  40. _kind = NotificationKind.OnError; // Write last!
  41. }
  42. public override void OnCompleted()
  43. {
  44. base.Dispose();
  45. _kind = NotificationKind.OnCompleted; // Write last!
  46. }
  47. public override bool TryMoveNext(out TSource current)
  48. {
  49. //
  50. // Notice the _kind field is marked volatile and read before the other fields.
  51. //
  52. // In case of a concurrent change, we may read a stale OnNext value, which is
  53. // fine because this push-to-pull adapter is about sampling.
  54. //
  55. switch (_kind)
  56. {
  57. case NotificationKind.OnNext:
  58. current = _value;
  59. return true;
  60. case NotificationKind.OnError:
  61. _error.Throw();
  62. break;
  63. case NotificationKind.OnCompleted:
  64. break;
  65. }
  66. current = default(TSource);
  67. return false;
  68. }
  69. }
  70. }
  71. }
  72. #endif