Browse Source

Support collections for routing members (breaking change)

BindingKey is now only used to represent the routing key of a
subscription. The routing member values of routable messages
are stored in the new RoutingContent type.

This is a breaking change but it should not generate deployment
issues because RoutingContent is not serialized and is only used
locally.
Olivier Coanet 3 years ago
parent
commit
bdf4548ac2

+ 1 - 1
src/Abc.Zebus.Directory/PeerDirectoryServer.cs

@@ -45,7 +45,7 @@ namespace Abc.Zebus.Directory
         public IList<Peer> GetPeersHandlingMessage(MessageBinding messageBinding)
         public IList<Peer> GetPeersHandlingMessage(MessageBinding messageBinding)
         {
         {
             return _peerRepository.GetPeers(!_configuration.DisableDynamicSubscriptionsForDirectoryOutgoingMessages)
             return _peerRepository.GetPeers(!_configuration.DisableDynamicSubscriptionsForDirectoryOutgoingMessages)
-                                  .Where(peer => peer.Subscriptions != null && peer.Subscriptions.Any(x => x.MessageTypeId == messageBinding.MessageTypeId && x.Matches(messageBinding.RoutingKey)))
+                                  .Where(peer => peer.Subscriptions != null && peer.Subscriptions.Any(x => x.Matches(messageBinding)))
                                   .Select(peerDesc => peerDesc.Peer)
                                   .Select(peerDesc => peerDesc.Peer)
                                   .ToList();
                                   .ToList();
         }
         }

+ 80 - 0
src/Abc.Zebus.Tests/Directory/MessageBindingTests.cs

@@ -0,0 +1,80 @@
+using System;
+using Abc.Zebus.Directory;
+using Abc.Zebus.Routing;
+using Abc.Zebus.Testing.Extensions;
+using Abc.Zebus.Tests.Messages;
+using NUnit.Framework;
+
+namespace Abc.Zebus.Tests.Routing
+{
+    [TestFixture]
+    public class MessageBindingTests
+    {
+        [Test, SetCulture("FR-fr")]
+        public void should_create_message_binding_from_message()
+        {
+            var message = new FakeRoutableCommand(42.42m, "name", Guid.NewGuid());
+
+            var messageBinding = MessageBinding.FromMessage(message);
+
+            messageBinding.MessageTypeId.ShouldEqual(MessageUtil.TypeId<FakeRoutableCommand>());
+            messageBinding.RoutingContent.PartCount.ShouldEqual(3);
+            messageBinding.RoutingContent[0].ShouldEqual(new RoutingContentValue("42.42"));
+            messageBinding.RoutingContent[1].ShouldEqual(new RoutingContentValue("name"));
+            messageBinding.RoutingContent[2].ShouldEqual(new RoutingContentValue(message.OtherId.ToString()));
+        }
+
+        [Test, SetCulture("FR-fr")]
+        public void should_create_message_binding_from_message_with_collections()
+        {
+            var message = new FakeRoutableCommandWithCollection
+            {
+                Name = "X",
+                IdArray = new[] { 1, 2 },
+                ValueList = new() { 1.5m },
+            };
+
+            var messageBinding = MessageBinding.FromMessage(message);
+
+            messageBinding.MessageTypeId.ShouldEqual(MessageUtil.TypeId<FakeRoutableCommandWithCollection>());
+            messageBinding.RoutingContent.PartCount.ShouldEqual(3);
+            messageBinding.RoutingContent[0].ShouldEqual(new RoutingContentValue("X"));
+            messageBinding.RoutingContent[1].ShouldEqual(new RoutingContentValue(new[] { "1", "2" }));
+            messageBinding.RoutingContent[2].ShouldEqual(new RoutingContentValue(new[] { "1.5" }));
+        }
+
+        [Test]
+        public void should_create_message_binding_from_message_with_properties()
+        {
+            var message = new FakeRoutableCommandWithProperties { Id = 100, FeedId = 200 };
+
+            var messageBinding = MessageBinding.FromMessage(message);
+
+            messageBinding.MessageTypeId.ShouldEqual(MessageUtil.TypeId<FakeRoutableCommandWithProperties>());
+            messageBinding.RoutingContent.PartCount.ShouldEqual(2);
+            messageBinding.RoutingContent[0].ShouldEqual(new RoutingContentValue("100"));
+            messageBinding.RoutingContent[1].ShouldEqual(new RoutingContentValue("200"));
+        }
+
+        [Test]
+        public void should_ignore_null_members()
+        {
+            var message = new FakeRoutableCommand(0, null);
+
+            var messageBinding = MessageBinding.FromMessage(message);
+
+            messageBinding.RoutingContent[0].ShouldEqual(new RoutingContentValue("0"));
+            messageBinding.RoutingContent[1].ShouldEqual(new RoutingContentValue((string)null));
+        }
+
+        [Routable]
+        public class FakeRoutableCommandWithProperties : ICommand
+        {
+            [RoutingPosition(1)]
+            public int Id { get; set; }
+
+            [RoutingPosition(2)]
+            public int FeedId { get; set; }
+        }
+    }
+}

+ 0 - 79
src/Abc.Zebus.Tests/Directory/PeerSubscriptionList.cs

@@ -1,79 +0,0 @@
-using System.Collections.Generic;
-using System.Linq;
-using System.Threading;
-using Abc.Zebus.Routing;
-using Abc.Zebus.Util.Extensions;
-
-namespace Abc.Zebus.Tests.Directory
-{
-    /// <summary>
-    /// Old container for peer subscriptions, now replaced by PeerSubscriptionTree.
-    /// It is kept for performance comparison purposes.
-    /// </summary>
-    internal class PeerSubscriptionList
-    {
-        private List<PeerSubscription> _dynamicPeerSubscriptions = new List<PeerSubscription>();
-        private List<Peer> _peersHandlingAllMessages = new List<Peer>();
-
-        public bool IsEmpty => _peersHandlingAllMessages.Count == 0 && _dynamicPeerSubscriptions.Count == 0;
-
-        public void Add(Peer peer, Subscription subscription)
-        {
-            UpdateCore(peer, subscription, true);
-        }
-
-        public IList<Peer> GetPeers(BindingKey routingKey)
-        {
-            if (_dynamicPeerSubscriptions.Count == 0)
-                return _peersHandlingAllMessages;
-
-            return _peersHandlingAllMessages
-                .Concat(_dynamicPeerSubscriptions.Where(x => x.Subscription.Matches(routingKey)).Select(i => i.Peer))
-                .DistinctBy(i => i.Id)
-                .ToList();
-        }
-
-        public void Remove(Peer peer, Subscription subscription)
-        {
-            UpdateCore(peer, subscription, false);
-        }
-
-        private void UpdateCore(Peer peer, Subscription subscription, bool isAddOrUpdate)
-        {
-            if (subscription.IsMatchingAllMessages)
-            {
-                var list = _peersHandlingAllMessages
-                    .Where(i => i.Id != peer.Id)
-                    .ToList();
-
-                if (isAddOrUpdate)
-                    list.Add(peer);
-
-                _peersHandlingAllMessages = list;
-            }
-            else
-            {
-                var list = _dynamicPeerSubscriptions
-                    .Where(item => item.Peer.Id != peer.Id || !Equals(item.Subscription, subscription))
-                    .ToList();
-
-                if (isAddOrUpdate)
-                    list.Add(new PeerSubscription(peer, subscription));
-
-                Interlocked.Exchange(ref _dynamicPeerSubscriptions, list);
-            }
-        }
-
-        private class PeerSubscription
-        {
-            public readonly Peer Peer;
-            public readonly Subscription Subscription;
-
-            public PeerSubscription(Peer peer, Subscription subscription)
-            {
-                Peer = peer;
-                Subscription = subscription;
-            }
-        }
-    }
-}

+ 4 - 28
src/Abc.Zebus.Tests/Directory/PeerSubscriptionTreeTests.Performance.cs

@@ -24,25 +24,13 @@ namespace Abc.Zebus.Tests.Directory
             Console.WriteLine("{0} subscriptions", subscriptions.Count);
             Console.WriteLine("{0} subscriptions", subscriptions.Count);
             Console.WriteLine();
             Console.WriteLine();
 
 
-            var subscriptionList = new PeerSubscriptionList();
             var subscriptionTree = new PeerSubscriptionTree();
             var subscriptionTree = new PeerSubscriptionTree();
             foreach (var peerSubscription in subscriptions)
             foreach (var peerSubscription in subscriptions)
             {
             {
-                subscriptionList.Add(peerSubscription.Item1, peerSubscription.Item2);
                 subscriptionTree.Add(peerSubscription.Item1, peerSubscription.Item2.BindingKey);
                 subscriptionTree.Add(peerSubscription.Item1, peerSubscription.Item2.BindingKey);
             }
             }
 
 
-            var bindingKey = new BindingKey(routingKey.Split('.'));
-
-            Console.WriteLine("{0} test -------------", subscriptionList.GetType().Name);
-            Console.WriteLine();
-            Measure.Execution(x =>
-            {
-                x.Iteration = 10000;
-                x.WarmUpIteration = 1000;
-                x.Action = _ => subscriptionList.GetPeers(bindingKey);
-            });
-            Console.WriteLine();
+            var routingContent = RoutingContent.FromValues(routingKey.Split('.'));
 
 
             Console.WriteLine("{0} test -------------", subscriptionTree.GetType().Name);
             Console.WriteLine("{0} test -------------", subscriptionTree.GetType().Name);
             Console.WriteLine();
             Console.WriteLine();
@@ -50,7 +38,7 @@ namespace Abc.Zebus.Tests.Directory
             {
             {
                 x.Iteration = 10000;
                 x.Iteration = 10000;
                 x.WarmUpIteration = 1000;
                 x.WarmUpIteration = 1000;
-                x.Action = _ => subscriptionTree.GetPeers(bindingKey);
+                x.Action = _ => subscriptionTree.GetPeers(routingContent);
             });
             });
         }
         }
 
 
@@ -61,32 +49,20 @@ namespace Abc.Zebus.Tests.Directory
             Console.WriteLine("{0} peers", peers.Count);
             Console.WriteLine("{0} peers", peers.Count);
             Console.WriteLine();
             Console.WriteLine();
 
 
-            var subscriptionList = new PeerSubscriptionList();
             var subscriptionTree = new PeerSubscriptionTree();
             var subscriptionTree = new PeerSubscriptionTree();
 
 
             foreach (var peer in peers)
             foreach (var peer in peers)
             {
             {
-                subscriptionList.Add(peer, Subscription.Any<FakeEvent>());
                 subscriptionTree.Add(peer, BindingKey.Empty);
                 subscriptionTree.Add(peer, BindingKey.Empty);
             }
             }
 
 
-            Console.WriteLine("{0} test -------------", subscriptionList.GetType().Name);
-            Console.WriteLine();
-            Measure.Execution(x =>
-            {
-                x.Iteration = 100000;
-                x.WarmUpIteration = 1000;
-                x.Action = _ => subscriptionList.GetPeers(BindingKey.Empty);
-            });
-            Console.WriteLine();
-
             Console.WriteLine("{0} test -------------", subscriptionTree.GetType().Name);
             Console.WriteLine("{0} test -------------", subscriptionTree.GetType().Name);
             Console.WriteLine();
             Console.WriteLine();
             Measure.Execution(x =>
             Measure.Execution(x =>
             {
             {
                 x.Iteration = 100000;
                 x.Iteration = 100000;
                 x.WarmUpIteration = 1000;
                 x.WarmUpIteration = 1000;
-                x.Action = _ => subscriptionTree.GetPeers(BindingKey.Empty);
+                x.Action = _ => subscriptionTree.GetPeers(RoutingContent.Empty);
             });
             });
         }
         }
 
 
@@ -101,4 +77,4 @@ namespace Abc.Zebus.Tests.Directory
                    select new Tuple<Peer, Subscription>(peer, subscription);
                    select new Tuple<Peer, Subscription>(peer, subscription);
         }
         }
     }
     }
-}
+}

+ 47 - 14
src/Abc.Zebus.Tests/Directory/PeerSubscriptionTreeTests.cs

@@ -25,7 +25,7 @@ namespace Abc.Zebus.Tests.Directory
             peerSubscriptionTree.Add(peer, subscription);
             peerSubscriptionTree.Add(peer, subscription);
 
 
             // Act
             // Act
-            var matchingPeers = peerSubscriptionTree.GetPeers(BindingKeyHelper.CreateFromString(routingKey, '.'));
+            var matchingPeers = peerSubscriptionTree.GetPeers(RoutingContent.FromValues(routingKey));
 
 
             // Assert
             // Assert
             matchingPeers.Single().ShouldEqual(peer);
             matchingPeers.Single().ShouldEqual(peer);
@@ -38,7 +38,7 @@ namespace Abc.Zebus.Tests.Directory
             var peer = new Peer(new PeerId("Abc.Testing.0"), "tcp://test:123");
             var peer = new Peer(new PeerId("Abc.Testing.0"), "tcp://test:123");
             peerSubscriptionTree.Add(peer, BindingKey.Empty);
             peerSubscriptionTree.Add(peer, BindingKey.Empty);
 
 
-            var matchingPeer = peerSubscriptionTree.GetPeers(BindingKey.Empty).ExpectedSingle();
+            var matchingPeer = peerSubscriptionTree.GetPeers(RoutingContent.Empty).ExpectedSingle();
             matchingPeer.Id.ShouldEqual(peer.Id);
             matchingPeer.Id.ShouldEqual(peer.Id);
         }
         }
 
 
@@ -49,7 +49,7 @@ namespace Abc.Zebus.Tests.Directory
             var peer = new Peer(new PeerId("Abc.Testing.0"), "tcp://test:123");
             var peer = new Peer(new PeerId("Abc.Testing.0"), "tcp://test:123");
             peerSubscriptionTree.Add(peer, new BindingKey("a"));
             peerSubscriptionTree.Add(peer, new BindingKey("a"));
 
 
-            var matchingPeer = peerSubscriptionTree.GetPeers(new BindingKey("a")).ExpectedSingle();
+            var matchingPeer = peerSubscriptionTree.GetPeers(RoutingContent.FromValues("a")).ExpectedSingle();
             matchingPeer.Id.ShouldEqual(peer.Id);
             matchingPeer.Id.ShouldEqual(peer.Id);
         }
         }
 
 
@@ -65,7 +65,7 @@ namespace Abc.Zebus.Tests.Directory
             peerSubscriptionTree.Add(peer, BindingKey.Empty);
             peerSubscriptionTree.Add(peer, BindingKey.Empty);
 
 
             // Act
             // Act
-            var matchingPeers = peerSubscriptionTree.GetPeers(BindingKeyHelper.CreateFromString(routingKey, '.'));
+            var matchingPeers = peerSubscriptionTree.GetPeers(RoutingContent.FromValues(routingKey));
 
 
             // Assert
             // Assert
             matchingPeers.Single().ShouldEqual(peer);
             matchingPeers.Single().ShouldEqual(peer);
@@ -91,7 +91,7 @@ namespace Abc.Zebus.Tests.Directory
             peerSubscriptionTree.Add(peerB, BindingKeyHelper.CreateFromString("foo.bar", '.'));
             peerSubscriptionTree.Add(peerB, BindingKeyHelper.CreateFromString("foo.bar", '.'));
 
 
             // Act
             // Act
-            var matchingPeers = peerSubscriptionTree.GetPeers(BindingKey.Empty);
+            var matchingPeers = peerSubscriptionTree.GetPeers(RoutingContent.Empty);
 
 
             // Assert
             // Assert
             matchingPeers.ShouldBeEquivalentTo(peerA, peerB);
             matchingPeers.ShouldBeEquivalentTo(peerA, peerB);
@@ -99,7 +99,7 @@ namespace Abc.Zebus.Tests.Directory
 
 
         [TestCase("a.b.c")]
         [TestCase("a.b.c")]
         [TestCase("b.c.d")]
         [TestCase("b.c.d")]
-        public void stars_should_always_match_if_same_number_of_parts(string routingKey)
+        public void stars_should_always_match_if_same_number_of_parts(string routingKeys)
         {
         {
             // Arrange
             // Arrange
             var peerSubscriptionTree = new PeerSubscriptionTree();
             var peerSubscriptionTree = new PeerSubscriptionTree();
@@ -109,7 +109,8 @@ namespace Abc.Zebus.Tests.Directory
             peerSubscriptionTree.Add(peer, subscription);
             peerSubscriptionTree.Add(peer, subscription);
 
 
             // Act
             // Act
-            var matchingPeers = peerSubscriptionTree.GetPeers(BindingKeyHelper.CreateFromString(routingKey, '.'));
+            var routingContent = RoutingContent.FromValues(routingKeys.Split('.'));
+            var matchingPeers = peerSubscriptionTree.GetPeers(routingContent);
 
 
             // Assert
             // Assert
             matchingPeers.Single().ShouldEqual(peer);
             matchingPeers.Single().ShouldEqual(peer);
@@ -129,7 +130,7 @@ namespace Abc.Zebus.Tests.Directory
             peerSubscriptionTree.Add(peer, subscription);
             peerSubscriptionTree.Add(peer, subscription);
 
 
             // Act
             // Act
-            var matchingPeers = peerSubscriptionTree.GetPeers(new BindingKey("a", "b", "c"));
+            var matchingPeers = peerSubscriptionTree.GetPeers(RoutingContent.FromValues("a", "b", "c"));
 
 
             // Assert
             // Assert
             matchingPeers.Single().ShouldEqual(peer);
             matchingPeers.Single().ShouldEqual(peer);
@@ -147,7 +148,7 @@ namespace Abc.Zebus.Tests.Directory
             peerSubscriptionTree.Add(peer, subscription);
             peerSubscriptionTree.Add(peer, subscription);
 
 
             // Act
             // Act
-            var matchingPeers = peerSubscriptionTree.GetPeers(new BindingKey("a", "b", "c"));
+            var matchingPeers = peerSubscriptionTree.GetPeers(RoutingContent.FromValues("a", "b", "c"));
 
 
             // Assert
             // Assert
             matchingPeers.Single().ShouldEqual(peer);
             matchingPeers.Single().ShouldEqual(peer);
@@ -188,7 +189,7 @@ namespace Abc.Zebus.Tests.Directory
             peerSubscriptionTree.Add(peer, new BindingKey("123.456"));
             peerSubscriptionTree.Add(peer, new BindingKey("123.456"));
             peerSubscriptionTree.Add(peer, new BindingKey("123.456"));
             peerSubscriptionTree.Add(peer, new BindingKey("123.456"));
 
 
-            var peers = peerSubscriptionTree.GetPeers(BindingKey.Empty);
+            var peers = peerSubscriptionTree.GetPeers(RoutingContent.Empty);
             peers.Count.ShouldEqual(1);
             peers.Count.ShouldEqual(1);
 
 
             peerSubscriptionTree.Remove(peer, BindingKey.Empty);
             peerSubscriptionTree.Remove(peer, BindingKey.Empty);
@@ -227,19 +228,19 @@ namespace Abc.Zebus.Tests.Directory
             peerSubscriptionTree.Add(peer0, BindingKeyHelper.CreateFromString("*", '.'));
             peerSubscriptionTree.Add(peer0, BindingKeyHelper.CreateFromString("*", '.'));
 
 
             // Act - Assert
             // Act - Assert
-            var peers = peerSubscriptionTree.GetPeers(BindingKeyHelper.CreateFromString("b.1.c", '.'));
+            var peers = peerSubscriptionTree.GetPeers(RoutingContent.FromValues("b", "1", "c"));
             peers.Count.ShouldEqual(2);
             peers.Count.ShouldEqual(2);
             peers.ShouldContain(peer1);
             peers.ShouldContain(peer1);
             peers.ShouldContain(peer4);
             peers.ShouldContain(peer4);
 
 
-            peers = peerSubscriptionTree.GetPeers(BindingKeyHelper.CreateFromString("a.1", '.'));
+            peers = peerSubscriptionTree.GetPeers(RoutingContent.FromValues("a", "1"));
             peers.Count.ShouldEqual(4);
             peers.Count.ShouldEqual(4);
             peers.ShouldContain(peer1);
             peers.ShouldContain(peer1);
             peers.ShouldContain(peer3);
             peers.ShouldContain(peer3);
             peers.ShouldContain(peer8);
             peers.ShouldContain(peer8);
             peers.ShouldContain(peer9);
             peers.ShouldContain(peer9);
 
 
-            peers = peerSubscriptionTree.GetPeers(BindingKeyHelper.CreateFromString("a", '.'));
+            peers = peerSubscriptionTree.GetPeers(RoutingContent.FromValues("a"));
             peers.Count.ShouldEqual(3);
             peers.Count.ShouldEqual(3);
             peers.ShouldContain(peer1);
             peers.ShouldContain(peer1);
             peers.ShouldContain(peer7);
             peers.ShouldContain(peer7);
@@ -264,10 +265,42 @@ namespace Abc.Zebus.Tests.Directory
             peerSubscriptionTree.Add(peer, subscription);
             peerSubscriptionTree.Add(peer, subscription);
 
 
             // Act
             // Act
-            var matchingPeers = peerSubscriptionTree.GetPeers(BindingKeyHelper.CreateFromString(routingKey, '.'));
+            var routingContent = RoutingContent.FromValues(routingKey.Split('.'));
+            var matchingPeers = peerSubscriptionTree.GetPeers(routingContent);
 
 
             // Assert
             // Assert
             matchingPeers.ShouldBeEmpty();
             matchingPeers.ShouldBeEmpty();
         }
         }
+
+        // TODO: investigate this test case
+        // [TestCase("*", true)]
+        [TestCase("#", true)]
+        [TestCase("1.*", true)]
+        [TestCase("2.*", false)]
+        [TestCase("*.A", true)]
+        [TestCase("*.B", true)]
+        [TestCase("*.C", true)]
+        [TestCase("*.D", false)]
+        [TestCase("1.A", true)]
+        [TestCase("2.A", false)]
+        public void should_match_collection(string bindingKey, bool isMatchExpected)
+        {
+            // Arrange
+            var peerSubscriptionTree = new PeerSubscriptionTree();
+            var peer = new Peer(new PeerId("Testing.0"), "E");
+
+            peerSubscriptionTree.Add(peer, BindingKeyHelper.CreateFromString(bindingKey, '.'));
+
+            var routingContent = new RoutingContent(
+                new RoutingContentValue("1"),
+                new RoutingContentValue(new[] { "A", "B", "C" })
+            );
+
+            // Act
+            var matchingPeers = peerSubscriptionTree.GetPeers(routingContent);
+
+            // Assert
+            matchingPeers.Any().ShouldEqual(isMatchExpected);
+        }
     }
     }
 }
 }

+ 1 - 1
src/Abc.Zebus.Tests/Messages/FakeRoutableCommand.cs

@@ -33,4 +33,4 @@ namespace Abc.Zebus.Tests.Messages
             OtherId = otherId;
             OtherId = otherId;
         }
         }
     }
     }
-}
+}

+ 20 - 0
src/Abc.Zebus.Tests/Messages/FakeRoutableCommandWithCollection.cs

@@ -0,0 +1,20 @@
+using System.Collections.Generic;
+using Abc.Zebus.Routing;
+using ProtoBuf;
+
+namespace Abc.Zebus.Tests.Messages
+{
+    [ProtoContract, Routable]
+    public class FakeRoutableCommandWithCollection : ICommand
+    {
+        [ProtoMember(1), RoutingPosition(1)]
+        public string Name;
+
+        [ProtoMember(2), RoutingPosition(2)]
+        public int[] IdArray;
+
+        [ProtoMember(3), RoutingPosition(3)]
+        public List<decimal> ValueList { get; set; }
+
+    }
+}

+ 2 - 2
src/Abc.Zebus.Tests/Persistence/PersistentTransportTests.cs

@@ -429,7 +429,7 @@ namespace Abc.Zebus.Tests.Persistence
             {
             {
                 Transport.Start();
                 Transport.Start();
 
 
-                PeerDirectory.Setup(directory => directory.GetPeersHandlingMessage(new MessageBinding(MessageTypeId.PersistenceStoppingAck, BindingKey.Empty)))
+                PeerDirectory.Setup(directory => directory.GetPeersHandlingMessage(new MessageBinding(MessageTypeId.PersistenceStoppingAck, RoutingContent.Empty)))
                              .Returns(new List<Peer> { PersistencePeer });
                              .Returns(new List<Peer> { PersistencePeer });
 
 
                 // Stopping persistence
                 // Stopping persistence
@@ -444,4 +444,4 @@ namespace Abc.Zebus.Tests.Persistence
             }
             }
         }
         }
     }
     }
-}
+}

+ 2 - 82
src/Abc.Zebus.Tests/Routing/BindingKeyTests.cs

@@ -1,10 +1,6 @@
 using System;
 using System;
-using System.Globalization;
 using Abc.Zebus.Routing;
 using Abc.Zebus.Routing;
 using Abc.Zebus.Testing.Extensions;
 using Abc.Zebus.Testing.Extensions;
-using Abc.Zebus.Testing.Measurements;
-using Abc.Zebus.Tests.Messages;
-using Abc.Zebus.Util;
 using NUnit.Framework;
 using NUnit.Framework;
 
 
 namespace Abc.Zebus.Tests.Routing
 namespace Abc.Zebus.Tests.Routing
@@ -12,88 +8,12 @@ namespace Abc.Zebus.Tests.Routing
     [TestFixture]
     [TestFixture]
     public class BindingKeyTests
     public class BindingKeyTests
     {
     {
-        [Test]
-        public void should_get_routing_key_from_message()
-        {
-            using (new CultureScope(CultureInfo.GetCultureInfo("FR-fr")))
-            {
-                var message = new FakeRoutableCommand(42.42m, "name", Guid.NewGuid());
-                var rountingKey = BindingKey.Create(message);
-
-                rountingKey.PartCount.ShouldEqual(3);
-                rountingKey.GetPartToken(0).ShouldEqual("42.42");
-                rountingKey.GetPartToken(1).ShouldEqual("name");
-                rountingKey.GetPartToken(2).ShouldEqual(message.OtherId.ToString());
-            }
-        }
-
-        [Test]
-        public void should_get_routing_key_from_message_with_properties()
-        {
-            var message = new FakeRoutableCommandWithProperties { Id = 100, FeedId = 200 };
-            var routingKey = BindingKey.Create(message);
-
-            routingKey.PartCount.ShouldEqual(2);
-            routingKey.GetPartToken(0).ShouldEqual("100");
-            routingKey.GetPartToken(1).ShouldEqual("200");
-
-            routingKey.ToString().ShouldEqual("100.200");
-        }
-
         [Test]
         [Test]
         public void should_use_special_char_for_empty_binding_key()
         public void should_use_special_char_for_empty_binding_key()
         {
         {
-            var empty = new BindingKey(new string[0]);
+            var empty = new BindingKey(Array.Empty<string>());
 
 
             empty.ToString().ShouldEqual("#");
             empty.ToString().ShouldEqual("#");
         }
         }
-
-        [Test]
-        [Explicit]
-        [Category("ManualOnly")]
-        public void MeasurePerformances()
-        {
-            var message = new FakeMarketDataEvent("USA", "NASDAQ", "MSFT");
-            Measure.Execution(1000000, () => BindingKey.Create(message));
-        }
-
-        [Test]
-        public void should_send_routing_key_exception()
-        {
-            var msg = new FakeRoutableCommand(0, null);
-
-            var exception = Assert.Throws<InvalidOperationException>(() => BindingKey.Create(msg));
-            exception.Message.ShouldContain(typeof(FakeRoutableCommand).Name);
-            exception.Message.ShouldContain("Name");
-            exception.Message.ShouldContain("can not be null");
-        }
-
-        [Routable]
-        public class FakeRoutableCommandWithProperties : ICommand
-        {
-            [RoutingPosition(1)]
-            public int Id { get; set; }
-
-            [RoutingPosition(2)]
-            public int FeedId { get; set; }
-        }
-
-        [Routable]
-        public class FakeMarketDataEvent : IEvent
-        {
-            [RoutingPosition(1)]
-            public readonly string Zone;
-            [RoutingPosition(2)]
-            public string ExchangeCode { get; private set; }
-            [RoutingPosition(3)]
-            public readonly string Ticker;
-
-            public FakeMarketDataEvent(string zone, string exchangeCode, string ticker)
-            {
-                Zone = zone;
-                Ticker = ticker;
-                ExchangeCode = exchangeCode;
-            }
-        }
     }
     }
-}
+}

+ 61 - 0
src/Abc.Zebus.Tests/Routing/RoutingMatchingTests.cs

@@ -0,0 +1,61 @@
+using System.Collections.Generic;
+using System.Linq;
+using Abc.Zebus.Directory;
+using Abc.Zebus.Routing;
+using Abc.Zebus.Testing.Extensions;
+using Abc.Zebus.Tests.Messages;
+using NUnit.Framework;
+
+namespace Abc.Zebus.Tests.Routing
+{
+    /// <summary>
+    /// Ensure alternative routing matching methods have consistent results.
+    /// </summary>
+    [TestFixture]
+    public class RoutingMatchingTests
+    {
+        [TestCase("#", true)]
+        [TestCase("*.*.*", true)]
+        [TestCase("A.*.*", true)]
+        [TestCase("9.*.*", false)]
+        [TestCase("*.1.*", true)]
+        [TestCase("*.9.*", false)]
+        [TestCase("*.*.101", true)]
+        [TestCase("*.*.999", false)]
+        [TestCase("A.3.102", true)]
+        [TestCase("A.3.999", false)]
+        [TestCase("A.9.102", false)]
+        [TestCase("9.3.102", false)]
+        public void should_matching_routable_message(string bindingKeyText, bool isMatchExpected)
+        {
+            var message = new FakeRoutableCommandWithCollection
+            {
+                Name = "A",
+                IdArray = new[] { 1, 2, 3 },
+                ValueList = new List<decimal> { 101m, 102m },
+            };
+
+            var bindingKey = BindingKeyHelper.CreateFromString(bindingKeyText, '.');
+
+            ValidateMatching(message, bindingKey, isMatchExpected);
+        }
+
+        private static void ValidateMatching(IMessage message, BindingKey bindingKey, bool isMatchExpected)
+        {
+            var messageTypeId = message.TypeId();
+            var messageBinding = MessageBinding.FromMessage(message);
+
+            var subscription = new Subscription(messageTypeId, bindingKey);
+            subscription.Matches(messageBinding).ShouldEqual(isMatchExpected, "Subscription should match");
+
+            var predicate = BindingKeyUtil.BuildPredicate(messageTypeId, bindingKey);
+            predicate.Invoke(message).ShouldEqual(isMatchExpected, "Predicate should match");
+
+            var subscriptionTree = new PeerSubscriptionTree();
+            subscriptionTree.Add(TestDataBuilder.Peer(), bindingKey);
+
+            var peers = subscriptionTree.GetPeers(messageBinding.RoutingContent);
+            peers.Any().ShouldEqual(isMatchExpected, "PeerSubscriptionTree should match");
+        }
+    }
+}

+ 75 - 10
src/Abc.Zebus.Tests/SubscriptionTests.cs

@@ -27,7 +27,29 @@ namespace Abc.Zebus.Tests
         public void single_star_should_always_match(string routingKey)
         public void single_star_should_always_match(string routingKey)
         {
         {
             var subscription = CreateSubscription("*");
             var subscription = CreateSubscription("*");
-            subscription.Matches(BindingKeyHelper.CreateFromString(routingKey, '.')).ShouldBeTrue();
+            subscription.Matches(CreateMessageBinding(routingKey)).ShouldBeTrue();
+        }
+
+        [TestCase("*.A", true)]
+        [TestCase("*.B", true)]
+        [TestCase("*.C", true)]
+        [TestCase("*.X", false)]
+        [TestCase("X.A", false)]
+        [TestCase("1.B", true)]
+        public void should_match_collection_item(string bindingKey, bool isMatchExpected)
+        {
+            var subscription = CreateSubscription(bindingKey);
+
+            var routingContent = new RoutingContent(
+                new RoutingContentValue("1"),
+                new RoutingContentValue(new[] { "A", "B", "C" })
+                );
+
+            var messageBinding = new MessageBinding(subscription.MessageTypeId, routingContent);
+
+            var matches = subscription.Matches(messageBinding);
+
+            matches.ShouldEqual(isMatchExpected);
         }
         }
 
 
         [TestCase("whatever")]
         [TestCase("whatever")]
@@ -36,7 +58,7 @@ namespace Abc.Zebus.Tests
         public void single_dash_should_always_match(string routingKey)
         public void single_dash_should_always_match(string routingKey)
         {
         {
             var subscription = CreateSubscription("#");
             var subscription = CreateSubscription("#");
-            subscription.Matches(BindingKeyHelper.CreateFromString(routingKey, '.')).ShouldBeTrue();
+            subscription.Matches(CreateMessageBinding(routingKey)).ShouldBeTrue();
         }
         }
 
 
         [TestCase("whatever")]
         [TestCase("whatever")]
@@ -45,7 +67,7 @@ namespace Abc.Zebus.Tests
         public void empty_bindingKey_should_always_match(string routingKey)
         public void empty_bindingKey_should_always_match(string routingKey)
         {
         {
             var subscription = new Subscription(new MessageTypeId(typeof(FakeCommand)), BindingKey.Empty);
             var subscription = new Subscription(new MessageTypeId(typeof(FakeCommand)), BindingKey.Empty);
-            subscription.Matches(BindingKeyHelper.CreateFromString(routingKey, '.')).ShouldBeTrue();
+            subscription.Matches(CreateMessageBinding(routingKey)).ShouldBeTrue();
         }
         }
 
 
         [TestCase("a.b.c")]
         [TestCase("a.b.c")]
@@ -53,7 +75,7 @@ namespace Abc.Zebus.Tests
         public void stars_should_always_match_if_same_number_of_parts(string routingKey)
         public void stars_should_always_match_if_same_number_of_parts(string routingKey)
         {
         {
             var subscription = CreateSubscription("*.*.*");
             var subscription = CreateSubscription("*.*.*");
-            subscription.Matches(BindingKeyHelper.CreateFromString(routingKey, '.')).ShouldBeTrue();
+            subscription.Matches(CreateMessageBinding(routingKey)).ShouldBeTrue();
         }
         }
 
 
         [TestCase("a.b.*")]
         [TestCase("a.b.*")]
@@ -63,7 +85,7 @@ namespace Abc.Zebus.Tests
         public void binding_key_with_star_should_match_routing_key(string bindingKey)
         public void binding_key_with_star_should_match_routing_key(string bindingKey)
         {
         {
             var subscription = CreateSubscription(bindingKey);
             var subscription = CreateSubscription(bindingKey);
-            subscription.Matches(BindingKeyHelper.CreateFromString("a.b.c", '.')).ShouldBeTrue();
+            subscription.Matches(CreateMessageBinding("a.b.c")).ShouldBeTrue();
         }
         }
 
 
         [TestCase("a.b.#")]
         [TestCase("a.b.#")]
@@ -71,7 +93,7 @@ namespace Abc.Zebus.Tests
         public void binding_key_with_dash_should_match_routing_key(string bindingKey)
         public void binding_key_with_dash_should_match_routing_key(string bindingKey)
         {
         {
             var subscription = CreateSubscription(bindingKey);
             var subscription = CreateSubscription(bindingKey);
-            subscription.Matches(BindingKeyHelper.CreateFromString("a.b.c", '.')).ShouldBeTrue();
+            subscription.Matches(CreateMessageBinding("a.b.c")).ShouldBeTrue();
         }
         }
 
 
         [TestCase("a.b", "a.b.c.d")]
         [TestCase("a.b", "a.b.c.d")]
@@ -80,14 +102,14 @@ namespace Abc.Zebus.Tests
         public void should_not_match_binding_key(string routingKey, string bindingKey)
         public void should_not_match_binding_key(string routingKey, string bindingKey)
         {
         {
             var subscription = CreateSubscription(bindingKey);
             var subscription = CreateSubscription(bindingKey);
-            subscription.Matches(BindingKeyHelper.CreateFromString(routingKey, '.')).ShouldBeFalse();
+            subscription.Matches(CreateMessageBinding(routingKey)).ShouldBeFalse();
         }
         }
 
 
         [Test]
         [Test]
         public void exact_same_routing_key_should_match_binding_key()
         public void exact_same_routing_key_should_match_binding_key()
         {
         {
             var subscription = CreateSubscription("a.b.c");
             var subscription = CreateSubscription("a.b.c");
-            subscription.Matches(BindingKeyHelper.CreateFromString("a.b.c", '.')).ShouldBeTrue();
+            subscription.Matches(CreateMessageBinding("a.b.c")).ShouldBeTrue();
         }
         }
 
 
         [Test]
         [Test]
@@ -197,6 +219,44 @@ namespace Abc.Zebus.Tests
             subscription.BindingKey.ShouldEqual(new BindingKey(GetFieldValue().ToString(), "*", "*"));
             subscription.BindingKey.ShouldEqual(new BindingKey(GetFieldValue().ToString(), "*", "*"));
         }
         }
 
 
+        [Test]
+        public void should_create_subscription_from_array_1()
+        {
+            var subscription = Subscription.Matching<FakeRoutableCommandWithCollection>(x => x.IdArray.Contains(ExpectedId()));
+
+            subscription.MessageTypeId.ShouldEqual(new MessageTypeId(typeof(FakeRoutableCommandWithCollection)));
+            subscription.BindingKey.ShouldEqual(new BindingKey("*", "42", "*"));
+        }
+
+        [Test]
+        public void should_create_subscription_from_array_2()
+        {
+            var subscription = Subscription.Matching<FakeRoutableCommandWithCollection>(x => x.Name == "X" && x.IdArray.Contains(1));
+
+            subscription.MessageTypeId.ShouldEqual(new MessageTypeId(typeof(FakeRoutableCommandWithCollection)));
+            subscription.BindingKey.ShouldEqual(new BindingKey("X", "1", "*"));
+        }
+
+        [Test]
+        public void should_create_subscription_from_array_3()
+        {
+            var subscription = Subscription.Matching<FakeRoutableCommandWithCollection>(x => x.IdArray.Contains(1) && x.Name == "X");
+
+            subscription.MessageTypeId.ShouldEqual(new MessageTypeId(typeof(FakeRoutableCommandWithCollection)));
+            subscription.BindingKey.ShouldEqual(new BindingKey("X", "1", "*"));
+        }
+
+        [Test]
+        public void should_create_subscription_from_list()
+        {
+            var subscription = Subscription.Matching<FakeRoutableCommandWithCollection>(x => x.ValueList.Contains(999));
+
+            subscription.MessageTypeId.ShouldEqual(new MessageTypeId(typeof(FakeRoutableCommandWithCollection)));
+            subscription.BindingKey.ShouldEqual(new BindingKey("*", "*", "999"));
+        }
+
+        private static int ExpectedId() => 42;
+
         [Test]
         [Test]
         public void should_be_equatable()
         public void should_be_equatable()
         {
         {
@@ -277,13 +337,13 @@ namespace Abc.Zebus.Tests
         public void MeasurePerformance(string routingKey, string bindingKey)
         public void MeasurePerformance(string routingKey, string bindingKey)
         {
         {
             var subscription = CreateSubscription(bindingKey);
             var subscription = CreateSubscription(bindingKey);
-            var key = BindingKeyHelper.CreateFromString(routingKey, '.');
+            var messageBinding = CreateMessageBinding(routingKey);
 
 
             Measure.Execution(x =>
             Measure.Execution(x =>
             {
             {
                 x.Iteration = 1000000;
                 x.Iteration = 1000000;
                 x.WarmUpIteration = 1000;
                 x.WarmUpIteration = 1000;
-                x.Action = _ => subscription.Matches(key);
+                x.Action = _ => subscription.Matches(messageBinding);
             });
             });
         }
         }
 
 
@@ -297,6 +357,11 @@ namespace Abc.Zebus.Tests
             return new Subscription(new MessageTypeId(typeof(FakeCommand)), BindingKeyHelper.CreateFromString(bindingKey, '.'));
             return new Subscription(new MessageTypeId(typeof(FakeCommand)), BindingKeyHelper.CreateFromString(bindingKey, '.'));
         }
         }
 
 
+        private MessageBinding CreateMessageBinding(string routingKeys)
+        {
+            return new MessageBinding(new MessageTypeId(typeof(FakeCommand)), RoutingContent.FromValues(routingKeys.Split('.')));
+        }
+
         private int GetFieldValue()
         private int GetFieldValue()
         {
         {
             return _field;
             return _field;

+ 6 - 1
src/Abc.Zebus.Tests/TestDataBuilder.cs

@@ -7,6 +7,11 @@ namespace Abc.Zebus.Tests
 {
 {
     public static class TestDataBuilder
     public static class TestDataBuilder
     {
     {
+        public static Peer Peer()
+        {
+            return new Peer(new PeerId($"Abc.Testing.{Guid.NewGuid()}"), "tcp://testingendpoint:123");
+        }
+
          public static Subscription[] CreateSubscriptions<TMessage>() where TMessage : IMessage
          public static Subscription[] CreateSubscriptions<TMessage>() where TMessage : IMessage
          {
          {
              return new[] { CreateSubscription<TMessage>() };
              return new[] { CreateSubscription<TMessage>() };
@@ -41,4 +46,4 @@ namespace Abc.Zebus.Tests
             };
             };
         }
         }
     }
     }
-}
+}

+ 21 - 15
src/Abc.Zebus/Directory/MessageBinding.cs

@@ -3,36 +3,42 @@ using Abc.Zebus.Routing;
 
 
 namespace Abc.Zebus.Directory
 namespace Abc.Zebus.Directory
 {
 {
-    public readonly struct MessageBinding : IEquatable<MessageBinding>
+    public readonly struct MessageBinding
     {
     {
         public readonly MessageTypeId MessageTypeId;
         public readonly MessageTypeId MessageTypeId;
-        public readonly BindingKey RoutingKey;
+        public readonly RoutingContent RoutingContent;
 
 
-        public MessageBinding(MessageTypeId messageTypeId, BindingKey routingKey)
+        public MessageBinding(MessageTypeId messageTypeId, RoutingContent routingContent)
         {
         {
             MessageTypeId = messageTypeId;
             MessageTypeId = messageTypeId;
-            RoutingKey = routingKey;
+            RoutingContent = routingContent;
         }
         }
 
 
         public static MessageBinding FromMessage(IMessage message)
         public static MessageBinding FromMessage(IMessage message)
-            => new MessageBinding(message.TypeId(), BindingKey.Create(message));
+        {
+            var messageTypeId = message.TypeId();
+            var routingContent = GetRoutingContent(message, messageTypeId);
+
+            return new MessageBinding(messageTypeId, routingContent);
+        }
 
 
         public static MessageBinding Default<T>()
         public static MessageBinding Default<T>()
-            where T : IMessage
-            => new MessageBinding(MessageUtil.TypeId<T>(), BindingKey.Empty);
+            where T : IMessage => new(MessageUtil.TypeId<T>(), RoutingContent.Empty);
 
 
-        public bool Equals(MessageBinding other)
-            => MessageTypeId == other.MessageTypeId && RoutingKey.Equals(other.RoutingKey);
+        private static RoutingContent GetRoutingContent(IMessage message, MessageTypeId messageTypeId)
+        {
+            var members = messageTypeId.Descriptor.RoutingMembers;
+            if (members.Length == 0)
+                return RoutingContent.Empty;
 
 
-        public override bool Equals(object? obj)
-            => obj is MessageBinding binding && Equals(binding);
+            var values = new RoutingContentValue[members.Length];
 
 
-        public override int GetHashCode()
-        {
-            unchecked
+            for (var tokenIndex = 0; tokenIndex < values.Length; ++tokenIndex)
             {
             {
-                return (MessageTypeId.GetHashCode() * 397) ^ RoutingKey.GetHashCode();
+                values[tokenIndex] = members[tokenIndex].GetValue(message);
             }
             }
+
+            return new RoutingContent(values);
         }
         }
     }
     }
 }
 }

+ 1 - 1
src/Abc.Zebus/Directory/PeerDirectoryClient.cs

@@ -226,7 +226,7 @@ namespace Abc.Zebus.Directory
             if (subscriptionList == null)
             if (subscriptionList == null)
                 return Array.Empty<Peer>();
                 return Array.Empty<Peer>();
 
 
-            return subscriptionList.GetPeers(messageBinding.RoutingKey);
+            return subscriptionList.GetPeers(messageBinding.RoutingContent);
         }
         }
 
 
         public bool IsPersistent(PeerId peerId)
         public bool IsPersistent(PeerId peerId)

+ 32 - 10
src/Abc.Zebus/Directory/PeerSubscriptionTree.cs

@@ -21,11 +21,11 @@ namespace Abc.Zebus.Directory
         public void Remove(Peer peer, BindingKey subscription)
         public void Remove(Peer peer, BindingKey subscription)
             => UpdatePeerSubscription(peer, subscription, UpdateAction.Remove);
             => UpdatePeerSubscription(peer, subscription, UpdateAction.Remove);
 
 
-        public IList<Peer> GetPeers(BindingKey routingKey)
+        public IList<Peer> GetPeers(RoutingContent routingContent)
         {
         {
             var peerCollector = new PeerCollector(_peersMatchingAllMessages);
             var peerCollector = new PeerCollector(_peersMatchingAllMessages);
 
 
-            if (routingKey.IsEmpty)
+            if (routingContent.IsEmpty)
             {
             {
                 // The message is not routable or has no routing member.
                 // The message is not routable or has no routing member.
 
 
@@ -39,7 +39,7 @@ namespace Abc.Zebus.Directory
             }
             }
             else
             else
             {
             {
-                _rootNode.Accept(peerCollector, routingKey);
+                _rootNode.Accept(peerCollector, routingContent);
             }
             }
 
 
             return peerCollector.GetPeers();
             return peerCollector.GetPeers();
@@ -134,23 +134,37 @@ namespace Abc.Zebus.Directory
                 }
                 }
             }
             }
 
 
-            public void Accept(PeerCollector peerCollector, BindingKey routingKey)
+            public void Accept(PeerCollector peerCollector, RoutingContent routingContent)
             {
             {
-                if (IsLeaf(routingKey))
+                if (IsLeaf(routingContent))
                 {
                 {
                     peerCollector.Offer(_peers);
                     peerCollector.Offer(_peers);
                     return;
                     return;
                 }
                 }
 
 
                 _sharpNode?.AddAllPeers(peerCollector);
                 _sharpNode?.AddAllPeers(peerCollector);
-                _starNode?.Accept(peerCollector, routingKey);
+                _starNode?.Accept(peerCollector, routingContent);
 
 
-                var nextPart = routingKey.GetPartToken(_nextPartIndex);
-                if (nextPart == null || _childNodes == null)
+                if (_childNodes == null)
                     return;
                     return;
 
 
-                if (_childNodes.TryGetValue(nextPart, out var childNode))
-                    childNode.Accept(peerCollector, routingKey);
+                var partValue = routingContent[_nextPartIndex];
+
+                if (partValue.IsSingle)
+                {
+                    // Almost all RoutingContentValue are expected to contain a single value.
+
+                    if (partValue.SingleValue != null && _childNodes.TryGetValue(partValue.SingleValue, out var childNode))
+                        childNode.Accept(peerCollector, routingContent);
+                }
+                else
+                {
+                    foreach (var token in partValue.GetValues())
+                    {
+                        if (token != null && _childNodes.TryGetValue(token, out var childNode))
+                            childNode.Accept(peerCollector, routingContent);
+                    }
+                }
             }
             }
 
 
             public int Update(Peer peer, BindingKey subscription, UpdateAction action)
             public int Update(Peer peer, BindingKey subscription, UpdateAction action)
@@ -194,6 +208,14 @@ namespace Abc.Zebus.Directory
                 return _nextPartIndex == bindingKey.PartCount;
                 return _nextPartIndex == bindingKey.PartCount;
             }
             }
 
 
+            private bool IsLeaf(RoutingContent routingContent)
+            {
+                if (_nextPartIndex == 0)
+                    return false;
+
+                return _nextPartIndex == routingContent.PartCount;
+            }
+
             private SubscriptionNode GetOrAddChildNode(string part)
             private SubscriptionNode GetOrAddChildNode(string part)
             {
             {
                 if (_childNodes == null)
                 if (_childNodes == null)

+ 65 - 34
src/Abc.Zebus/MessageTypeDescriptor.cs

@@ -1,4 +1,5 @@
 using System;
 using System;
+using System.Collections.Generic;
 using System.Diagnostics.CodeAnalysis;
 using System.Diagnostics.CodeAnalysis;
 using System.Globalization;
 using System.Globalization;
 using System.Linq;
 using System.Linq;
@@ -51,7 +52,7 @@ namespace Abc.Zebus
             return Load(TypeUtil.Resolve(fullName), fullName);
             return Load(TypeUtil.Resolve(fullName), fullName);
         }
         }
 
 
-        internal static MessageTypeDescriptor Load(Type? messageType, string? fullName)
+        public static MessageTypeDescriptor Load(Type? messageType, string? fullName)
         {
         {
             if (fullName == null)
             if (fullName == null)
                 return Null;
                 return Null;
@@ -68,43 +69,48 @@ namespace Abc.Zebus
 
 
         public class RoutingMember
         public class RoutingMember
         {
         {
-            private static readonly MethodInfo _toStringMethod = typeof(object).GetMethod(nameof(ToString))!;
-            private static readonly MethodInfo _toStringWithFormatMethod = typeof(IConvertible).GetMethod(nameof(IConvertible.ToString))!;
+            private static readonly MethodInfo _getValueMethod = typeof(RoutingMember).GetMethod(nameof(GetMemberValue), BindingFlags.Static | BindingFlags.NonPublic)!;
+            private static readonly MethodInfo _getValueConvertibleMethod = typeof(RoutingMember).GetMethod(nameof(GetMemberValueConvertible), BindingFlags.Static | BindingFlags.NonPublic)!;
+            private static readonly MethodInfo _getValuesMethod = typeof(RoutingMember).GetMethod(nameof(GetMemberValues), BindingFlags.Static | BindingFlags.NonPublic)!;
+            private static readonly MethodInfo _getValuesConvertibleMethod = typeof(RoutingMember).GetMethod(nameof(GetMemberValuesConvertible), BindingFlags.Static | BindingFlags.NonPublic)!;
+            private static readonly MethodInfo _matchesMethod = typeof(RoutingContentValue).GetMethod(nameof(RoutingContentValue.Matches))!;
 
 
             public static ParameterExpression ParameterExpression { get; } = Expression.Parameter(typeof(IMessage), "m");
             public static ParameterExpression ParameterExpression { get; } = Expression.Parameter(typeof(IMessage), "m");
 
 
             public static RoutingMember[] GetAll(Type messageType)
             public static RoutingMember[] GetAll(Type messageType)
             {
             {
-                var castExpression = Expression.Convert(ParameterExpression, messageType);
-
                 return messageType.GetMembers(BindingFlags.Public | BindingFlags.Instance)
                 return messageType.GetMembers(BindingFlags.Public | BindingFlags.Instance)
                                   .Select(x => (member: x, attribute: x.GetCustomAttribute<RoutingPositionAttribute>(true)))
                                   .Select(x => (member: x, attribute: x.GetCustomAttribute<RoutingPositionAttribute>(true)))
                                   .Where(x => x.attribute != null)
                                   .Where(x => x.attribute != null)
                                   .OrderBy(x => x.attribute!.Position)
                                   .OrderBy(x => x.attribute!.Position)
-                                  .Select((x, index) => new RoutingMember(index, x.attribute!.Position, x.member, BuildToStringExpression(castExpression, x.member)))
+                                  .Select((x, index) => new RoutingMember(index, x.attribute!.Position, x.member))
                                   .ToArray();
                                   .ToArray();
             }
             }
 
 
-            private RoutingMember(int index, int position, MemberInfo member, MethodCallExpression toStringExpression)
+            private readonly MethodCallExpression _valueExpression;
+            private readonly Func<IMessage, RoutingContentValue> _valueFunc;
+
+            private RoutingMember(int index, int position, MemberInfo member)
             {
             {
                 Index = index;
                 Index = index;
                 RoutingPosition = position;
                 RoutingPosition = position;
                 Member = member;
                 Member = member;
-                ToStringExpression = toStringExpression;
-                ToStringDelegate = BuildToStringDelegate(toStringExpression);
+
+                var valueExpression = BuildValueExpression(member);
+
+                _valueExpression = valueExpression;
+                _valueFunc = BuildValueFunc(valueExpression);
             }
             }
 
 
             public int Index { get; }
             public int Index { get; }
             public int RoutingPosition { get; }
             public int RoutingPosition { get; }
             public MemberInfo Member { get; }
             public MemberInfo Member { get; }
-            public MethodCallExpression ToStringExpression { get; }
-            private Func<IMessage, string> ToStringDelegate { get; }
 
 
-            public string GetValue(IMessage message)
+            public RoutingContentValue GetValue(IMessage message)
             {
             {
                 try
                 try
                 {
                 {
-                    return ToStringDelegate(message);
+                    return _valueFunc(message);
                 }
                 }
                 catch (NullReferenceException)
                 catch (NullReferenceException)
                 {
                 {
@@ -112,37 +118,62 @@ namespace Abc.Zebus
                 }
                 }
             }
             }
 
 
-            private static MethodCallExpression BuildToStringExpression(Expression castExpression, MemberInfo member)
+            private static RoutingContentValue GetMemberValue<TMember>(TMember? value)
+                => new RoutingContentValue(value?.ToString());
+
+            private static RoutingContentValue GetMemberValueConvertible<TMember>(TMember? value)
+                where TMember : IConvertible
+                => new RoutingContentValue(value?.ToString(CultureInfo.InvariantCulture));
+
+            private static RoutingContentValue GetMemberValues<TMember>(ICollection<TMember>? values)
+                => new RoutingContentValue(values != null ? values.Select(x => x!.ToString()).ToArray() : Array.Empty<string>());
+
+            private static RoutingContentValue GetMemberValuesConvertible<TMember>(ICollection<TMember>? values)
+                where TMember : IConvertible
+                => new RoutingContentValue(values != null ? values.Select(x => x.ToString(CultureInfo.InvariantCulture)).ToArray() : Array.Empty<string>());
+
+            private static MethodCallExpression BuildValueExpression(MemberInfo member)
             {
             {
-                Func<Expression, Expression> memberAccessor;
-                Type memberType;
+                var memberExpression = GetMemberExpression();
+                var getValueMethod = GetValueMethodInfo(memberExpression.type);
 
 
-                if (member.MemberType == MemberTypes.Property)
-                {
-                    var propertyInfo = (PropertyInfo)member;
-                    memberAccessor = m => Expression.Property(m, propertyInfo);
-                    memberType = propertyInfo.PropertyType;
-                }
-                else if (member.MemberType == MemberTypes.Field)
+                return Expression.Call(null, getValueMethod, memberExpression.value);
+
+                (Expression value, Type type) GetMemberExpression()
                 {
                 {
-                    var fieldInfo = (FieldInfo)member;
-                    memberAccessor = m => Expression.Field(m, fieldInfo);
-                    memberType = fieldInfo.FieldType;
+                    var castExpression = Expression.Convert(ParameterExpression, member.DeclaringType!);
+
+                    return member switch
+                    {
+                        PropertyInfo propertyInfo => (Expression.Property(castExpression, propertyInfo), propertyInfo.PropertyType),
+                        FieldInfo fieldInfo       => (Expression.Field(castExpression, fieldInfo), fieldInfo.FieldType),
+                        _                         => throw new InvalidOperationException("Cannot define routing position on a member other than a field or property"),
+                    };
                 }
                 }
-                else
+
+                MethodInfo GetValueMethodInfo(Type memberType)
                 {
                 {
-                    throw new InvalidOperationException("Cannot define routing position on a member other than a field or property");
+                    var collectionType = memberType.GetInterfaces().FirstOrDefault(x => x.IsGenericType && x.GetGenericTypeDefinition() == typeof(ICollection<>));
+
+                    if (collectionType != null)
+                    {
+                        var itemType = collectionType.GetGenericArguments()[0];
+                        return typeof(IConvertible).IsAssignableFrom(itemType) ? _getValuesConvertibleMethod.MakeGenericMethod(itemType) : _getValuesMethod.MakeGenericMethod(itemType);
+                    }
+
+                    return typeof(IConvertible).IsAssignableFrom(memberType) ? _getValueConvertibleMethod.MakeGenericMethod(memberType) : _getValueMethod.MakeGenericMethod(memberType);
                 }
                 }
+            }
 
 
-                return typeof(IConvertible).IsAssignableFrom(memberType) && memberType != typeof(string)
-                    ? Expression.Call(memberAccessor(castExpression), _toStringWithFormatMethod, Expression.Constant(CultureInfo.InvariantCulture))
-                    : Expression.Call(memberAccessor(castExpression), _toStringMethod);
+            private static Func<IMessage, RoutingContentValue> BuildValueFunc(Expression valueExpression)
+            {
+                var lambda = Expression.Lambda(typeof(Func<IMessage, RoutingContentValue>), valueExpression, ParameterExpression);
+                return (Func<IMessage, RoutingContentValue>)lambda.Compile();
             }
             }
 
 
-            private static Func<IMessage, string> BuildToStringDelegate(Expression toStringExpression)
+            public Expression CreateMatchExpression(string? targetValue)
             {
             {
-                var lambda = Expression.Lambda(typeof(Func<IMessage, string>), toStringExpression, ParameterExpression);
-                return (Func<IMessage, string>)lambda.Compile();
+                return Expression.Call(_valueExpression, _matchesMethod, Expression.Constant(targetValue));
             }
             }
         }
         }
     }
     }

+ 3 - 19
src/Abc.Zebus/Routing/BindingKey.cs

@@ -1,14 +1,14 @@
 using System;
 using System;
-using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Collections.Generic;
-using System.Linq;
-using System.Reflection;
 using Abc.Zebus.Util.Extensions;
 using Abc.Zebus.Util.Extensions;
 using JetBrains.Annotations;
 using JetBrains.Annotations;
 using ProtoBuf;
 using ProtoBuf;
 
 
 namespace Abc.Zebus.Routing
 namespace Abc.Zebus.Routing
 {
 {
+    /// <summary>
+    /// Routing key of a subscription.
+    /// </summary>
     [ProtoContract]
     [ProtoContract]
     public readonly struct BindingKey : IEquatable<BindingKey>
     public readonly struct BindingKey : IEquatable<BindingKey>
     {
     {
@@ -99,22 +99,6 @@ namespace Abc.Zebus.Routing
             return string.Join(".", _parts);
             return string.Join(".", _parts);
         }
         }
 
 
-        internal static BindingKey Create(IMessage message)
-        {
-            var routingMembers = message.TypeId().Descriptor.RoutingMembers;
-            if (routingMembers.Length == 0)
-                return Empty;
-
-            var parts = new string[routingMembers.Length];
-
-            for (var tokenIndex = 0; tokenIndex < parts.Length; ++tokenIndex)
-            {
-                parts[tokenIndex] = routingMembers[tokenIndex].GetValue(message);
-            }
-
-            return new BindingKey(parts);
-        }
-
         internal static BindingKey Create(Type messageType, IDictionary<string, string> fieldValues)
         internal static BindingKey Create(Type messageType, IDictionary<string, string> fieldValues)
         {
         {
             var routingMembers = MessageUtil.GetTypeId(messageType).Descriptor.RoutingMembers;
             var routingMembers = MessageUtil.GetTypeId(messageType).Descriptor.RoutingMembers;

+ 3 - 2
src/Abc.Zebus/Routing/BindingKeyUtil.cs

@@ -24,8 +24,8 @@ namespace Abc.Zebus.Routing
                     continue;
                     continue;
 
 
                 var part = bindingKey.GetPartToken(index);
                 var part = bindingKey.GetPartToken(index);
-                var memberToStringExpression = routingMembers[index].ToStringExpression;
-                subPredicates.Add(Expression.MakeBinary(ExpressionType.Equal, memberToStringExpression, Expression.Constant(part)));
+
+                subPredicates.Add(routingMembers[index].CreateMatchExpression(part));
             }
             }
 
 
             if (!subPredicates.Any())
             if (!subPredicates.Any())
@@ -33,6 +33,7 @@ namespace Abc.Zebus.Routing
 
 
             var empty = Expression.Empty();
             var empty = Expression.Empty();
             var finalExpression = subPredicates.Aggregate<Expression, Expression>(empty, (final, exp) => final == empty ? exp : Expression.AndAlso(final, exp));
             var finalExpression = subPredicates.Aggregate<Expression, Expression>(empty, (final, exp) => final == empty ? exp : Expression.AndAlso(final, exp));
+
             return (Func<IMessage, bool>)Expression.Lambda(finalExpression, MessageTypeDescriptor.RoutingMember.ParameterExpression).Compile();
             return (Func<IMessage, bool>)Expression.Lambda(finalExpression, MessageTypeDescriptor.RoutingMember.ParameterExpression).Compile();
         }
         }
 
 

+ 32 - 0
src/Abc.Zebus/Routing/RoutingContent.cs

@@ -0,0 +1,32 @@
+using System;
+using System.Linq;
+
+namespace Abc.Zebus.Routing
+{
+    /// <summary>
+    /// Stores the routing members values of routable messages.
+    /// </summary>
+    public readonly struct RoutingContent
+    {
+        public static readonly RoutingContent Empty = new RoutingContent();
+
+        private readonly RoutingContentValue[]? _members;
+
+        public RoutingContent(params RoutingContentValue[]? members)
+        {
+            _members = members;
+        }
+
+        public int PartCount
+            => _members?.Length ?? 0;
+
+        public bool IsEmpty
+            => _members == null || _members.Length == 0;
+
+        public RoutingContentValue this[int index]
+            => _members != null ? _members[index] : throw new ArgumentOutOfRangeException();
+
+        public static RoutingContent FromValues(params string[] values)
+            => new RoutingContent(values.Select(x => new RoutingContentValue(x)).ToArray());
+    }
+}

+ 63 - 0
src/Abc.Zebus/Routing/RoutingContentValue.cs

@@ -0,0 +1,63 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Runtime.InteropServices;
+
+namespace Abc.Zebus.Routing
+{
+    /// <summary>
+    /// Stores the routing member value of a routable message.
+    /// </summary>
+    [StructLayout(LayoutKind.Explicit)]
+    public readonly struct RoutingContentValue : IEquatable<RoutingContentValue>
+    {
+        [FieldOffset(0)]
+        private readonly string? _value;
+        [FieldOffset(0)]
+        private readonly string?[] _values;
+        [FieldOffset(8)]
+        private readonly bool _isCollection;
+
+        public RoutingContentValue(string? value)
+        {
+            _values = null!;
+            _value = value;
+            _isCollection = false;
+        }
+
+        public RoutingContentValue(string?[] values)
+        {
+            _value = null;
+            _values = values;
+            _isCollection = true;
+        }
+
+        public string?[] GetValues()
+        {
+            if (_isCollection)
+                return _values;
+
+            return new[] { _value };
+        }
+
+        public bool Matches(string? s)
+        {
+            return _isCollection ? _values.Contains(s) : _value == s;
+        }
+
+        public bool Equals(RoutingContentValue other)
+        {
+            return _isCollection == other._isCollection && GetValues().SequenceEqual(other.GetValues());
+        }
+
+        public override string? ToString()
+        {
+            return _isCollection
+                ? "[" + string.Join(", ", _values) + "]"
+                : _value;
+        }
+
+        internal bool IsSingle => !_isCollection;
+        internal string? SingleValue => _value;
+    }
+}

+ 53 - 21
src/Abc.Zebus/Subscription.cs

@@ -44,25 +44,30 @@ namespace Abc.Zebus
 
 
         public bool Matches(MessageBinding messageBinding)
         public bool Matches(MessageBinding messageBinding)
         {
         {
-            return messageBinding.MessageTypeId == MessageTypeId && Matches(messageBinding.RoutingKey);
+            return Matches(messageBinding.MessageTypeId, messageBinding.RoutingContent);
         }
         }
 
 
-        public bool Matches(BindingKey routingKey)
+        public bool Matches(MessageTypeId messageTypeId, RoutingContent routingContent)
+        {
+            return messageTypeId == MessageTypeId && Matches(routingContent);
+        }
+
+        private bool Matches(RoutingContent routingContent)
         {
         {
             if (BindingKey.IsEmpty)
             if (BindingKey.IsEmpty)
                 return true;
                 return true;
 
 
-            for (var i = 0; i < routingKey.PartCount; i++)
+            for (var i = 0; i < routingContent.PartCount; i++)
             {
             {
                 var evaluatedPart = BindingKey.GetPartToken(i);
                 var evaluatedPart = BindingKey.GetPartToken(i);
                 if (evaluatedPart == "#")
                 if (evaluatedPart == "#")
                     return true;
                     return true;
 
 
-                if (evaluatedPart != "*" && routingKey.GetPartToken(i) != evaluatedPart)
+                if (evaluatedPart != "*" && !routingContent[i].Matches(evaluatedPart))
                     return false;
                     return false;
             }
             }
 
 
-            return routingKey.PartCount == BindingKey.PartCount;
+            return routingContent.PartCount == BindingKey.PartCount;
         }
         }
 
 
         public BindingKeyPart GetBindingKeyPartForMember(string memberName)
         public BindingKeyPart GetBindingKeyPartForMember(string memberName)
@@ -166,27 +171,46 @@ namespace Abc.Zebus
 
 
         private static void AddFieldValue<TMessage>(Dictionary<string, string> fieldValues, Expression expression)
         private static void AddFieldValue<TMessage>(Dictionary<string, string> fieldValues, Expression expression)
         {
         {
-            if (expression is BinaryExpression binaryExpression)
-            {
-                AddFieldValueFromBinaryExpression<TMessage>(fieldValues, binaryExpression);
-                return;
-            }
-
-            if (expression is UnaryExpression unaryExpression)
+            switch (expression)
             {
             {
-                AddFieldValueFromUnaryExpression<TMessage>(fieldValues, unaryExpression);
-                return;
-            }
+                case BinaryExpression binaryExpression:
+                    AddFieldValueFromBinaryExpression<TMessage>(fieldValues, binaryExpression);
+                    return;
+
+                case UnaryExpression unaryExpression:
+                    AddFieldValueFromUnaryExpression<TMessage>(fieldValues, unaryExpression);
+                    return;
+
+                case MemberExpression memberExpression:
+                    AddFieldValueFromMemberExpression<TMessage>(fieldValues, memberExpression);
+                    return;
+
+                case MethodCallExpression methodCallExpression when methodCallExpression.Method.Name == "Contains":
+                    if (methodCallExpression.Object != null && methodCallExpression.Arguments.Count == 1)
+                    {
+                        AddFieldValueFromContainsExpression<TMessage>(fieldValues, methodCallExpression, methodCallExpression.Object, methodCallExpression.Arguments[0]);
+                        return;
+                    }
+                    if (methodCallExpression.Object == null && methodCallExpression.Arguments.Count == 2)
+                    {
+                        AddFieldValueFromContainsExpression<TMessage>(fieldValues, methodCallExpression, methodCallExpression.Arguments[0], methodCallExpression.Arguments[1]);
+                        return;
+                    }
+                    break;
 
 
-            if (expression is MemberExpression memberExpression)
-            {
-                AddFieldValueFromMemberExpression<TMessage>(fieldValues, memberExpression);
-                return;
             }
             }
 
 
             throw CreateArgumentException(expression);
             throw CreateArgumentException(expression);
         }
         }
 
 
+        private static void AddFieldValueFromContainsExpression<T>(Dictionary<string, string> fieldValues, Expression parentExpression, Expression collection, Expression item)
+        {
+            if (!TryGetMessageMemberExpression<T>(collection, out var memberExpression))
+                throw CreateArgumentException(parentExpression);
+
+            AddFieldValueFromExpressions(fieldValues, memberExpression, item);
+        }
+
         private static void AddFieldValueFromUnaryExpression<T>(Dictionary<string, string> fieldValues, UnaryExpression unaryExpression)
         private static void AddFieldValueFromUnaryExpression<T>(Dictionary<string, string> fieldValues, UnaryExpression unaryExpression)
         {
         {
             if (unaryExpression.Type != typeof(bool))
             if (unaryExpression.Type != typeof(bool))
@@ -237,13 +261,21 @@ namespace Abc.Zebus
                 throw CreateArgumentException(binaryExpression);
                 throw CreateArgumentException(binaryExpression);
             }
             }
 
 
+            AddFieldValueFromExpressions(fieldValues, memberExpression, memberValueExpression);
+        }
+
+        private static void AddFieldValueFromExpressions(Dictionary<string, string> fieldValues, MemberExpression memberExpression, Expression memberValueExpression)
+        {
             var memberName = memberExpression.Member.Name;
             var memberName = memberExpression.Member.Name;
             var memberValue = Expression.Lambda(memberValueExpression).Compile().DynamicInvoke();
             var memberValue = Expression.Lambda(memberValueExpression).Compile().DynamicInvoke();
             if (memberValue == null)
             if (memberValue == null)
                 return;
                 return;
 
 
-            var valueAsText = memberExpression.Type.IsEnum ? Enum.GetName(memberExpression.Type, memberValue) : memberValue.ToString();
-            fieldValues.Add(memberName, valueAsText ?? string.Empty);
+            var stringValue = memberExpression.Type.IsEnum ? Enum.GetName(memberExpression.Type, memberValue) : memberValue.ToString();
+            if (stringValue == null)
+                return;
+
+            fieldValues.Add(memberName, stringValue);
         }
         }
 
 
         private static bool TryGetMessageMemberExpression<TMessage>(Expression? expression, [NotNullWhen(true)] out MemberExpression? memberExpression)
         private static bool TryGetMessageMemberExpression<TMessage>(Expression? expression, [NotNullWhen(true)] out MemberExpression? memberExpression)