JoinObserver.cs 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System.Collections.Generic;
  3. using System.Reactive.Disposables;
  4. using System.Reactive.Linq;
  5. namespace System.Reactive.Joins
  6. {
  /// <summary>
  /// Contract for an observer that takes part in a join: it can be subscribed
  /// using a lock object, can drop the notification at the head of its queue,
  /// and can be disposed.
  /// </summary>
  internal interface IJoinObserver : IDisposable
  {
      /// <summary>
      /// Starts the observer's subscription.
      /// </summary>
      /// <param name="gate">Object the implementation uses for locking.</param>
      void Subscribe(object gate);

      /// <summary>
      /// Removes the oldest queued notification from the observer.
      /// </summary>
      void Dequeue();
  }
  /// <summary>
  /// Observer that subscribes to the materialized form of a source sequence,
  /// buffers the notifications it receives in <see cref="Queue"/>, and asks every
  /// registered active plan to match each time a notification is enqueued.
  /// </summary>
  /// <typeparam name="T">Element type of the observed source sequence.</typeparam>
  internal sealed class JoinObserver<T> : ObserverBase<Notification<T>>, IJoinObserver
  {
      // Lock object handed in through Subscribe; OnNextCore processes notifications under it.
      private object gate;
      private readonly IObservable<T> source;
      private readonly Action<Exception> onError;

      // The reference is assigned only in the constructor; the list contents still change
      // through AddActivePlan / RemoveActivePlan.
      private readonly List<ActivePlan> activePlans;
      private readonly SingleAssignmentDisposable subscription;
      private bool isDisposed;

      /// <summary>
      /// Creates an observer for <paramref name="source"/>. Nothing is subscribed
      /// until <see cref="Subscribe"/> is called.
      /// </summary>
      /// <param name="source">Sequence whose notifications will be queued.</param>
      /// <param name="onError">Callback invoked with the exception when the source reports an error.</param>
      public JoinObserver(IObservable<T> source, Action<Exception> onError)
      {
          this.source = source;
          this.onError = onError;
          Queue = new Queue<Notification<T>>();
          subscription = new SingleAssignmentDisposable();
          activePlans = new List<ActivePlan>();
      }

      /// <summary>
      /// Notifications received from the source that have not been dequeued yet,
      /// oldest first.
      /// </summary>
      public Queue<Notification<T>> Queue { get; private set; }

      /// <summary>
      /// Registers a plan that is asked to match whenever a notification is enqueued.
      /// </summary>
      /// <param name="activePlan">Plan to register.</param>
      public void AddActivePlan(ActivePlan activePlan)
      {
          activePlans.Add(activePlan);
      }

      /// <summary>
      /// Stores the lock object and subscribes this observer to the materialized source.
      /// </summary>
      /// <param name="gate">Object locked while an incoming notification is processed.</param>
      public void Subscribe(object gate)
      {
          // The gate must be in place before subscribing: OnNextCore locks on it.
          this.gate = gate;
          subscription.Disposable = source.Materialize().SubscribeSafe(this);
      }

      /// <summary>
      /// Removes the oldest notification from <see cref="Queue"/>.
      /// </summary>
      public void Dequeue()
      {
          Queue.Dequeue();
      }

      /// <summary>
      /// Handles one materialized notification under the gate: an error notification
      /// is forwarded to the error callback, any other notification is enqueued and
      /// every active plan is asked to match. Does nothing once disposed.
      /// </summary>
      /// <param name="notification">Materialized notification from the source.</param>
      protected override void OnNextCore(Notification<T> notification)
      {
          lock (gate)
          {
              if (isDisposed)
              {
                  return;
              }

              if (notification.Kind == NotificationKind.OnError)
              {
                  // Errors are reported through the callback and never enqueued.
                  onError(notification.Exception);
                  return;
              }

              Queue.Enqueue(notification);

              // Enumerate a copy of the list. Presumably Match can lead to
              // RemoveActivePlan being called, which mutates activePlans;
              // verify against ActivePlan.
              foreach (var activePlan in activePlans.ToArray())
              {
                  activePlan.Match();
              }
          }
      }

      /// <summary>
      /// Intentionally empty: the source is materialized in <see cref="Subscribe"/>,
      /// and error notifications are handled in <see cref="OnNextCore"/>.
      /// </summary>
      /// <param name="exception">Ignored.</param>
      protected override void OnErrorCore(Exception exception)
      {
      }

      /// <summary>
      /// Intentionally empty: the source is materialized in <see cref="Subscribe"/>,
      /// and completion notifications are enqueued by <see cref="OnNextCore"/>.
      /// </summary>
      protected override void OnCompletedCore()
      {
      }

      /// <summary>
      /// Unregisters a plan and disposes this observer when no plans remain.
      /// </summary>
      /// <param name="activePlan">Plan to unregister.</param>
      internal void RemoveActivePlan(ActivePlan activePlan)
      {
          activePlans.Remove(activePlan);

          if (activePlans.Count == 0)
          {
              Dispose();
          }
      }

      /// <summary>
      /// Runs the base class disposal and then, on the first call only, marks this
      /// observer as disposed and (when <paramref name="disposing"/> is true)
      /// disposes the source subscription.
      /// </summary>
      /// <param name="disposing">True to also dispose the source subscription.</param>
      protected override void Dispose(bool disposing)
      {
          base.Dispose(disposing);

          if (isDisposed)
          {
              return;
          }

          if (disposing)
          {
              subscription.Dispose();
          }

          isDisposed = true;
      }
  }
  82. }