浏览代码

Adding IScheduledObserver for future refactoring.

Bart De Smet 10 年之前
父节点
当前提交
00793558e4

+ 7 - 1
Rx.NET/Source/System.Reactive.Core/Reactive/Internal/ScheduledObserver.cs

@@ -11,7 +11,7 @@ namespace System.Reactive
     using System.Collections.Concurrent;
     using System.Diagnostics;
 
-    internal class ScheduledObserver<T> : ObserverBase<T>, IDisposable
+    internal class ScheduledObserver<T> : ObserverBase<T>, IScheduledObserver<T>
     {
         private volatile int _state = 0;
         private const int STOPPED = 0;
@@ -442,4 +442,10 @@ namespace System.Reactive
             }
         }
     }
+
+    interface IScheduledObserver<T> : IObserver<T>, IDisposable
+    {
+        void EnsureActive();
+        void EnsureActive(int count);
+    }
 }

+ 11 - 11
Rx.NET/Source/System.Reactive.Linq/Reactive/Subjects/ReplaySubject.cs

@@ -257,7 +257,7 @@ namespace System.Reactive.Subjects
             private bool _isStopped;
             private Exception _error;
 
-            private ImmutableList<ScheduledObserver<T>> _observers;
+            private ImmutableList<IScheduledObserver<T>> _observers;
             private bool _isDisposed;
 
             private readonly object _gate = new object();
@@ -280,7 +280,7 @@ namespace System.Reactive.Subjects
                 _isStopped = false;
                 _error = null;
 
-                _observers = ImmutableList<ScheduledObserver<T>>.Empty;
+                _observers = ImmutableList<IScheduledObserver<T>>.Empty;
             }
 
             public ReplayByTime(int bufferSize, TimeSpan window)
@@ -327,7 +327,7 @@ namespace System.Reactive.Subjects
 
             public void OnNext(T value)
             {
-                var o = default(ScheduledObserver<T>[]);
+                var o = default(IScheduledObserver<T>[]);
                 lock (_gate)
                 {
                     CheckDisposed();
@@ -354,7 +354,7 @@ namespace System.Reactive.Subjects
                 if (error == null)
                     throw new ArgumentNullException("error");
 
-                var o = default(ScheduledObserver<T>[]);
+                var o = default(IScheduledObserver<T>[]);
                 lock (_gate)
                 {
                     CheckDisposed();
@@ -370,7 +370,7 @@ namespace System.Reactive.Subjects
                         foreach (var observer in o)
                             observer.OnError(error);
 
-                        _observers = ImmutableList<ScheduledObserver<T>>.Empty;
+                        _observers = ImmutableList<IScheduledObserver<T>>.Empty;
                     }
                 }
 
@@ -381,7 +381,7 @@ namespace System.Reactive.Subjects
 
             public void OnCompleted()
             {
-                var o = default(ScheduledObserver<T>[]);
+                var o = default(IScheduledObserver<T>[]);
                 lock (_gate)
                 {
                     CheckDisposed();
@@ -396,7 +396,7 @@ namespace System.Reactive.Subjects
                         foreach (var observer in o)
                             observer.OnCompleted();
 
-                        _observers = ImmutableList<ScheduledObserver<T>>.Empty;
+                        _observers = ImmutableList<IScheduledObserver<T>>.Empty;
                     }
                 }
 
@@ -462,7 +462,7 @@ namespace System.Reactive.Subjects
                 return subscription;
             }
 
-            private void Unsubscribe(ScheduledObserver<T> observer)
+            private void Unsubscribe(IScheduledObserver<T> observer)
             {
                 lock (_gate)
                 {
@@ -477,16 +477,16 @@ namespace System.Reactive.Subjects
 
             void IReplaySubjectImplementation.Unsubscribe(IObserver<T> observer)
             {
-                var so = (ScheduledObserver<T>)observer;
+                var so = (IScheduledObserver<T>)observer;
                 Unsubscribe(so);
             }
 
             sealed class RemovableDisposable : IDisposable
             {
                 private readonly ReplayByTime _subject;
-                private readonly ScheduledObserver<T> _observer;
+                private readonly IScheduledObserver<T> _observer;
 
-                public RemovableDisposable(ReplayByTime subject, ScheduledObserver<T> observer)
+                public RemovableDisposable(ReplayByTime subject, IScheduledObserver<T> observer)
                 {
                     _subject = subject;
                     _observer = observer;