瀏覽代碼

Reduce unnecessary allocations on the receiving side

Lucas Trzesniewski 7 年之前
父節點
當前提交
13f10a6ce3

+ 10 - 4
src/Abc.Zebus/Dispatch/DynamicMessageHandlerInvoker.cs

@@ -17,11 +17,11 @@ namespace Abc.Zebus.Dispatch
             _predicates = bindingKeys.Select(x => predicateBuilder.GetPredicate(messageType, x)).ToList();
         }
 
-        class DummyHandler : IMessageHandler<IMessage>
+        private class DummyHandler : IMessageHandler<IMessage>
         {
             public void Handle(IMessage message)
             {
-                throw new NotImplementedException("This handler is only used to provide the base class with a valid implementation of IMessageHandler and is never actually used");
+                throw new NotSupportedException("This handler is only used to provide the base class with a valid implementation of IMessageHandler and is never actually used");
             }
         }
 
@@ -35,7 +35,13 @@ namespace Abc.Zebus.Dispatch
 
         public override bool ShouldHandle(IMessage message)
         {
-            return _predicates.Any(predicate => predicate(message));
+            foreach (var predicate in _predicates)
+            {
+                if (predicate(message))
+                    return true;
+            }
+
+            return false;
         }
     }
-}
+}

+ 5 - 4
src/Abc.Zebus/Dispatch/EventHandlerInvoker`1.cs

@@ -2,7 +2,8 @@
 
 namespace Abc.Zebus.Dispatch
 {
-    public class DynamicMessageHandlerInvoker<T> : MessageHandlerInvoker where T : class, IMessage
+    public class DynamicMessageHandlerInvoker<T> : MessageHandlerInvoker
+        where T : class, IMessage
     {
         private readonly Action<T> _handler;
 
@@ -12,11 +13,11 @@ namespace Abc.Zebus.Dispatch
             _handler = handler;
         }
 
-        class DummyHandler : IMessageHandler<T>
+        private class DummyHandler : IMessageHandler<T>
         {
             public void Handle(T message)
             {
-                throw new NotImplementedException("This handler is only used to provide the base class with a valid implementation of IMessageHandler and is never actually used");
+                throw new NotSupportedException("This handler is only used to provide the base class with a valid implementation of IMessageHandler and is never actually used");
             }
         }
 
@@ -28,4 +29,4 @@ namespace Abc.Zebus.Dispatch
             }
         }
     }
-}
+}

+ 3 - 2
src/Abc.Zebus/Dispatch/MessageDispatch.cs

@@ -6,8 +6,9 @@ namespace Abc.Zebus.Dispatch
 {
     public class MessageDispatch
     {
+        private static readonly object _exceptionsLock = new object();
+
         private readonly Action<MessageDispatch, DispatchResult> _continuation;
-        private readonly object _exceptionsLock = new object();
         private Dictionary<Type, Exception> _exceptions;
         private int _remainingHandlerCount;
 
@@ -55,4 +56,4 @@ namespace Abc.Zebus.Dispatch
             _remainingHandlerCount = handlerCount;
         }
     }
-}
+}

+ 4 - 0
src/Abc.Zebus/Dispatch/Pipes/PipeInvocation.cs

@@ -57,6 +57,9 @@ namespace Abc.Zebus.Dispatch.Pipes
 
         private object[] BeforeInvoke()
         {
+            if (_pipes.Count == 0)
+                return ArrayUtil.Empty<object>();
+
             var stateRef = new BeforeInvokeArgs.StateRef();
             var pipeStates = new object[_pipes.Count];
             for (var pipeIndex = 0; pipeIndex < _pipes.Count; ++pipeIndex)
@@ -65,6 +68,7 @@ namespace Abc.Zebus.Dispatch.Pipes
                 _pipes[pipeIndex].BeforeInvoke(beforeInvokeArgs);
                 pipeStates[pipeIndex] = beforeInvokeArgs.State;
             }
+
             return pipeStates;
         }
 

+ 12 - 18
src/Abc.Zebus/Dispatch/Pipes/PipeManager.cs

@@ -13,11 +13,13 @@ namespace Abc.Zebus.Dispatch.Pipes
         private readonly ConcurrentDictionary<Type, PipeList> _pipesByMessageType = new ConcurrentDictionary<Type, PipeList>();
         private readonly ConcurrentSet<string> _enabledPipeNames = new ConcurrentSet<string>();
         private readonly ConcurrentSet<string> _disabledPipeNames = new ConcurrentSet<string>();
+        private readonly Func<Type, PipeList> _createPipeList;
         private readonly IPipeSource[] _pipeSources;
 
         public PipeManager(IPipeSource[] pipeSources)
         {
             _pipeSources = pipeSources;
+            _createPipeList = CreatePipeList;
         }
 
         public void EnablePipe(string pipeName)
@@ -48,26 +50,18 @@ namespace Abc.Zebus.Dispatch.Pipes
             return new PipeInvocation(messageHandlerInvoker, messages, messageContext, pipes);
         }
 
-        public IEnumerable<IPipe> GetEnabledPipes(Type messageHandlerType)
-        {
-            return GetPipeList(messageHandlerType).EnabledPipes;
-        }
+        public IEnumerable<IPipe> GetEnabledPipes(Type messageHandlerType) 
+            => GetPipeList(messageHandlerType).EnabledPipes;
 
-        private PipeList GetPipeList(Type messageHandlerType)
-        {
-            return _pipesByMessageType.GetOrAdd(messageHandlerType, CreatePipeListEntry);
-        }
+        private PipeList GetPipeList(Type messageHandlerType) 
+            => _pipesByMessageType.GetOrAdd(messageHandlerType, _createPipeList);
 
-        private PipeList CreatePipeListEntry(Type handlerType)
-        {
-            var pipes = _pipeSources.SelectMany(x => x.GetPipes(handlerType));
-            return new PipeList(this, pipes);
-        }
+        private PipeList CreatePipeList(Type handlerType) 
+            => new PipeList(this, _pipeSources.SelectMany(x => x.GetPipes(handlerType)));
 
-        private bool IsPipeEnabled(IPipe pipe)
-        {
-            return !_disabledPipeNames.Contains(pipe.Name) && (pipe.IsAutoEnabled || _enabledPipeNames.Contains(pipe.Name));
-        }
+        private bool IsPipeEnabled(IPipe pipe) 
+            => !_disabledPipeNames.Contains(pipe.Name)
+               && (pipe.IsAutoEnabled || _enabledPipeNames.Contains(pipe.Name));
 
         private class PipeList
         {
@@ -90,4 +84,4 @@ namespace Abc.Zebus.Dispatch.Pipes
             }
         }
     }
-}
+}

+ 20 - 5
src/Abc.Zebus/MessageContext.cs

@@ -1,4 +1,5 @@
 using System;
+using System.Threading;
 using Abc.Zebus.Transport;
 using Abc.Zebus.Util;
 
@@ -33,11 +34,7 @@ namespace Abc.Zebus
         public static MessageContext Current => _current;
 
         public static IDisposable SetCurrent(MessageContext context)
-        {
-            var previous = _current;
-            _current = context;
-            return new DisposableAction(() => _current = previous);
-        }
+            => new MessageContextScope(context);
 
         public static IDisposable OverrideInitiatorUsername(string username)
         {
@@ -111,5 +108,23 @@ namespace Abc.Zebus
 
             return new ErrorStatus(ReplyCode, ReplyMessage);
         }
+
+        private class MessageContextScope : IDisposable
+        {
+            private readonly MessageContext _previous;
+            private int _disposed;
+
+            public MessageContextScope(MessageContext context)
+            {
+                _previous = _current;
+                _current = context;
+            }
+
+            public void Dispose()
+            {
+                if (Interlocked.Exchange(ref _disposed, 1) == 0)
+                    _current = _previous;
+            }
+        }
     }
 }

+ 10 - 5
src/Abc.Zebus/Transport/ZmqInboundSocket.cs

@@ -17,6 +17,7 @@ namespace Abc.Zebus.Transport
         private byte[] _readBuffer = new byte[0];
         private ZmqSocket _socket;
         private ZmqEndPoint _endPoint;
+        private TimeSpan _lastReceiveTimeout = TimeSpan.MinValue;
 
         public ZmqInboundSocket(ZmqContext context, PeerId peerId, ZmqEndPoint originalEndpoint, ZmqSocketOptions options, string environment)
         {
@@ -50,10 +51,14 @@ namespace Abc.Zebus.Transport
 
         public CodedInputStream Receive(TimeSpan? timeout = null)
         {
-            int size;
-
-            _socket.ReceiveTimeout = timeout ?? _options.ReadTimeout;
-            _readBuffer = _socket.Receive(_readBuffer, TimeSpan.MaxValue, out size);
+            var effectiveTimeout = timeout ?? _options.ReadTimeout;
+            if (effectiveTimeout != _lastReceiveTimeout)
+            {
+                _socket.ReceiveTimeout = effectiveTimeout; // This stuff allocates
+                _lastReceiveTimeout = effectiveTimeout;
+            }
+            
+            _readBuffer = _socket.Receive(_readBuffer, TimeSpan.MaxValue, out var size);
 
             if (size <= 0)
                 return null;
@@ -79,4 +84,4 @@ namespace Abc.Zebus.Transport
             _socket.Disconnect(_endPoint.Value);
         }
     }
-}
+}

+ 2 - 8
src/Abc.Zebus/Util/Extensions/ExtendIEnumerable.cs

@@ -34,14 +34,8 @@ namespace Abc.Zebus.Util.Extensions
         }
 
         [Pure]
-        public static IList<T> AsList<T>([InstantHandle] this IEnumerable<T> collection)
-        {
-            var list = collection as IList<T>;
-            if (list == null || list.IsReadOnly || list is T[])
-                return collection.ToList();
-
-            return list;
-        }
+        public static IList<T> AsList<T>([InstantHandle] this IEnumerable<T> collection) 
+            => collection is IList<T> list ? list : collection.ToList();
 
         [Pure]
         public static IEnumerable<TSource> DistinctBy<TSource, TKey>(this IEnumerable<TSource> source, Func<TSource, TKey> keySelector)