SynchronizedObserver.cs 998 B

12345678910111213141516171819202122232425262728293031323334353637383940
  // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  namespace System.Reactive
  {
      /// <summary>
      /// Observer that relays every notification to a wrapped observer while holding a lock
      /// on a caller-supplied gate object, so forwarded calls made through the same gate are
      /// mutually exclusive.
      /// </summary>
      /// <typeparam name="T">Type of the values passed to <c>OnNext</c>.</typeparam>
      internal class SynchronizedObserver<T> : ObserverBase<T>
      {
          // Observer that receives the forwarded notifications.
          private readonly IObserver<T> _target;

          // Object locked around each forwarded call; supplied by the caller.
          private readonly object _syncRoot;

          /// <summary>
          /// Creates an observer that forwards to <paramref name="observer"/> under a lock on
          /// <paramref name="gate"/>. Neither argument is validated here.
          /// </summary>
          /// <param name="observer">Observer to forward notifications to.</param>
          /// <param name="gate">Object to lock on while forwarding each notification.</param>
          public SynchronizedObserver(IObserver<T> observer, object gate)
          {
              _target = observer;
              _syncRoot = gate;
          }

          /// <summary>
          /// Forwards a value to the wrapped observer while holding the gate lock.
          /// </summary>
          /// <param name="value">Value to forward.</param>
          protected override void OnNextCore(T value)
          {
              lock (_syncRoot)
              {
                  _target.OnNext(value);
              }
          }

          /// <summary>
          /// Forwards an error to the wrapped observer while holding the gate lock.
          /// </summary>
          /// <param name="exception">Exception to forward.</param>
          protected override void OnErrorCore(Exception exception)
          {
              lock (_syncRoot)
              {
                  _target.OnError(exception);
              }
          }

          /// <summary>
          /// Forwards the completion signal to the wrapped observer while holding the gate lock.
          /// </summary>
          protected override void OnCompletedCore()
          {
              lock (_syncRoot)
              {
                  _target.OnCompleted();
              }
          }
      }
  }