Pārlūkot izejas kodu

Wait for dispatcher shutdown before setting bus as stopped

Olivier Coanet 10 gadi atpakaļ
vecāks
revīzija
1fdf94e928

+ 1 - 0
src/Abc.Zebus.Tests/Abc.Zebus.Tests.csproj

@@ -83,6 +83,7 @@
     <Compile Include="BindingKeyTests.cs" />
     <Compile Include="Comparison\ComparisonExtensionsTests.cs" />
     <Compile Include="Core\BusFactoryTests.cs" />
+    <Compile Include="Core\BusInMemoryTests.cs" />
     <Compile Include="Core\BusManualTests.cs" />
     <Compile Include="Core\BusPerformanceTests.cs" />
     <Compile Include="Core\BusTests.cs" />

+ 104 - 0
src/Abc.Zebus.Tests/Core/BusInMemoryTests.cs

@@ -0,0 +1,104 @@
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using Abc.Zebus.Core;
+using Abc.Zebus.Testing;
+using Abc.Zebus.Testing.Directory;
+using Abc.Zebus.Testing.Extensions;
+using Abc.Zebus.Testing.Transport;
+using Abc.Zebus.Util;
+using NUnit.Framework;
+using ProtoBuf;
+using StructureMap;
+
+namespace Abc.Zebus.Tests.Core
+{
+    [TestFixture]
+    public class BusInMemoryTests
+    {
+        private IBus _bus;
+        private TestTransport _transport;
+        private Container _container;
+
+        [SetUp]
+        public void Setup()
+        {
+            _transport = new TestTransport();
+            _container = new Container();
+
+            _bus = new BusFactory(_container)
+                .WithHandlers(typeof(EventPublisherEventHandler))
+                .CreateAndStartInMemoryBus(new TestPeerDirectory(), _transport);
+        }
+
+        [Test]
+        public void should_be_able_to_use_bus_in_handlers_during_shutdown()
+        {
+            var handler = new EventPublisherEventHandler(_bus);
+
+            _container.Configure(x => x.ForSingletonOf<EventPublisherEventHandler>().Use(handler));
+            _transport.RaiseMessageReceived(new EventPublisherEvent().ToTransportMessage());
+
+            // make sure the handler is running
+            handler.StartedSignal.WaitOne(1.Second()).ShouldBeTrue("Handler was not started");
+
+            var stopTask = Task.Run(() => _bus.Stop());
+
+            // resume the handler
+            handler.WaitingSignal.Set();
+
+            // wait for handler completion
+            Wait.Until(() => handler.Processed, 1.Second(), "Handler was not processed");
+
+            handler.Error.ShouldBeNull("Handler was not able to send a message");
+
+            stopTask.Wait(1.Second()).ShouldBeTrue("Bus was not stopped");
+        }
+
+        [ProtoContract]
+        public class Event : IEvent
+        {
+        }
+
+        [ProtoContract]
+        public class EventPublisherEvent : IEvent
+        {
+        }
+
+        public class EventPublisherEventHandler : IMessageHandler<EventPublisherEvent>
+        {
+            private readonly IBus _bus;
+
+            public EventPublisherEventHandler(IBus bus)
+            {
+                _bus = bus;
+
+                StartedSignal = new ManualResetEvent(false);
+                WaitingSignal = new ManualResetEvent(false);
+            }
+
+            public EventWaitHandle StartedSignal { get; private set; }
+            public EventWaitHandle WaitingSignal { get; private set; }
+            public Exception Error { get; private set; }
+            public bool Processed { get; private set; }
+
+            public void Handle(EventPublisherEvent message)
+            {
+                StartedSignal.Set();
+                WaitingSignal.WaitOne();
+                try
+                {
+                    _bus.Publish(new Event());
+                }
+                catch (Exception ex)
+                {
+                    Error = ex;
+                }
+                finally
+                {
+                    Processed = true;
+                }
+            }
+        }
+    }
+}

+ 2 - 2
src/Abc.Zebus/Core/Bus.cs

@@ -128,10 +128,10 @@ namespace Abc.Zebus.Core
 
             _directory.Unregister(this);
 
-            _isRunning = false;
-
             _stoppingStrategy.Stop(_transport, _messageDispatcher);
 
+            _isRunning = false;
+
             _subscriptions.Clear();
             _messageIdToTaskCompletionSources.Clear();
             _completionResultTaskScheduler.Dispose();