AsyncLockObserver.cs 1.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Reactive.Concurrency;
  namespace System.Reactive
  {
      /// <summary>
      /// Observer that relays every notification it receives to a wrapped
      /// <see cref="IObserver{T}"/>, handing each relay call to an
      /// <see cref="AsyncLock"/> via <c>Wait</c> instead of invoking the wrapped
      /// observer directly.
      /// </summary>
      /// <remarks>
      /// NOTE(review): presumably the supplied AsyncLock serializes the queued
      /// actions so the wrapped observer never sees overlapping calls; that
      /// behavior lives in AsyncLock.Wait and is not visible here. Confirm there.
      /// </remarks>
      /// <typeparam name="T">Type of the elements carried by OnNext notifications.</typeparam>
      internal class AsyncLockObserver<T> : ObserverBase<T>
      {
          // Observer that ultimately receives the forwarded notifications.
          private readonly IObserver<T> _target;

          // Lock object through which every forwarded call is routed.
          private readonly AsyncLock _asyncLock;

          /// <summary>
          /// Creates an observer that forwards to <paramref name="observer"/> through <paramref name="gate"/>.
          /// </summary>
          /// <param name="observer">Observer to forward notifications to. Not validated here.</param>
          /// <param name="gate">Lock whose <c>Wait</c> method receives each forwarding action. Not validated here.</param>
          public AsyncLockObserver(IObserver<T> observer, AsyncLock gate)
          {
              _target = observer;
              _asyncLock = gate;
          }

          /// <summary>
          /// Hands an action to the lock that calls <c>OnNext</c> on the wrapped observer with <paramref name="value"/>.
          /// </summary>
          /// <param name="value">Element to forward.</param>
          protected override void OnNextCore(T value)
          {
              _asyncLock.Wait(() => _target.OnNext(value));
          }

          /// <summary>
          /// Hands an action to the lock that calls <c>OnError</c> on the wrapped observer with <paramref name="exception"/>.
          /// </summary>
          /// <param name="exception">Error to forward.</param>
          protected override void OnErrorCore(Exception exception)
          {
              _asyncLock.Wait(() => _target.OnError(exception));
          }

          /// <summary>
          /// Hands an action to the lock that calls <c>OnCompleted</c> on the wrapped observer.
          /// </summary>
          protected override void OnCompletedCore()
          {
              _asyncLock.Wait(() => _target.OnCompleted());
          }
      }
  }