소스 검색

Use a dedicated implementation of IObserver instead of passing three delegates to Subscribe.

Daniel C. Weber 7 년 전
부모
커밋
33c85e06e2
1개의 변경된 파일62개의 추가작업 그리고 33개의 파일을 삭제
  1. 62 33
      Rx.NET/Source/src/System.Reactive/EventPatternSourceBase.cs

+ 62 - 33
Rx.NET/Source/src/System.Reactive/EventPatternSourceBase.cs

@@ -15,6 +15,66 @@ namespace System.Reactive
     /// <typeparam name="TEventArgs">The type of the event data generated by the event.</typeparam>
     public abstract class EventPatternSourceBase<TSender, TEventArgs>
     {
+        private sealed class Observer : ObserverBase<EventPattern<TSender, TEventArgs>>, ISafeObserver<EventPattern<TSender, TEventArgs>>
+        {
+            private bool _isDone;
+            private bool _isAdded;
+            private readonly Delegate _handler;
+            private readonly object _gate = new object();
+            private readonly Action<TSender, TEventArgs> _invoke;
+            private readonly EventPatternSourceBase<TSender, TEventArgs> _sourceBase;
+
+            public Observer(EventPatternSourceBase<TSender, TEventArgs> sourceBase, Delegate handler, Action<TSender, TEventArgs> invoke)
+            {
+                _handler = handler;
+                _invoke = invoke;
+                _sourceBase = sourceBase;
+            }
+
+            protected override void OnNextCore(EventPattern<TSender, TEventArgs> value)
+            {
+                _sourceBase._invokeHandler(_invoke, value);
+            }
+
+            protected override void OnErrorCore(Exception error)
+            {
+                Remove();
+                error.Throw();
+            }
+
+            protected override void OnCompletedCore()
+            {
+                Remove();
+            }
+
+            private void Remove()
+            {
+                lock (_gate)
+                {
+                    if (_isAdded)
+                    {
+                        _sourceBase.Remove(_handler);
+                    }
+                    else
+                    {
+                        _isDone = true;
+                    }
+                }
+            }
+
+            public void SetResource(IDisposable resource)
+            {
+                lock (_gate)
+                {
+                    if (!_isDone)
+                    {
+                        _sourceBase.Add(_handler, resource);
+                        _isAdded = true;
+                    }
+                }
+            }
+        }
+
         private readonly IObservable<EventPattern<TSender, TEventArgs>> _source;
         private readonly Dictionary<Delegate, Stack<IDisposable>> _subscriptions;
         private readonly Action<Action<TSender, TEventArgs>, /*object,*/ EventPattern<TSender, TEventArgs>> _invokeHandler;
@@ -50,42 +110,11 @@ namespace System.Reactive
                 throw new ArgumentNullException(nameof(invoke));
             }
 
-            var gate = new object();
-            var isAdded = false;
-            var isDone = false;
-
-            var remove = new Action(() =>
-            {
-                lock (gate)
-                {
-                    if (isAdded)
-                    {
-                        Remove(handler);
-                    }
-                    else
-                    {
-                        isDone = true;
-                    }
-                }
-            });
-
+            var observer = new Observer(this, handler, invoke);
             //
             // [OK] Use of unsafe Subscribe: non-pretentious wrapper of an observable in an event; exceptions can occur during +=.
             //
-            var d = _source.Subscribe/*Unsafe*/(
-                x => _invokeHandler(invoke, /*this,*/ x),
-                ex => { remove(); ex.Throw(); },
-                remove
-            );
-
-            lock (gate)
-            {
-                if (!isDone)
-                {
-                    Add(handler, d);
-                    isAdded = true;
-                }
-            }
+            observer.SetResource(_source.Subscribe(observer));
         }
 
         private void Add(Delegate handler, IDisposable disposable)