Synchronize.cs 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. namespace System.Reactive.Linq.ObservableImpl
  5. {
  /// <summary>
  /// Producer that relays every notification from a source sequence to the
  /// subscribed observer while holding a monitor lock on a gate object.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
  class Synchronize<TSource> : Producer<TSource>
  {
      private readonly IObservable<TSource> _source;

      // Left null by the single-argument constructor; each sink then creates its own gate.
      private readonly object _gate;

      /// <summary>
      /// Creates a producer whose sinks lock on the supplied gate object.
      /// </summary>
      /// <param name="source">Sequence whose notifications are relayed.</param>
      /// <param name="gate">Object to lock on; when null, each sink allocates a private gate.</param>
      public Synchronize(IObservable<TSource> source, object gate)
      {
          _source = source;
          _gate = gate;
      }

      /// <summary>
      /// Creates a producer without a shared gate; every sink locks on its own private object.
      /// </summary>
      /// <param name="source">Sequence whose notifications are relayed.</param>
      public Synchronize(IObservable<TSource> source)
      {
          _source = source;
      }

      /// <summary>
      /// Creates the sink for <paramref name="observer"/>, passes it to
      /// <paramref name="setSink"/>, and subscribes it to the source.
      /// </summary>
      /// <returns>The subscription returned by the source.</returns>
      protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var sink = new _(this, observer, cancel);
          setSink(sink);
          return _source.Subscribe(sink);
      }

      /// <summary>
      /// Observer that forwards each notification to the wrapped observer inside
      /// <c>lock (_gate)</c>.
      /// </summary>
      class _ : Sink<TSource>, IObserver<TSource>
      {
          // Assigned once in the constructor and never replaced, so all
          // notifications of this sink always lock on the same object.
          private readonly object _gate;

          public _(Synchronize<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              // The parent is needed only to pick up its gate, so no reference to it is kept.
              _gate = parent._gate ?? new object();
          }

          /// <summary>Forwards <paramref name="value"/> to the observer under the gate lock.</summary>
          public void OnNext(TSource value)
          {
              lock (_gate)
              {
                  base._observer.OnNext(value);
              }
          }

          /// <summary>
          /// Forwards <paramref name="error"/> to the observer and then calls
          /// <c>base.Dispose()</c>, both under the gate lock.
          /// </summary>
          public void OnError(Exception error)
          {
              lock (_gate)
              {
                  base._observer.OnError(error);
                  base.Dispose();
              }
          }

          /// <summary>
          /// Forwards completion to the observer and then calls
          /// <c>base.Dispose()</c>, both under the gate lock.
          /// </summary>
          public void OnCompleted()
          {
              lock (_gate)
              {
                  base._observer.OnCompleted();
                  base.Dispose();
              }
          }
      }
  }
  60. }
  61. #endif