ソースを参照

Merge pull request #1694 from AvaloniaUI/nonrx-expressionnode

Don't use rx for ExpressionNodes.
Steven Kirk 7 年 前
コミット
de9644fd0f

+ 0 - 5
src/Avalonia.Base/Data/Core/EmptyExpressionNode.cs

@@ -9,10 +9,5 @@ namespace Avalonia.Data.Core
     internal class EmptyExpressionNode : ExpressionNode
     {
         public override string Description => ".";
-
-        protected override IObservable<object> StartListeningCore(WeakReference reference)
-        {
-            return Observable.Return(reference.Target);
-        }
     }
 }

+ 58 - 62
src/Avalonia.Base/Data/Core/ExpressionNode.cs

@@ -2,22 +2,18 @@
 // Licensed under the MIT license. See licence.md file in the project root for full license information.
 
 using System;
-using System.Reactive.Disposables;
-using System.Reactive.Linq;
-using System.Reactive.Subjects;
-using Avalonia.Data;
 
 namespace Avalonia.Data.Core
 {
-    internal abstract class ExpressionNode : ISubject<object>
+    internal abstract class ExpressionNode
     {
         private static readonly object CacheInvalid = new object();
         protected static readonly WeakReference UnsetReference = 
             new WeakReference(AvaloniaProperty.UnsetValue);
 
         private WeakReference _target = UnsetReference;
-        private IDisposable _valueSubscription;
-        private IObserver<object> _observer;
+        private Action<object> _subscriber;
+        private bool _listening;
 
         protected WeakReference LastValue { get; private set; }
 
@@ -33,92 +29,66 @@ namespace Avalonia.Data.Core
 
                 var oldTarget = _target?.Target;
                 var newTarget = value.Target;
-                var running = _valueSubscription != null;
 
                 if (!ReferenceEquals(oldTarget, newTarget))
                 {
-                    _valueSubscription?.Dispose();
-                    _valueSubscription = null;
+                    if (_listening)
+                    {
+                        StopListening();
+                    }
+
                     _target = value;
 
-                    if (running)
+                    if (_subscriber != null)
                     {
-                        _valueSubscription = StartListening();
+                        StartListening();
                     }
                 }
             }
         }
 
-        public IDisposable Subscribe(IObserver<object> observer)
+        public void Subscribe(Action<object> subscriber)
         {
-            if (_observer != null)
+            if (_subscriber != null)
             {
                 throw new AvaloniaInternalException("ExpressionNode can only be subscribed once.");
             }
 
-            _observer = observer;
-            var nextSubscription = Next?.Subscribe(this);
-            _valueSubscription = StartListening();
-
-            return Disposable.Create(() =>
-            {
-                _valueSubscription?.Dispose();
-                _valueSubscription = null;
-                LastValue = null;
-                nextSubscription?.Dispose();
-                _observer = null;
-            });
+            _subscriber = subscriber;
+            Next?.Subscribe(NextValueChanged);
+            StartListening();
         }
 
-        void IObserver<object>.OnCompleted()
+        public void Unsubscribe()
         {
-            throw new AvaloniaInternalException("ExpressionNode.OnCompleted should not be called.");
-        }
+            Next?.Unsubscribe();
 
-        void IObserver<object>.OnError(Exception error)
-        {
-            throw new AvaloniaInternalException("ExpressionNode.OnError should not be called.");
+            if (_listening)
+            {
+                StopListening();
+            }
+
+            LastValue = null;
+            _subscriber = null;
         }
 
-        void IObserver<object>.OnNext(object value)
+        protected virtual void StartListeningCore(WeakReference reference)
         {
-            NextValueChanged(value);
+            ValueChanged(reference.Target);
         }
 
-        protected virtual IObservable<object> StartListeningCore(WeakReference reference)
+        protected virtual void StopListeningCore()
         {
-            return Observable.Return(reference.Target);
         }
 
         protected virtual void NextValueChanged(object value)
         {
             var bindingBroken = BindingNotification.ExtractError(value) as MarkupBindingChainException;
             bindingBroken?.AddNode(Description);
-            _observer.OnNext(value);
-        }
-
-        private IDisposable StartListening()
-        {
-            var target = _target.Target;
-            IObservable<object> source;
-
-            if (target == null)
-            {
-                source = Observable.Return(TargetNullNotification());
-            }
-            else if (target == AvaloniaProperty.UnsetValue)
-            {
-                source = Observable.Empty<object>();
-            }
-            else
-            {
-                source = StartListeningCore(_target);
-            }
-
-            return source.Subscribe(ValueChanged);
+            _subscriber(value);
         }
 
-        private void ValueChanged(object value)
+        protected void ValueChanged(object value)
         {
             var notification = value as BindingNotification;
 
@@ -131,24 +101,50 @@ namespace Avalonia.Data.Core
                 }
                 else
                 {
-                    _observer.OnNext(value);
+                    _subscriber(value);
                 }
             }
             else
             {
                 LastValue = new WeakReference(notification.Value);
+
                 if (Next != null)
                 {
                     Next.Target = new WeakReference(notification.Value);
                 }
-                
+
                 if (Next == null || notification.Error != null)
                 {
-                    _observer.OnNext(value);
+                    _subscriber(value);
                 }
             }
         }
 
+        private void StartListening()
+        {
+            var target = _target.Target;
+
+            if (target == null)
+            {
+                ValueChanged(TargetNullNotification());
+                _listening = false;
+            }
+            else if (target != AvaloniaProperty.UnsetValue)
+            {
+                StartListeningCore(_target);
+                _listening = true;
+            }
+            else
+            {
+                _listening = false;
+            }
+        }
+
+        private void StopListening()
+        {
+            StopListeningCore();
+        }
+
         private BindingNotification TargetNullNotification()
         {
             return new BindingNotification(

+ 12 - 23
src/Avalonia.Base/Data/Core/ExpressionObserver.cs

@@ -14,9 +14,7 @@ namespace Avalonia.Data.Core
     /// <summary>
     /// Observes and sets the value of an expression on an object.
     /// </summary>
-    public class ExpressionObserver : LightweightObservableBase<object>,
-        IDescription,
-        IObserver<object>
+    public class ExpressionObserver : LightweightObservableBase<object>, IDescription
     {
         /// <summary>
         /// An ordered collection of property accessor plugins that can be used to customize
@@ -55,7 +53,6 @@ namespace Avalonia.Data.Core
 
         private static readonly object UninitializedValue = new object();
         private readonly ExpressionNode _node;
-        private IDisposable _nodeSubscription;
         private object _root;
         private IDisposable _rootSubscription;
         private WeakReference<object> _value;
@@ -202,34 +199,18 @@ namespace Avalonia.Data.Core
             }
         }
 
-        void IObserver<object>.OnNext(object value)
-        {
-            var broken = BindingNotification.ExtractError(value) as MarkupBindingChainException;
-            broken?.Commit(Description);
-            _value = new WeakReference<object>(value);
-            PublishNext(value);
-        }
-
-        void IObserver<object>.OnCompleted()
-        {
-        }
-
-        void IObserver<object>.OnError(Exception error)
-        {
-        }
-
         protected override void Initialize()
         {
             _value = null;
-            _nodeSubscription = _node.Subscribe(this);
+            _node.Subscribe(ValueChanged);
             StartRoot();
         }
 
         protected override void Deinitialize()
         {
             _rootSubscription?.Dispose();
-            _nodeSubscription?.Dispose();
-            _rootSubscription = _nodeSubscription = null;
+            _rootSubscription = null;
+            _node.Unsubscribe();
         }
 
         protected override void Subscribed(IObserver<object> observer, bool first)
@@ -266,5 +247,13 @@ namespace Avalonia.Data.Core
                 _node.Target = (WeakReference)_root;
             }
         }
+
+        private void ValueChanged(object value)
+        {
+            var broken = BindingNotification.ExtractError(value) as MarkupBindingChainException;
+            broken?.Commit(Description);
+            _value = new WeakReference<object>(value);
+            PublishNext(value);
+        }
     }
 }

+ 9 - 2
src/Avalonia.Base/Data/Core/IndexerNode.cs

@@ -17,6 +17,8 @@ namespace Avalonia.Data.Core
 {
     internal class IndexerNode :  SettableNode
     {
+        private IDisposable _subscription;
+
         public IndexerNode(IList<string> arguments)
         {
             Arguments = arguments;
@@ -24,7 +26,7 @@ namespace Avalonia.Data.Core
 
         public override string Description => "[" + string.Join(",", Arguments) + "]";
 
-        protected override IObservable<object> StartListeningCore(WeakReference reference)
+        protected override void StartListeningCore(WeakReference reference)
         {
             var target = reference.Target;
             var incc = target as INotifyCollectionChanged;
@@ -49,7 +51,12 @@ namespace Avalonia.Data.Core
                     .Select(_ => GetValue(target)));
             }
 
-            return Observable.Merge(inputs).StartWith(GetValue(target));
+            _subscription = Observable.Merge(inputs).StartWith(GetValue(target)).Subscribe(ValueChanged);
+        }
+
+        protected override void StopListeningCore()
+        {
+            _subscription.Dispose();
         }
 
         protected override bool SetTargetValueCore(object value, BindingPriority priority)

+ 5 - 5
src/Avalonia.Base/Data/Core/Plugins/AvaloniaPropertyAccessorPlugin.cs

@@ -145,15 +145,15 @@ namespace Avalonia.Data.Core.Plugins
                 return false;
             }
 
-            protected override void Dispose(bool disposing)
+            protected override void SubscribeCore()
             {
-                _subscription?.Dispose();
-                _subscription = null;
+                _subscription = Instance?.GetObservable(_property).Subscribe(PublishValue);
             }
 
-            protected override void SubscribeCore(IObserver<object> observer)
+            protected override void UnsubscribeCore()
             {
-                _subscription = Instance?.GetObservable(_property).Subscribe(observer);
+                _subscription?.Dispose();
+                _subscription = null;
             }
         }
     }

+ 5 - 5
src/Avalonia.Base/Data/Core/Plugins/DataValidatiorBase.cs

@@ -55,13 +55,13 @@ namespace Avalonia.Data.Core.Plugins
         /// <param name="value">The value.</param>
         void IObserver<object>.OnNext(object value) => InnerValueChanged(value);
 
-        /// <inheritdoc/>
-        protected override void Dispose(bool disposing) => _inner.Dispose();
-
         /// <summary>
         /// Begins listening to the inner <see cref="IPropertyAccessor"/>.
         /// </summary>
-        protected override void SubscribeCore(IObserver<object> observer) => _inner.Subscribe(this);
+        protected override void SubscribeCore() => _inner.Subscribe(InnerValueChanged);
+
+        /// <inheritdoc/>
+        protected override void UnsubscribeCore() => _inner.Dispose();
 
         /// <summary>
         /// Called when the inner <see cref="IPropertyAccessor"/> notifies with a new value.
@@ -74,7 +74,7 @@ namespace Avalonia.Data.Core.Plugins
         protected virtual void InnerValueChanged(object value)
         {
             var notification = value as BindingNotification ?? new BindingNotification(value);
-            Observer.OnNext(notification);
+            PublishValue(notification);
         }
     }
 }

+ 2 - 3
src/Avalonia.Base/Data/Core/Plugins/ExceptionValidationPlugin.cs

@@ -1,7 +1,6 @@
 // Copyright (c) The Avalonia Project. All rights reserved.
 // Licensed under the MIT license. See licence.md file in the project root for full license information.
 
-using Avalonia.Data;
 using System;
 using System.Reflection;
 
@@ -36,11 +35,11 @@ namespace Avalonia.Data.Core.Plugins
                 }
                 catch (TargetInvocationException ex)
                 {
-                    Observer.OnNext(new BindingNotification(ex.InnerException, BindingErrorType.DataValidationError));
+                    PublishValue(new BindingNotification(ex.InnerException, BindingErrorType.DataValidationError));
                 }
                 catch (Exception ex)
                 {
-                    Observer.OnNext(new BindingNotification(ex, BindingErrorType.DataValidationError));
+                    PublishValue(new BindingNotification(ex, BindingErrorType.DataValidationError));
                 }
 
                 return false;

+ 12 - 2
src/Avalonia.Base/Data/Core/Plugins/IPropertyAccessor.cs

@@ -2,7 +2,6 @@
 // Licensed under the MIT license. See licence.md file in the project root for full license information.
 
 using System;
-using Avalonia.Data;
 
 namespace Avalonia.Data.Core.Plugins
 {
@@ -10,7 +9,7 @@ namespace Avalonia.Data.Core.Plugins
     /// Defines an accessor to a property on an object returned by a 
     /// <see cref="IPropertyAccessorPlugin"/>
     /// </summary>
-    public interface IPropertyAccessor : IObservable<object>, IDisposable
+    public interface IPropertyAccessor : IDisposable
     {
         /// <summary>
         /// Gets the type of the property.
@@ -38,5 +37,16 @@ namespace Avalonia.Data.Core.Plugins
         /// True if the property was set; false if the property could not be set.
         /// </returns>
         bool SetValue(object value, BindingPriority priority);
+
+        /// <summary>
+        /// Subscribes to the value of the member.
+        /// </summary>
+        /// <param name="listener">A method that receives the values.</param>
+        void Subscribe(Action<object> listener);
+
+        /// <summary>
+        /// Unsubscribes to the value of the member.
+        /// </summary>
+        void Unsubscribe();
     }
 }

+ 9 - 10
src/Avalonia.Base/Data/Core/Plugins/IndeiValidationPlugin.cs

@@ -5,7 +5,6 @@ using System;
 using System.Collections.Generic;
 using System.ComponentModel;
 using System.Linq;
-using Avalonia.Data;
 using Avalonia.Utilities;
 
 namespace Avalonia.Data.Core.Plugins
@@ -40,43 +39,43 @@ namespace Avalonia.Data.Core.Plugins
             {
                 if (e.PropertyName == _name || string.IsNullOrEmpty(e.PropertyName))
                 {
-                    Observer.OnNext(CreateBindingNotification(Value));
+                    PublishValue(CreateBindingNotification(Value));
                 }
             }
 
-            protected override void Dispose(bool disposing)
+            protected override void SubscribeCore()
             {
-                base.Dispose(disposing);
-
                 var target = _reference.Target as INotifyDataErrorInfo;
 
                 if (target != null)
                 {
-                    WeakSubscriptionManager.Unsubscribe(
+                    WeakSubscriptionManager.Subscribe(
                         target,
                         nameof(target.ErrorsChanged),
                         this);
                 }
+
+                base.SubscribeCore();
             }
 
-            protected override void SubscribeCore(IObserver<object> observer)
+            protected override void UnsubscribeCore()
             {
                 var target = _reference.Target as INotifyDataErrorInfo;
 
                 if (target != null)
                 {
-                    WeakSubscriptionManager.Subscribe(
+                    WeakSubscriptionManager.Unsubscribe(
                         target,
                         nameof(target.ErrorsChanged),
                         this);
                 }
 
-                base.SubscribeCore(observer);
+                base.UnsubscribeCore();
             }
 
             protected override void InnerValueChanged(object value)
             {
-                base.InnerValueChanged(CreateBindingNotification(value));
+                PublishValue(CreateBindingNotification(value));
             }
 
             private BindingNotification CreateBindingNotification(object value)

+ 8 - 8
src/Avalonia.Base/Data/Core/Plugins/InpcPropertyAccessorPlugin.cs

@@ -103,7 +103,13 @@ namespace Avalonia.Data.Core.Plugins
                 }
             }
 
-            protected override void Dispose(bool disposing)
+            protected override void SubscribeCore()
+            {
+                SendCurrentValue();
+                SubscribeToChanges();
+            }
+
+            protected override void UnsubscribeCore()
             {
                 var inpc = _reference.Target as INotifyPropertyChanged;
 
@@ -116,18 +122,12 @@ namespace Avalonia.Data.Core.Plugins
                 }
             }
 
-            protected override void SubscribeCore(IObserver<object> observer)
-            {
-                SendCurrentValue();
-                SubscribeToChanges();
-            }
-
             private void SendCurrentValue()
             {
                 try
                 {
                     var value = Value;
-                    Observer.OnNext(value);
+                    PublishValue(value);
                 }
                 catch { }
             }

+ 6 - 2
src/Avalonia.Base/Data/Core/Plugins/MethodAccessorPlugin.cs

@@ -74,14 +74,18 @@ namespace Avalonia.Data.Core.Plugins
 
             public override bool SetValue(object value, BindingPriority priority) => false;
 
-            protected override void SubscribeCore(IObserver<object> observer)
+            protected override void SubscribeCore()
             {
                 try
                 {
-                    Observer.OnNext(Value);
+                    PublishValue(Value);
                 }
                 catch { }
             }
+
+            protected override void UnsubscribeCore()
+            {
+            }
         }
     }
 }

+ 38 - 30
src/Avalonia.Base/Data/Core/Plugins/PropertyAccessorBase.cs

@@ -2,67 +2,75 @@
 // Licensed under the MIT license. See licence.md file in the project root for full license information.
 
 using System;
-using Avalonia.Data;
 
 namespace Avalonia.Data.Core.Plugins
 {
     /// <summary>
     /// Defines a default base implementation for a <see cref="IPropertyAccessor"/>.
     /// </summary>
-    /// <remarks>
-    /// <see cref="IPropertyAccessor"/> is an observable that will only be subscribed to one time.
-    /// In addition, the subscription can be disposed by calling <see cref="Dispose()"/> on the
-    /// property accessor itself - this prevents needing to hold two references for a subscription.
-    /// </remarks>
     public abstract class PropertyAccessorBase : IPropertyAccessor
     {
+        private Action<object> _listener;
+
         /// <inheritdoc/>
         public abstract Type PropertyType { get; }
 
         /// <inheritdoc/>
         public abstract object Value { get; }
 
-        /// <summary>
-        /// Stops the subscription.
-        /// </summary>
-        public void Dispose() => Dispose(true);
+        /// <inheritdoc/>
+        public void Dispose()
+        {
+            if (_listener != null)
+            {
+                Unsubscribe();
+            }
+        }
 
         /// <inheritdoc/>
         public abstract bool SetValue(object value, BindingPriority priority);
 
-        /// <summary>
-        /// The currently subscribed observer.
-        /// </summary>
-        protected IObserver<object> Observer { get; private set; }
-
         /// <inheritdoc/>
-        public IDisposable Subscribe(IObserver<object> observer)
+        public void Subscribe(Action<object> listener)
         {
-            Contract.Requires<ArgumentNullException>(observer != null);
+            Contract.Requires<ArgumentNullException>(listener != null);
 
-            if (Observer != null)
+            if (_listener != null)
             {
                 throw new InvalidOperationException(
-                    "A property accessor can be subscribed to only once.");
+                    "A member accessor can be subscribed to only once.");
             }
 
-            Observer = observer;
-            SubscribeCore(observer);
-            return this;
+            _listener = listener;
+            SubscribeCore();
         }
 
+        public void Unsubscribe()
+        {
+            if (_listener == null)
+            {
+                throw new InvalidOperationException(
+                    "The member accessor was not subscribed.");
+            }
+
+            UnsubscribeCore();
+            _listener = null;
+        }
+
+        /// <summary>
+        /// Publishes a value to the listener.
+        /// </summary>
+        /// <param name="value">The value.</param>
+        protected void PublishValue(object value) => _listener?.Invoke(value);
+
         /// <summary>
-        /// Stops listening to the property.
+        /// When overridden in a derived class, begins listening to the member.
         /// </summary>
-        /// <param name="disposing">
-        /// True if the <see cref="Dispose()"/> method was called, false if the object is being
-        /// finalized.
-        /// </param>
-        protected virtual void Dispose(bool disposing) => Observer = null;
+        protected abstract void SubscribeCore();
 
         /// <summary>
-        /// When overridden in a derived class, begins listening to the property.
+        /// When overridden in a derived class, stops listening to the member.
         /// </summary>
-        protected abstract void SubscribeCore(IObserver<object> observer);
+        protected abstract void UnsubscribeCore();
     }
 }

+ 6 - 5
src/Avalonia.Base/Data/Core/Plugins/PropertyError.cs

@@ -1,6 +1,4 @@
 using System;
-using System.Reactive.Disposables;
-using Avalonia.Data;
 
 namespace Avalonia.Data.Core.Plugins
 {
@@ -37,10 +35,13 @@ namespace Avalonia.Data.Core.Plugins
             return false;
         }
 
-        public IDisposable Subscribe(IObserver<object> observer)
+        public void Subscribe(Action<object> listener)
+        {
+            listener(_error);
+        }
+
+        public void Unsubscribe()
         {
-            observer.OnNext(_error);
-            return Disposable.Empty;
         }
     }
 }

+ 15 - 14
src/Avalonia.Base/Data/Core/PropertyAccessorNode.cs

@@ -3,9 +3,7 @@
 
 using System;
 using System.Linq;
-using System.Reactive.Disposables;
 using System.Reactive.Linq;
-using Avalonia.Data;
 using Avalonia.Data.Core.Plugins;
 
 namespace Avalonia.Data.Core
@@ -39,7 +37,7 @@ namespace Avalonia.Data.Core
             return false;
         }
 
-        protected override IObservable<object> StartListeningCore(WeakReference reference)
+        protected override void StartListeningCore(WeakReference reference)
         {
             var plugin = ExpressionObserver.PropertyAccessors.FirstOrDefault(x => x.Match(reference.Target, PropertyName));
             var accessor = plugin?.Start(reference, PropertyName);
@@ -55,17 +53,20 @@ namespace Avalonia.Data.Core
                 }
             }
 
-            // Ensure that _accessor is set for the duration of the subscription.
-            return Observable.Using(
-                () =>
-                {
-                    _accessor = accessor;
-                    return Disposable.Create(() =>
-                    {
-                        _accessor = null;
-                    });
-                },
-                _ => accessor);
+            if (accessor == null)
+            {
+                throw new NotSupportedException(
+                    $"Could not find a matching property accessor for {PropertyName}.");
+            }
+
+            accessor.Subscribe(ValueChanged);
+            _accessor = accessor;
+        }
+
+        protected override void StopListeningCore()
+        {
+            _accessor.Dispose();
+            _accessor = null;
         }
     }
 }

+ 12 - 5
src/Avalonia.Base/Data/Core/StreamNode.cs

@@ -2,30 +2,37 @@
 // Licensed under the MIT license. See licence.md file in the project root for full license information.
 
 using System;
-using System.Globalization;
-using Avalonia.Data;
 using System.Reactive.Linq;
 
 namespace Avalonia.Data.Core
 {
     internal class StreamNode : ExpressionNode
     {
+        private IDisposable _subscription;
+
         public override string Description => "^";
 
-        protected override IObservable<object> StartListeningCore(WeakReference reference)
+        protected override void StartListeningCore(WeakReference reference)
         {
             foreach (var plugin in ExpressionObserver.StreamHandlers)
             {
                 if (plugin.Match(reference))
                 {
-                    return plugin.Start(reference);
+                    _subscription = plugin.Start(reference).Subscribe(ValueChanged);
+                    return;
                 }
             }
 
             // TODO: Improve error.
-            return Observable.Return(new BindingNotification(
+            ValueChanged(new BindingNotification(
                 new MarkupBindingChainException("Stream operator applied to unsupported type", Description),
                 BindingErrorType.Error));
         }
+
+        protected override void StopListeningCore()
+        {
+            _subscription?.Dispose();
+            _subscription = null;
+        }
     }
 }

+ 2 - 3
tests/Avalonia.Base.UnitTests/Data/Core/Plugins/IndeiValidationPluginTests.cs

@@ -4,7 +4,6 @@
 using System;
 using System.Collections;
 using System.Collections.Generic;
-using System.Reactive.Linq;
 using Avalonia.Data;
 using Avalonia.Data.Core.Plugins;
 using Xunit;
@@ -58,9 +57,9 @@ namespace Avalonia.Base.UnitTests.Data.Core.Plugins
             var validator = validatorPlugin.Start(new WeakReference(data), nameof(data.Value), accessor);
 
             Assert.Equal(0, data.ErrorsChangedSubscriptionCount);
-            var sub = validator.Subscribe(_ => { });
+            validator.Subscribe(_ => { });
             Assert.Equal(1, data.ErrorsChangedSubscriptionCount);
-            sub.Dispose();
+            validator.Unsubscribe();
             Assert.Equal(0, data.ErrorsChangedSubscriptionCount);
         }