Browse Source

Publish MessageProcessingFailed instead of CustomProcessingFailed

cschiano 10 years ago
parent
commit
07af31346f

+ 2 - 1
src/Abc.Zebus.Tests/Abc.Zebus.Tests.csproj

@@ -94,6 +94,7 @@
     <Compile Include="Core\BusTests.Dispatch.cs" />
     <Compile Include="Core\BusTests.Publish.cs" />
     <Compile Include="Core\BusTests.Subscribe.cs" />
+    <Compile Include="Core\BusTests.DeserializationErrors.cs" />
     <Compile Include="Core\MessageContextAwareBusTests.cs" />
     <Compile Include="Core\RoundRobinPeerSelectorTests.cs" />
     <Compile Include="Directory\PeerDirectoryClientTests.Performance.cs" />
@@ -123,7 +124,7 @@
     <Compile Include="Lotus\ReplayMessageHandlerTests.cs" />
     <Compile Include="MessageContextTests.cs" />
     <Compile Include="Messages\FakeCommandWithTimestamp.cs" />
-    <Compile Include="Routing\BindingKeyTests.cs" />
+    <Compile Include="Routing\BindingKeyTests.cs" />
     <Compile Include="Scan\StaticAsyncMessageHandlerInvokerLoaderTests.cs" />
     <Compile Include="Scan\SyncMessageHandlerInvokerLoaderTests.cs" />
     <Compile Include="Dispatch\DispatchMessages\SyncCommandHandlerWithQueueName1.cs" />

+ 106 - 0
src/Abc.Zebus.Tests/Core/BusTests.DeserializationErrors.cs

@@ -0,0 +1,106 @@
+using System;
+using System.IO;
+using System.Linq;
+using Abc.Zebus.Lotus;
+using Abc.Zebus.Testing;
+using Abc.Zebus.Testing.Extensions;
+using Abc.Zebus.Testing.Transport;
+using Abc.Zebus.Tests.Messages;
+using Abc.Zebus.Transport;
+using Abc.Zebus.Util;
+using NUnit.Framework;
+
+namespace Abc.Zebus.Tests.Core
+{
+    public partial class BusTests
+    {
+        [Test]
+        public void should_send_MessageProcessingFailed_if_unable_to_deserialize_message()
+        {
+            SetupPeersHandlingMessage<MessageProcessingFailed>(_peerUp);
+
+            _bus.Start();
+
+            var command = new FakeCommand(123);
+            _messageSerializer.AddSerializationExceptionFor(command.TypeId(), "Serialization error");
+
+            using (SystemDateTime.PauseTime())
+            using (MessageId.PauseIdGeneration())
+            {
+                var transportMessage = command.ToTransportMessage();
+
+                var messageProcessingFailedBytes = new MessageProcessingFailed(null, null, null, DateTime.UtcNow, null).ToTransportMessage().MessageBytes;
+                _messageSerializer.AddSerializationFuncFor<MessageProcessingFailed>(x =>
+                {
+                    x.FailingMessage.ShouldEqual(transportMessage);
+                    x.ExceptionUtcTime.ShouldEqual(SystemDateTime.UtcNow);
+                    x.ExceptionMessage.ShouldContain("Unable to deserialize message");
+                    return messageProcessingFailedBytes;
+                });
+                
+                _transport.RaiseMessageReceived(transportMessage);
+
+                var processingFailedTransportMessage = new TransportMessage(MessageUtil.TypeId<MessageProcessingFailed>(), messageProcessingFailedBytes, _self);
+                _transport.ExpectExactly(new TransportMessageSent(processingFailedTransportMessage, _peerUp));
+            }
+        }
+
+        [Test]
+        public void should_include_exception_in_MessageProcessingFailed()
+        {
+            SetupPeersHandlingMessage<MessageProcessingFailed>(_peerUp);
+
+            _bus.Start();
+
+            var exception = new Exception("Expected exception");
+            _messageSerializer.AddSerializationExceptionFor<FakeCommand>(exception);
+
+            _transport.RaiseMessageReceived(new FakeCommand(123).ToTransportMessage());
+
+            var message = _transport.MessagesSent.OfType<MessageProcessingFailed>().ExpectedSingle();
+            message.ExceptionMessage.ShouldContain(exception.ToString());
+        }
+
+        [Test]
+        public void should_include_dump_path_in_MessageProcessingFailed()
+        {
+            SetupPeersHandlingMessage<MessageProcessingFailed>(_peerUp);
+
+            _bus.Start();
+
+            var exception = new Exception("Expected exception");
+            _messageSerializer.AddSerializationExceptionFor<FakeCommand>(exception);
+
+            _transport.RaiseMessageReceived(new FakeCommand(123).ToTransportMessage());
+
+            var message = _transport.MessagesSent.OfType<MessageProcessingFailed>().ExpectedSingle();
+            message.ExceptionMessage.ShouldContain(_expectedDumpDirectory);
+        }
+
+        [Test]
+        public void should_ack_transport_when_handling_undeserializable_message()
+        {
+            var command = new FakeCommand(123);
+            _messageSerializer.AddSerializationExceptionFor(command.TypeId());
+
+            var transportMessage = command.ToTransportMessage();
+            _transport.RaiseMessageReceived(transportMessage);
+
+            _transport.AckedMessages.ShouldContain(transportMessage);
+        }
+
+        [Test]
+        public void should_dump_incoming_message_if_unable_to_deserialize_it()
+        {
+            var command = new FakeCommand(123);
+            _messageSerializer.AddSerializationExceptionFor(command.TypeId());
+
+            var transportMessage = command.ToTransportMessage();
+            _transport.RaiseMessageReceived(transportMessage);
+
+            var dumpFileName = System.IO.Directory.GetFiles(_expectedDumpDirectory).ExpectedSingle();
+            dumpFileName.ShouldContain("Abc.Zebus.Tests.Messages.FakeCommand");
+            File.ReadAllBytes(dumpFileName).Length.ShouldEqual(2);
+        }
+    }
+}

+ 0 - 88
src/Abc.Zebus.Tests/Core/BusTests.Dispatch.cs

@@ -147,94 +147,6 @@ namespace Abc.Zebus.Tests.Core
             _transport.ExpectNothing();
         }
 
-        [Test]
-        public void should_send_CustomMessageProcessingFailed_if_unable_to_deserialize_message()
-        {
-            SetupPeersHandlingMessage<CustomProcessingFailed>(_peerUp);
-
-            _bus.Start();
-
-            var command = new FakeCommand(123);
-            _messageSerializer.AddSerializationExceptionFor(command.TypeId(), "Serialization error");
-
-            using (SystemDateTime.PauseTime())
-            using (MessageId.PauseIdGeneration())
-            {
-                var customProcessingFailedBytes = new CustomProcessingFailed("X", "Y", DateTime.UtcNow).ToTransportMessage().MessageBytes;
-                _messageSerializer.AddSerializationFuncFor<CustomProcessingFailed>(x =>
-                {
-                    x.ExceptionUtcTime.ShouldEqual(SystemDateTime.UtcNow);
-                    x.SourceTypeFullName.ShouldEqual(typeof(Bus).FullName);
-                    x.ExceptionMessage.ShouldContain("Unable to deserialize message");
-                    return customProcessingFailedBytes;
-                });
-
-                var transportMessage = command.ToTransportMessage();
-                _transport.RaiseMessageReceived(transportMessage);
-
-                var processingFailedTransportMessage = new TransportMessage(MessageUtil.TypeId<CustomProcessingFailed>(), customProcessingFailedBytes, _self);
-                _transport.ExpectExactly(new TransportMessageSent(processingFailedTransportMessage, _peerUp));
-            }
-        }
-
-        [Test]
-        public void should_include_exception_in_CustomProcessingFailed()
-        {
-            SetupPeersHandlingMessage<CustomProcessingFailed>(_peerUp);
-
-            _bus.Start();
-
-            var exception = new Exception("Expected exception");
-            _messageSerializer.AddSerializationExceptionFor<FakeCommand>(exception);
-
-            _transport.RaiseMessageReceived(new FakeCommand(123).ToTransportMessage());
-
-            var message = _transport.MessagesSent.OfType<CustomProcessingFailed>().ExpectedSingle();
-            message.ExceptionMessage.ShouldContain(exception.ToString());
-        }
-
-        [Test]
-        public void should_include_dump_path_in_CustomProcessingFailed()
-        {
-            SetupPeersHandlingMessage<CustomProcessingFailed>(_peerUp);
-
-            _bus.Start();
-
-            var exception = new Exception("Expected exception");
-            _messageSerializer.AddSerializationExceptionFor<FakeCommand>(exception);
-
-            _transport.RaiseMessageReceived(new FakeCommand(123).ToTransportMessage());
-
-            var message = _transport.MessagesSent.OfType<CustomProcessingFailed>().ExpectedSingle();
-            message.ExceptionMessage.ShouldContain(_expectedDumpDirectory);
-        }
-
-        [Test]
-        public void should_ack_transport_when_handling_undeserializable_message()
-        {
-            var command = new FakeCommand(123);
-            _messageSerializer.AddSerializationExceptionFor(command.TypeId());
-
-            var transportMessage = command.ToTransportMessage();
-            _transport.RaiseMessageReceived(transportMessage);
-
-            _transport.AckedMessages.ShouldContain(transportMessage);
-        }
-
-        [Test]
-        public void should_dump_incoming_message_if_unable_to_deserialize_it()
-        {
-            var command = new FakeCommand(123);
-            _messageSerializer.AddSerializationExceptionFor(command.TypeId());
-
-            var transportMessage = command.ToTransportMessage();
-            _transport.RaiseMessageReceived(transportMessage);
-
-            var dumpFileName = System.IO.Directory.GetFiles(_expectedDumpDirectory).ExpectedSingle();
-            dumpFileName.ShouldContain("Abc.Zebus.Tests.Messages.FakeCommand");
-            File.ReadAllBytes(dumpFileName).Length.ShouldEqual(2);
-        }
-
         [Test]
         public void should_stop_dispatcher_before_transport()
         {

+ 6 - 6
src/Abc.Zebus/Core/Bus.cs

@@ -430,7 +430,7 @@ namespace Abc.Zebus.Core
             if (!_messageIdToTaskCompletionSources.TryRemove(message.SourceCommandId, out taskCompletionSource))
                 return;
 
-            var response = message.PayloadTypeId != null ? ToMessage(message.PayloadTypeId, message.Payload, transportMessage.Originator) : null;
+            var response = message.PayloadTypeId != null ? ToMessage(message.PayloadTypeId, message.Payload, transportMessage) : null;
             var commandResult = new CommandResult(message.ErrorCode, response);
 
             var task = new Task(() => taskCompletionSource.SetResult(commandResult));
@@ -501,10 +501,10 @@ namespace Abc.Zebus.Core
 
         private IMessage ToMessage(TransportMessage transportMessage)
         {
-            return ToMessage(transportMessage.MessageTypeId, transportMessage.MessageBytes, transportMessage.Originator);
+            return ToMessage(transportMessage.MessageTypeId, transportMessage.MessageBytes, transportMessage);
         }
 
-        private IMessage ToMessage(MessageTypeId messageTypeId, byte[] messageBytes, OriginatorInfo originator)
+        private IMessage ToMessage(MessageTypeId messageTypeId, byte[] messageBytes, TransportMessage transportMessage)
         {
             try
             {
@@ -512,12 +512,12 @@ namespace Abc.Zebus.Core
             }
             catch (Exception exception)
             {
-                HandleDeserializationError(messageTypeId, messageBytes, originator, exception);
+                HandleDeserializationError(messageTypeId, messageBytes, transportMessage.Originator, exception, transportMessage);
             }
             return null;
         }
 
-        private void HandleDeserializationError(MessageTypeId messageTypeId, byte[] messageBytes, OriginatorInfo originator, Exception exception)
+        private void HandleDeserializationError(MessageTypeId messageTypeId, byte[] messageBytes, OriginatorInfo originator, Exception exception, TransportMessage transportMessage)
         {
             var dumpLocation = DumpMessageOnDisk(messageTypeId, messageBytes);
             var errorMessage = string.Format("Unable to deserialize message {0}. Originator: {1}. Message dumped at: {2}\r\n{3}", messageTypeId.FullName, originator.SenderId, dumpLocation, exception);
@@ -526,7 +526,7 @@ namespace Abc.Zebus.Core
             if (!_isRunning)
                 return;
 
-            var processingFailed = new CustomProcessingFailed(GetType().FullName, errorMessage, SystemDateTime.UtcNow);
+            var processingFailed = new MessageProcessingFailed(transportMessage,String.Empty, errorMessage, SystemDateTime.UtcNow, null);
             Publish(processingFailed);
         }