CheckedObserver.cs 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System;
  3. using System.Threading;
  4. namespace System.Reactive
  5. {
  6. internal class CheckedObserver<T> : IObserver<T>
  7. {
  8. private readonly IObserver<T> _observer;
  9. private int _state;
  10. private const int IDLE = 0;
  11. private const int BUSY = 1;
  12. private const int DONE = 2;
  13. public CheckedObserver(IObserver<T> observer)
  14. {
  15. _observer = observer;
  16. }
  17. public void OnNext(T value)
  18. {
  19. CheckAccess();
  20. try
  21. {
  22. _observer.OnNext(value);
  23. }
  24. finally
  25. {
  26. Interlocked.Exchange(ref _state, IDLE);
  27. }
  28. }
  29. public void OnError(Exception error)
  30. {
  31. CheckAccess();
  32. try
  33. {
  34. _observer.OnError(error);
  35. }
  36. finally
  37. {
  38. Interlocked.Exchange(ref _state, DONE);
  39. }
  40. }
  41. public void OnCompleted()
  42. {
  43. CheckAccess();
  44. try
  45. {
  46. _observer.OnCompleted();
  47. }
  48. finally
  49. {
  50. Interlocked.Exchange(ref _state, DONE);
  51. }
  52. }
  53. private void CheckAccess()
  54. {
  55. switch (Interlocked.CompareExchange(ref _state, BUSY, IDLE))
  56. {
  57. case BUSY:
  58. throw new InvalidOperationException(Strings_Core.REENTRANCY_DETECTED);
  59. case DONE:
  60. throw new InvalidOperationException(Strings_Core.OBSERVER_TERMINATED);
  61. }
  62. }
  63. }
  64. }