Pārlūkot izejas kodu

Don't block during unsubsciptions, but prevent subscription calls from running concurrently with unsubscriptions

Lucas Trzesniewski 8 gadi atpakaļ
vecāks
revīzija
e21e310f39

+ 9 - 0
src/Abc.Zebus.Tests/Core/BusTests.Subscribe.cs

@@ -7,10 +7,12 @@ using Abc.Zebus.Directory;
 using Abc.Zebus.Dispatch;
 using Abc.Zebus.Routing;
 using Abc.Zebus.Scan;
+using Abc.Zebus.Testing;
 using Abc.Zebus.Testing.Comparison;
 using Abc.Zebus.Testing.Extensions;
 using Abc.Zebus.Testing.UnitTesting;
 using Abc.Zebus.Tests.Messages;
+using Abc.Zebus.Util;
 using Moq;
 using NUnit.Framework;
 
@@ -118,6 +120,7 @@ namespace Abc.Zebus.Tests.Core
                 _directoryMock.CaptureEnumerable((IBus)_bus, (x, bus, items) => x.UpdateSubscriptionsAsync(bus, items), directorySubscriptions);
 
                 subscription.Dispose();
+                Wait.Until(() => directorySubscriptions.Count > 0, 2.Seconds());
 
                 directorySubscriptions.ExpectedSingle().ShouldEqual(new SubscriptionsForType(MessageUtil.TypeId<FakeRoutableCommand>()));
             }
@@ -132,6 +135,7 @@ namespace Abc.Zebus.Tests.Core
                 _directoryMock.CaptureEnumerable((IBus)_bus, (x, bus, items) => x.UpdateSubscriptionsAsync(bus, items), directorySubscriptions);
 
                 subscription.Dispose();
+                Wait.Until(() => directorySubscriptions.Count > 0, 2.Seconds());
 
                 directorySubscriptions.ExpectedSingle().ShouldEqual(new SubscriptionsForType(MessageUtil.TypeId<FakeRoutableCommand>(), BindingKey.Empty));
             }
@@ -146,6 +150,7 @@ namespace Abc.Zebus.Tests.Core
                 _directoryMock.CaptureEnumerable((IBus)_bus, (x, bus, items) => x.UpdateSubscriptionsAsync(bus, items), subscriptions);
 
                 subscription.Dispose();
+                Wait.Until(() => subscriptions.Count > 0, 2.Seconds());
 
                 subscriptions.ShouldContain(new SubscriptionsForType(MessageUtil.TypeId<FakeRoutableCommand>()));
             }
@@ -162,6 +167,7 @@ namespace Abc.Zebus.Tests.Core
                 _directoryMock.CaptureEnumerable((IBus)_bus, (x, bus, items) => x.UpdateSubscriptionsAsync(bus, items), subscriptions);
 
                 firstSubscription.Dispose();
+                Wait.Until(() => subscriptions.Count > 0, 2.Seconds());
 
                 subscriptions.ExpectedSingle();
                 subscriptions.ShouldContain(new SubscriptionsForType(MessageUtil.TypeId<FakeCommand>()));
@@ -184,9 +190,12 @@ namespace Abc.Zebus.Tests.Core
                 var subscription2 = _bus.Subscribe(Subscription.ByExample(x => new FakeRoutableCommand(1, "toto")));
 
                 subscription1.Dispose();
+                Wait.Until(() => lastDirectorySubscriptions.Count > 0, 2.Seconds());
                 lastDirectorySubscriptions.ExpectedSingle().BindingKeys.ShouldBeEquivalentTo(new[] { new BindingKey("1", "toto", "*") });
 
+                lastDirectorySubscriptions.Clear();
                 subscription2.Dispose();
+                Wait.Until(() => lastDirectorySubscriptions.Count > 0, 2.Seconds());
                 lastDirectorySubscriptions.ExpectedSingle().BindingKeys.ShouldBeEmpty();
             }
 

+ 23 - 11
src/Abc.Zebus/Core/Bus.cs

@@ -34,6 +34,7 @@ namespace Abc.Zebus.Core
         private readonly IStoppingStrategy _stoppingStrategy;
         private readonly IBindingKeyPredicateBuilder _predicateBuilder;
         private readonly IBusConfiguration _configuration;
+        private Task _unsubscribeTask = TaskUtil.Completed;
 
         public Bus(ITransport transport, IPeerDirectory directory, IMessageSerializer serializer, IMessageDispatcher messageDispatcher, IMessageSendingStrategy messageSendingStrategy, IStoppingStrategy stoppingStrategy, IBindingKeyPredicateBuilder predicateBuilder, IBusConfiguration configuration)
         {
@@ -111,11 +112,15 @@ namespace Abc.Zebus.Core
         {
             _logger.DebugFormat("Performing auto subscribe...");
 
-            var autoSubscribeInvokers = _messageDispatcher.GetMessageHanlerInvokers().Where(x => x.ShouldBeSubscribedOnStartup);
-            foreach (var invoker in autoSubscribeInvokers)
+            var autoSubscribeInvokers = _messageDispatcher.GetMessageHanlerInvokers().Where(x => x.ShouldBeSubscribedOnStartup).ToList();
+
+            lock (_subscriptions)
             {
-                var subscription = new Subscription(invoker.MessageTypeId);
-                _subscriptions[subscription] = 1 + _subscriptions.GetValueOrDefault(subscription);
+                foreach (var invoker in autoSubscribeInvokers)
+                {
+                    var subscription = new Subscription(invoker.MessageTypeId);
+                    _subscriptions[subscription] = 1 + _subscriptions.GetValueOrDefault(subscription);
+                }
             }
         }
 
@@ -148,7 +153,11 @@ namespace Abc.Zebus.Core
 
             IsRunning = false;
 
-            _subscriptions.Clear();
+            lock (_subscriptions)
+            {
+                _subscriptions.Clear();
+            }
+
             _messageIdToTaskCompletionSources.Clear();
         }
 
@@ -234,7 +243,7 @@ namespace Abc.Zebus.Core
 
             await AddSubscriptionsAsync(subscriptions).ConfigureAwait(false);
 
-            return new DisposableAction(() => RemoveSubscriptionsAsync(subscriptions).Wait());
+            return new DisposableAction(() => RemoveSubscriptions(subscriptions));
         }
 
         public async Task<IDisposable> SubscribeAsync([NotNull] SubscriptionRequest request, [NotNull] Action<IMessage> handler)
@@ -257,10 +266,10 @@ namespace Abc.Zebus.Core
 
             return new DisposableAction(() =>
             {
-                RemoveSubscriptionsAsync(subscriptions).Wait();
-
                 foreach (var eventHandlerInvoker in eventHandlerInvokers)
                     _messageDispatcher.RemoveInvoker(eventHandlerInvoker);
+
+                RemoveSubscriptions(subscriptions);
             });
         }
 
@@ -286,10 +295,13 @@ namespace Abc.Zebus.Core
                 }
             }
 
+            // Wait until all unsubscriptions are completed to prevent race conditions
+            await Task.WhenAny(_unsubscribeTask).ConfigureAwait(false);
+
             await UpdateDirectorySubscriptionsAsync(updatedTypes).ConfigureAwait(false);
         }
 
-        private async Task RemoveSubscriptionsAsync(IEnumerable<Subscription> subscriptions)
+        private void RemoveSubscriptions(IEnumerable<Subscription> subscriptions)
         {
             var updatedTypes = new HashSet<MessageTypeId>();
 
@@ -304,9 +316,9 @@ namespace Abc.Zebus.Core
                     else
                         _subscriptions[subscription] = subscriptionCount - 1;
                 }
-            }
 
-            await UpdateDirectorySubscriptionsAsync(updatedTypes).ConfigureAwait(false);
+                _unsubscribeTask = _unsubscribeTask.ContinueWith(_ => UpdateDirectorySubscriptionsAsync(updatedTypes)).Unwrap();
+            }
         }
 
         private async Task UpdateDirectorySubscriptionsAsync(HashSet<MessageTypeId> updatedTypes)