|
@@ -32,30 +32,32 @@ namespace Abc.Zebus.Tests.Core
|
|
|
_bus.Start();
|
|
|
|
|
|
var command = new FakeCommand(123);
|
|
|
- _messageSerializer.AddSerializationExceptionFor(command.TypeId(), "Serialization error");
|
|
|
|
|
|
using (SystemDateTime.PauseTime())
|
|
|
using (MessageId.PauseIdGeneration())
|
|
|
{
|
|
|
var transportMessage = command.ToTransportMessage();
|
|
|
-
|
|
|
- var messageProcessingFailedBytes = new MessageProcessingFailed(null, null, null, DateTime.UtcNow, null).ToTransportMessage().Content;
|
|
|
- _messageSerializer.AddSerializationFuncFor<MessageProcessingFailed>(x =>
|
|
|
- {
|
|
|
- x.FailingMessage.ShouldEqual(transportMessage);
|
|
|
- x.ExceptionUtcTime.ShouldEqual(SystemDateTime.UtcNow);
|
|
|
- x.ExceptionMessage.ShouldContain("Unable to deserialize message");
|
|
|
- x.ExceptionMessage.ShouldContain($"MessageId: {transportMessage.Id}");
|
|
|
- return messageProcessingFailedBytes;
|
|
|
- });
|
|
|
+ MakeContentInvalid(transportMessage);
|
|
|
|
|
|
_transport.RaiseMessageReceived(transportMessage);
|
|
|
|
|
|
- var processingFailedTransportMessage = new TransportMessage(MessageUtil.TypeId<MessageProcessingFailed>(), messageProcessingFailedBytes, _self);
|
|
|
- _transport.ExpectExactly(new TransportMessageSent(processingFailedTransportMessage, _peerUp));
|
|
|
+ var sentTransportMessage = _transport.Messages.ExpectedSingle();
|
|
|
+ sentTransportMessage.Targets.ShouldBeEquivalentTo(_peerUp);
|
|
|
+
|
|
|
+ var sentMessage = sentTransportMessage.TransportMessage.ToMessage().ShouldBe<MessageProcessingFailed>();
|
|
|
+ sentMessage.FailingMessage.ShouldEqualDeeply(transportMessage);
|
|
|
+ sentMessage.ExceptionUtcTime.ShouldEqual(SystemDateTime.UtcNow);
|
|
|
+ sentMessage.ExceptionMessage.ShouldContain("Unable to deserialize message");
|
|
|
+ sentMessage.ExceptionMessage.ShouldContain($"MessageId: {transportMessage.Id}");
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private static void MakeContentInvalid(TransportMessage transportMessage)
|
|
|
+ {
|
|
|
+ // Zero is an invalid first byte for protobuf
|
|
|
+ transportMessage.Content = new byte[transportMessage.Content.Length];
|
|
|
+ }
|
|
|
+
|
|
|
[Test]
|
|
|
public void should_send_MessageProcessingFailed_when_error_publication_os_not_enabled()
|
|
|
{
|
|
@@ -65,10 +67,10 @@ namespace Abc.Zebus.Tests.Core
|
|
|
|
|
|
_bus.Start();
|
|
|
|
|
|
- var exception = new Exception("Expected exception");
|
|
|
- _messageSerializer.AddSerializationExceptionFor<FakeCommand>(exception);
|
|
|
+ var transportMessage = new FakeCommand(123).ToTransportMessage();
|
|
|
+ MakeContentInvalid(transportMessage);
|
|
|
|
|
|
- _transport.RaiseMessageReceived(new FakeCommand(123).ToTransportMessage());
|
|
|
+ _transport.RaiseMessageReceived(transportMessage);
|
|
|
|
|
|
_transport.ExpectNothing();
|
|
|
}
|
|
@@ -80,13 +82,13 @@ namespace Abc.Zebus.Tests.Core
|
|
|
|
|
|
_bus.Start();
|
|
|
|
|
|
- var exception = new Exception("Expected exception");
|
|
|
- _messageSerializer.AddSerializationExceptionFor<FakeCommand>(exception);
|
|
|
+ var transportMessage = new FakeCommand(123).ToTransportMessage();
|
|
|
+ MakeContentInvalid(transportMessage);
|
|
|
|
|
|
- _transport.RaiseMessageReceived(new FakeCommand(123).ToTransportMessage());
|
|
|
+ _transport.RaiseMessageReceived(transportMessage);
|
|
|
|
|
|
var message = _transport.MessagesSent.OfType<MessageProcessingFailed>().ExpectedSingle();
|
|
|
- message.ExceptionMessage.ShouldContain(exception.ToString());
|
|
|
+ message.ExceptionMessage.ShouldContain("Exception");
|
|
|
}
|
|
|
|
|
|
[Test]
|
|
@@ -96,10 +98,10 @@ namespace Abc.Zebus.Tests.Core
|
|
|
|
|
|
_bus.Start();
|
|
|
|
|
|
- var exception = new Exception("Expected exception");
|
|
|
- _messageSerializer.AddSerializationExceptionFor<FakeCommand>(exception);
|
|
|
+ var transportMessage = new FakeCommand(123).ToTransportMessage();
|
|
|
+ MakeContentInvalid(transportMessage);
|
|
|
|
|
|
- _transport.RaiseMessageReceived(new FakeCommand(123).ToTransportMessage());
|
|
|
+ _transport.RaiseMessageReceived(transportMessage);
|
|
|
|
|
|
var message = _transport.MessagesSent.OfType<MessageProcessingFailed>().ExpectedSingle();
|
|
|
message.ExceptionMessage.ShouldContain(_bus.DeserializationFailureDumpDirectoryPath);
|
|
@@ -109,9 +111,10 @@ namespace Abc.Zebus.Tests.Core
|
|
|
public void should_ack_transport_when_handling_undeserializable_message()
|
|
|
{
|
|
|
var command = new FakeCommand(123);
|
|
|
- _messageSerializer.AddSerializationExceptionFor(command.TypeId());
|
|
|
|
|
|
var transportMessage = command.ToTransportMessage();
|
|
|
+ MakeContentInvalid(transportMessage);
|
|
|
+
|
|
|
_transport.RaiseMessageReceived(transportMessage);
|
|
|
|
|
|
_transport.AckedMessages.ShouldContain(transportMessage);
|
|
@@ -121,9 +124,10 @@ namespace Abc.Zebus.Tests.Core
|
|
|
public void should_dump_incoming_message_if_unable_to_deserialize_it()
|
|
|
{
|
|
|
var command = new FakeCommand(123);
|
|
|
- _messageSerializer.AddSerializationExceptionFor(command.TypeId());
|
|
|
|
|
|
var transportMessage = command.ToTransportMessage();
|
|
|
+ MakeContentInvalid(transportMessage);
|
|
|
+
|
|
|
_transport.RaiseMessageReceived(transportMessage);
|
|
|
|
|
|
var dumpFileName = System.IO.Directory.GetFiles(_bus.DeserializationFailureDumpDirectoryPath).ExpectedSingle();
|