瀏覽代碼

Review join observer (#647)

* Save a SingleAssignmentDisposable.

* Style fixes as suggested by .editorconfig.

* Add a note about the copying of a list that is enumerated.
Daniel C. Weber 7 年之前
父節點
當前提交
e7faa2eff9
共有 1 個文件被更改,包括 22 次插入23 次删除
  1. 22 23
      Rx.NET/Source/src/System.Reactive/Joins/JoinObserver.cs

+ 22 - 23
Rx.NET/Source/src/System.Reactive/Joins/JoinObserver.cs

@@ -16,32 +16,31 @@ namespace System.Reactive.Joins
 
     internal sealed class JoinObserver<T> : ObserverBase<Notification<T>>, IJoinObserver
     {
-        private object gate;
-        private readonly IObservable<T> source;
-        private readonly Action<Exception> onError;
-        private List<ActivePlan> activePlans;
+        private object _gate;
+        private readonly IObservable<T> _source;
+        private readonly Action<Exception> _onError;
+        private readonly List<ActivePlan> _activePlans;
         public Queue<Notification<T>> Queue { get; private set; }
-        private readonly SingleAssignmentDisposable subscription;
-        private bool isDisposed;
+        private IDisposable _subscription;
+        private bool _isDisposed;
 
         public JoinObserver(IObservable<T> source, Action<Exception> onError)
         {
-            this.source = source;
-            this.onError = onError;
+            _source = source;
+            _onError = onError;
             Queue = new Queue<Notification<T>>();
-            subscription = new SingleAssignmentDisposable();
-            activePlans = new List<ActivePlan>();
+            _activePlans = new List<ActivePlan>();
         }
 
         public void AddActivePlan(ActivePlan activePlan)
         {
-            activePlans.Add(activePlan);
+            _activePlans.Add(activePlan);
         }
 
         public void Subscribe(object gate)
         {
-            this.gate = gate;
-            subscription.Disposable = source.Materialize().SubscribeSafe(this);
+            _gate = gate;
+            Disposable.SetSingle(ref _subscription, _source.Materialize().SubscribeSafe(this));
         }
 
         public void Dequeue()
@@ -51,18 +50,18 @@ namespace System.Reactive.Joins
 
         protected override void OnNextCore(Notification<T> notification)
         {
-            lock (gate)
+            lock (_gate)
             {
-                if (!isDisposed)
+                if (!_isDisposed)
                 {
                     if (notification.Kind == NotificationKind.OnError)
                     {
-                        onError(notification.Exception);
+                        _onError(notification.Exception);
                         return;
                     }
 
                     Queue.Enqueue(notification);
-                    foreach (var activePlan in activePlans.ToArray())
+                    foreach (var activePlan in _activePlans.ToArray()) // Working on a copy since _activePlans might change while iterating.
                         activePlan.Match();
                 }
             }
@@ -78,8 +77,8 @@ namespace System.Reactive.Joins
 
         internal void RemoveActivePlan(ActivePlan activePlan)
         {
-            activePlans.Remove(activePlan);
-            if (activePlans.Count == 0)
+            _activePlans.Remove(activePlan);
+            if (_activePlans.Count == 0)
                 Dispose();
         }
 
@@ -87,13 +86,13 @@ namespace System.Reactive.Joins
         {
             base.Dispose(disposing);
 
-            if (!isDisposed)
+            if (!_isDisposed)
             {
                 if (disposing)
-                    subscription.Dispose();
+                    Disposable.TryDispose(ref _subscription);
 
-                isDisposed = true;
+                _isDisposed = true;
             }
         }
     }
-}
+}