Bladeren bron

Use ReadOnlyMemory<byte> instead of Stream for TransportMessage content

The goal of Stream was to encapsulate the underlying buffer and allow
future optimizations, for example by using RecyclableMemoryStream
to reduce buffer allocations.

Yet, these optimizations were never implemented. Also, streams are
stateful and non-thread-safe, which makes passing TransportMessage
among threads quite dangerous. So, a clean implementation of the stream
design would imply to properly manage TransportMessage ownership,
to copy them when required and to dispose the underlying stream.

A concurrency bug was identified in PersistMessageCommandHandler,
because a TransportMessage was forwarded to multiple threads. Instead of
fixing this very specific bug, it was decided to make TransportMessage
immutable by using ReadOnlyMemory instead of Stream, thus removing
the possibility of the currency bug to happen.

This is a breaking change that should only impact low level code.
Olivier Coanet 3 jaren geleden
bovenliggende
commit
2c16ef571e
29 gewijzigde bestanden met toevoegingen van 322 en 187 verwijderingen
  1. 1 1
      src/Abc.Zebus.Persistence.Cassandra.Tests/CqlMessageReaderTests.cs
  2. 91 2
      src/Abc.Zebus.Persistence.Tests/Handlers/PersistMessageCommandHandlerTests.cs
  3. 4 4
      src/Abc.Zebus.Persistence.Tests/Transport/QueueingTransportTests.cs
  4. 1 1
      src/Abc.Zebus.Persistence/Transport/QueueingTransport.cs
  5. 1 1
      src/Abc.Zebus.Testing/Transport/TestTransport.cs
  6. 2 2
      src/Abc.Zebus.Tests/Core/BusTests.Dispatch.cs
  7. 2 2
      src/Abc.Zebus.Tests/Lotus/ReplayMessageHandlerTests.cs
  8. 5 5
      src/Abc.Zebus.Tests/Persistence/PersistentTransportTests.cs
  9. 23 2
      src/Abc.Zebus.Tests/Serialization/MessageSerializerTests.cs
  10. 5 5
      src/Abc.Zebus.Tests/Serialization/TestMessageSerializer.cs
  11. 2 2
      src/Abc.Zebus.Tests/SerializationTests.cs
  12. 5 5
      src/Abc.Zebus.Tests/TestData.cs
  13. 5 8
      src/Abc.Zebus.Tests/Transport/BackwardCompatibilityTests.cs
  14. 3 2
      src/Abc.Zebus.Tests/Transport/TransportMessageReaderTests.cs
  15. 64 1
      src/Abc.Zebus.Tests/Transport/TransportMessageTests.cs
  16. 2 10
      src/Abc.Zebus.Tests/Transport/ZmqTransportTests.cs
  17. 9 12
      src/Abc.Zebus/Core/Bus.cs
  18. 2 2
      src/Abc.Zebus/Core/BusMessageLogger.cs
  19. 11 15
      src/Abc.Zebus/Core/MessageExecutionCompleted.cs
  20. 1 1
      src/Abc.Zebus/Persistence/PersistentTransport.cs
  21. 4 4
      src/Abc.Zebus/Serialization/IMessageSerializer.cs
  22. 4 8
      src/Abc.Zebus/Serialization/MessageSerializer.cs
  23. 3 5
      src/Abc.Zebus/Serialization/MessageSerializerExtensions.cs
  24. 6 32
      src/Abc.Zebus/Serialization/Protobuf/ProtoBufferWriter.cs
  25. 24 8
      src/Abc.Zebus/Serialization/Serializer.cs
  26. 28 34
      src/Abc.Zebus/Transport/TransportMessage.cs
  27. 10 9
      src/Abc.Zebus/Transport/TransportMessageReader.cs
  28. 2 2
      src/Abc.Zebus/Transport/TransportMessageWriter.cs
  29. 2 2
      src/Abc.Zebus/Transport/ZmqTransport.cs

+ 1 - 1
src/Abc.Zebus.Persistence.Cassandra.Tests/CqlMessageReaderTests.cs

@@ -78,7 +78,7 @@ namespace Abc.Zebus.Persistence.Cassandra.Tests
         {
             var bytes = new byte[128];
             new Random().NextBytes(bytes);
-            return new TransportMessage(new MessageTypeId("Fake"), new MemoryStream(bytes), new Peer(peerId, string.Empty));
+            return new TransportMessage(new MessageTypeId("Fake"), bytes, new Peer(peerId, string.Empty));
         }
 
         private CqlMessageReader CreateReader(PeerId peerId, DateTime oldestNonAckedMessage)

+ 91 - 2
src/Abc.Zebus.Persistence.Tests/Handlers/PersistMessageCommandHandlerTests.cs

@@ -1,8 +1,14 @@
-using Abc.Zebus.Persistence.Handlers;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Threading.Tasks;
+using Abc.Zebus.Persistence.Handlers;
 using Abc.Zebus.Persistence.Tests.Matching;
 using Abc.Zebus.Serialization;
 using Abc.Zebus.Testing;
 using Abc.Zebus.Testing.Extensions;
+using Abc.Zebus.Transport;
+using Abc.Zebus.Util;
 using Moq;
 using NUnit.Framework;
 
@@ -82,13 +88,96 @@ namespace Abc.Zebus.Persistence.Tests.Handlers
             var replayerMock = new Mock<IMessageReplayer>();
             _replayerRepository.Setup(x => x.GetActiveMessageReplayer(targetPeerId)).Returns(replayerMock.Object);
 
-            var transportMessage = new FakeCommand(1).ToTransportMessage();
+            var message = new FakeCommand(1);
+            var transportMessage = message.ToTransportMessage();
+            transportMessage.GetContentBytes().ShouldEqual(Serializer.Serialize(message).ToArray());
 
             // Act
             _handler.Handle(new PersistMessageCommand(transportMessage, targetPeerId));
 
             // Assert
             replayerMock.Verify(x => x.AddLiveMessage(transportMessage));
+            transportMessage.GetContentBytes().ShouldEqual(Serializer.Serialize(message).ToArray());
+        }
+
+        [Test, Repeat(5)]
+        public void should_send_message_to_multiple_replayers()
+        {
+            // Arrange
+            const int targetPeerCount = 100;
+
+            var replayedMessages = new List<(PeerId, TransportMessage)>();
+
+            var targetPeerIds = Enumerable.Range(0, targetPeerCount).Select(x => new PeerId($"Abc.Testing.Target.{x}")).ToList();
+            var replayers = targetPeerIds.ToDictionary(x => x, x => new TestMessageReplayer(x, replayedMessages));
+
+            _replayerRepository.Setup(x => x.GetActiveMessageReplayer(It.IsAny<PeerId>()))
+                               .Returns<PeerId>(peerId => replayers[peerId]);
+
+            var sourceMessage = new FakeCommand(123456);
+            var sourceTransportMessage = sourceMessage.ToTransportMessage().ToPersistTransportMessage(targetPeerIds);
+            var persistMessageCommand = (PersistMessageCommand)sourceTransportMessage.ToMessage();
+
+            // Act
+            _handler.Handle(persistMessageCommand);
+
+            // Assert
+            Wait.Until(() => replayedMessages.Count == targetPeerCount, 5.Seconds());
+
+            foreach (var (peerId, replayedMessage) in replayedMessages)
+            {
+                var messageReplayed = (MessageReplayed)replayedMessage.ToMessage();
+                var sourceMessageCopy = (FakeCommand)messageReplayed.Message.ToMessage();
+                sourceMessageCopy.ShouldEqualDeeply(sourceMessage);
+            }
+        }
+
+        private class TestMessageReplayer : IMessageReplayer
+        {
+            private readonly MessageSerializer _messageSerializer = new MessageSerializer();
+            private readonly PeerId _peerId;
+            private readonly List<(PeerId, TransportMessage)> _replayMessages;
+
+            public TestMessageReplayer(PeerId peerId, List<(PeerId, TransportMessage)> replayMessages)
+            {
+                _peerId = peerId;
+                _replayMessages = replayMessages;
+            }
+
+            public event Action Stopped;
+
+            public void AddLiveMessage(TransportMessage message)
+            {
+                Task.Run(() =>
+                {
+                    var messageReplayed = new MessageReplayed(Guid.Empty, message);
+                    var messageReplayedTransportMessage = ToTransportMessage(messageReplayed);
+
+                    lock (_replayMessages)
+                    {
+                        _replayMessages.Add((_peerId, messageReplayedTransportMessage));
+                    }
+                });
+            }
+
+            private TransportMessage ToTransportMessage(IMessage message)
+            {
+                return new TransportMessage(message.TypeId(), _messageSerializer.Serialize(message), new PeerId("Abc.Testing.0"), "tcp://testing:1234");
+            }
+
+            public void Start()
+            {
+            }
+
+            public bool Cancel()
+            {
+                Stopped?.Invoke();
+                return true;
+            }
+
+            public void OnMessageAcked(MessageId messageId)
+            {
+            }
         }
     }
 }

+ 4 - 4
src/Abc.Zebus.Persistence.Tests/Transport/QueueingTransportTests.cs

@@ -179,10 +179,10 @@ namespace Abc.Zebus.Persistence.Tests.Transport
                 stopped.ShouldBeFalse();
 
                 var targets = _allPeers.Where(peer => peer.PeerId != self.Id).Select(desc => desc.Peer).ToArray(); // do not send to self
-                _innerTransport.ExpectExactly(new TransportMessageSent(new TransportMessage(MessageTypeId.PersistenceStopping, new MemoryStream(), self), targets));
+                _innerTransport.ExpectExactly(new TransportMessageSent(new TransportMessage(MessageTypeId.PersistenceStopping, default, self), targets));
 
-                _innerTransport.RaiseMessageReceived(new TransportMessage(MessageTypeId.PersistenceStoppingAck, new MemoryStream(), _allPeers[0].Peer));
-                _innerTransport.RaiseMessageReceived(new TransportMessage(MessageTypeId.PersistenceStoppingAck, new MemoryStream(), _allPeers[1].Peer));
+                _innerTransport.RaiseMessageReceived(new TransportMessage(MessageTypeId.PersistenceStoppingAck, default, _allPeers[0].Peer));
+                _innerTransport.RaiseMessageReceived(new TransportMessage(MessageTypeId.PersistenceStoppingAck, default, _allPeers[1].Peer));
 
                 Wait.Until(() => stopped, 2.Seconds());
             }
@@ -209,7 +209,7 @@ namespace Abc.Zebus.Persistence.Tests.Transport
                 Wait.Until(() => _innerTransport.Messages.Count == 1, 2.Seconds());
                 stopped.ShouldBeFalse();
 
-                _innerTransport.RaiseMessageReceived(new TransportMessage(MessageTypeId.PersistenceStoppingAck, new MemoryStream(), _allPeers[1].Peer));
+                _innerTransport.RaiseMessageReceived(new TransportMessage(MessageTypeId.PersistenceStoppingAck, default, _allPeers[1].Peer));
 
                 Wait.Until(() => stopped, 2.Seconds());
             }

+ 1 - 1
src/Abc.Zebus.Persistence/Transport/QueueingTransport.cs

@@ -91,7 +91,7 @@ namespace Abc.Zebus.Persistence.Transport
             var targets = _peerDirectory.GetPeerDescriptors().Select(desc => desc.Peer).Where(peer => peer.Id != _transport.PeerId).ToList();
             _ackCountdown = new CountdownEvent(targets.Count);
 
-            _transport.Send(new TransportMessage(MessageTypeId.PersistenceStopping, new MemoryStream(), PeerId, InboundEndPoint), targets, new SendContext());
+            _transport.Send(new TransportMessage(MessageTypeId.PersistenceStopping, default, PeerId, InboundEndPoint), targets, new SendContext());
 
             _logger.LogInformation($"Waiting for {targets.Count} persistence stopping acknowledgments within the next {_configuration.QueuingTransportStopTimeout.TotalSeconds} seconds");
             var success = _ackCountdown.Wait(_configuration.QueuingTransportStopTimeout);

+ 1 - 1
src/Abc.Zebus.Testing/Transport/TestTransport.cs

@@ -96,7 +96,7 @@ namespace Abc.Zebus.Testing.Transport
 
         public TransportMessage CreateInfrastructureTransportMessage(MessageTypeId messageTypeId)
         {
-            return new TransportMessage(messageTypeId, new MemoryStream(), PeerId, InboundEndPoint);
+            return new TransportMessage(messageTypeId, default, PeerId, InboundEndPoint);
         }
 
         public void RaiseMessageReceived(TransportMessage transportMessage)

+ 2 - 2
src/Abc.Zebus.Tests/Core/BusTests.Dispatch.cs

@@ -49,7 +49,7 @@ namespace Abc.Zebus.Tests.Core
                 SetupDispatch(command, _ => invokerCalled = true);
 
                 var transportMessageReceived = command.ToTransportMessage(_peerUp);
-                transportMessageReceived.Content = Stream.Null;
+                transportMessageReceived.Content = default;
                 _transport.RaiseMessageReceived(transportMessageReceived);
 
                 invokerCalled.ShouldBeTrue();
@@ -194,7 +194,7 @@ namespace Abc.Zebus.Tests.Core
             {
                 var command = new EmptyCommand();
                 var transportMessage = command.ToTransportMessage();
-                transportMessage.Content = Stream.Null;
+                transportMessage.Content = default;
                 var dispatch = _bus.CreateMessageDispatch(transportMessage);
 
                 dispatch.Message.ShouldEqualDeeply(command);

+ 2 - 2
src/Abc.Zebus.Tests/Lotus/ReplayMessageHandlerTests.cs

@@ -18,7 +18,7 @@ namespace Abc.Zebus.Tests.Lotus
         public void Should_replay_messages_on_failed_handlers()
         {
             var peer = new Peer(new PeerId("Test"), "test://test");
-            var message = new TransportMessage(new MessageTypeId(typeof(FakeCommand)), new MemoryStream(new byte[20]), peer);
+            var message = new TransportMessage(new MessageTypeId(typeof(FakeCommand)), new byte[20], peer);
 
             var dispatcher = new FakeMessageDispatcher();
             var handler = new ReplayMessageHandler(dispatcher, new FakeDispatchFactory());
@@ -32,7 +32,7 @@ namespace Abc.Zebus.Tests.Lotus
         public void Should_replay_messages_on_all_handlers()
         {
             var peer = new Peer(new PeerId("Test"), "test://test");
-            var message = new TransportMessage(new MessageTypeId(typeof(FakeCommand)), new MemoryStream(new byte[20]), peer);
+            var message = new TransportMessage(new MessageTypeId(typeof(FakeCommand)), new byte[20], peer);
 
             var dispatcher = new FakeMessageDispatcher();
             var handler = new ReplayMessageHandler(dispatcher, new FakeDispatchFactory());

+ 5 - 5
src/Abc.Zebus.Tests/Persistence/PersistentTransportTests.cs

@@ -351,8 +351,8 @@ namespace Abc.Zebus.Tests.Persistence
             using(MessageId.PauseIdGeneration())
             {
                 // Stopping persistence
-                InnerTransport.RaiseMessageReceived(new TransportMessage(MessageTypeId.PersistenceStopping, new MemoryStream(), PersistencePeer));
-                InnerTransport.ExpectExactly(new TransportMessageSent(new TransportMessage(MessageTypeId.PersistenceStoppingAck, new MemoryStream(), Self), PersistencePeer));
+                InnerTransport.RaiseMessageReceived(new TransportMessage(MessageTypeId.PersistenceStopping, default, PersistencePeer));
+                InnerTransport.ExpectExactly(new TransportMessageSent(new TransportMessage(MessageTypeId.PersistenceStoppingAck, default, Self), PersistencePeer));
 
                 InnerTransport.Messages.Clear();
 
@@ -396,7 +396,7 @@ namespace Abc.Zebus.Tests.Persistence
             using (MessageId.PauseIdGeneration())
             {
                 // Stopping persistence
-                InnerTransport.RaiseMessageReceived(new TransportMessage(MessageTypeId.PersistenceStopping, new MemoryStream(), PersistencePeer));
+                InnerTransport.RaiseMessageReceived(new TransportMessage(MessageTypeId.PersistenceStopping, default, PersistencePeer));
                 InnerTransport.Messages.Clear();
 
                 var ackedMessage = new FakeCommand(456).ToTransportMessage();
@@ -412,7 +412,7 @@ namespace Abc.Zebus.Tests.Persistence
                 });
 
                 // Stopping persistence again
-                InnerTransport.RaiseMessageReceived(new TransportMessage(MessageTypeId.PersistenceStopping, new MemoryStream(), PersistencePeer));
+                InnerTransport.RaiseMessageReceived(new TransportMessage(MessageTypeId.PersistenceStopping, default, PersistencePeer));
                 InnerTransport.Messages.Clear();
 
                 // starting persistence again - should not have anything to send
@@ -433,7 +433,7 @@ namespace Abc.Zebus.Tests.Persistence
                              .Returns(new List<Peer> { PersistencePeer });
 
                 // Stopping persistence
-                InnerTransport.RaiseMessageReceived(new TransportMessage(MessageTypeId.PersistenceStopping, new MemoryStream(), PersistencePeer));
+                InnerTransport.RaiseMessageReceived(new TransportMessage(MessageTypeId.PersistenceStopping, default, PersistencePeer));
                 InnerTransport.Messages.Clear();
 
                 var ackedMessage = new FakeCommand(456).ToTransportMessage();

+ 23 - 2
src/Abc.Zebus.Tests/Serialization/MessageSerializerTests.cs

@@ -64,9 +64,18 @@ namespace Abc.Zebus.Tests.Serialization
             var message = new SerializableMessage { Value = 42 };
 
             _messageSerializer.TryClone(message, out var clone).ShouldBeTrue();
-            clone.ShouldNotBeNull();
             clone.ShouldNotBeTheSameAs(message);
-            clone.ShouldBe<SerializableMessage>().Value.ShouldEqual(message.Value);
+            clone.ShouldEqualDeeply(message);
+        }
+
+        [Test]
+        public void should_clone_serializable_message_without_parameterless_constructor()
+        {
+            var message = new SerializableMessageWithoutParameterlessConstructor(123456789);
+
+            _messageSerializer.TryClone(message, out var clone).ShouldBeTrue();
+            clone.ShouldNotBeTheSameAs(message);
+            clone.ShouldEqualDeeply(message);
         }
 
         [Test]
@@ -83,6 +92,18 @@ namespace Abc.Zebus.Tests.Serialization
             public int Value { get; set; }
         }
 
+        [ProtoContract]
+        public class SerializableMessageWithoutParameterlessConstructor : IMessage
+        {
+            [ProtoMember(1)]
+            public int Value { get; set; }
+
+            public SerializableMessageWithoutParameterlessConstructor(int value)
+            {
+                Value = value;
+            }
+        }
+
         public class NonSerializableMessage : IMessage
         {
         }

+ 5 - 5
src/Abc.Zebus.Tests/Serialization/TestMessageSerializer.cs

@@ -8,10 +8,10 @@ namespace Abc.Zebus.Tests.Serialization
     public class TestMessageSerializer : IMessageSerializer
     {
         private readonly Dictionary<MessageTypeId, Exception> _serializationExceptions = new Dictionary<MessageTypeId, Exception>();
-        private readonly Dictionary<MessageTypeId, Func<IMessage, Stream>> _serializationFuncs = new Dictionary<MessageTypeId, Func<IMessage, Stream>>();
+        private readonly Dictionary<MessageTypeId, Func<IMessage, ReadOnlyMemory<byte>>> _serializationFuncs = new Dictionary<MessageTypeId, Func<IMessage, ReadOnlyMemory<byte>>>();
         private readonly MessageSerializer _serializer = new MessageSerializer();
 
-        public void AddSerializationFuncFor<TMessage>(Func<TMessage, Stream> func)
+        public void AddSerializationFuncFor<TMessage>(Func<TMessage, ReadOnlyMemory<byte>> func)
             where TMessage : IMessage
         {
             _serializationFuncs.Add(MessageUtil.TypeId<TMessage>(), msg => func((TMessage)msg));
@@ -28,18 +28,18 @@ namespace Abc.Zebus.Tests.Serialization
             _serializationExceptions.Add(MessageUtil.TypeId<TMessage>(), exception);
         }
 
-        public IMessage Deserialize(MessageTypeId messageTypeId, Stream stream)
+        public IMessage Deserialize(MessageTypeId messageTypeId, ReadOnlyMemory<byte> bytes)
         {
             if (_serializationExceptions.TryGetValue(messageTypeId, out var exception))
                 throw exception;
 
-            return _serializer.Deserialize(messageTypeId, stream);
+            return _serializer.Deserialize(messageTypeId, bytes);
         }
 
         public bool TryClone(IMessage message, out IMessage clone)
             => _serializer.TryClone(message, out clone);
 
-        public Stream Serialize(IMessage message)
+        public ReadOnlyMemory<byte> Serialize(IMessage message)
         {
             if (_serializationExceptions.TryGetValue(message.TypeId(), out var exception))
                 throw exception;

+ 2 - 2
src/Abc.Zebus.Tests/SerializationTests.cs

@@ -22,7 +22,7 @@ namespace Abc.Zebus.Tests
             {
                 MessageId.NextId(),
                 new MessageTypeId("X"),
-                new TransportMessage(new MessageTypeId("lol"), new MemoryStream(new byte[] { 1, 2, 3 }), new PeerId("peer"), "endpoint"),
+                new TransportMessage(new MessageTypeId("lol"), new byte[] { 1, 2, 3 }, new PeerId("peer"), "endpoint"),
                 new BindingKey("Abc", "123"),
                 new Peer(new PeerId("Abc.Testing.0"), "tcp://abctest:123", true, true),
             };
@@ -33,7 +33,7 @@ namespace Abc.Zebus.Tests
         [Test]
         public void should_deserialize_message_with_null_content()
         {
-            var message = Serializer.Deserialize(typeof(EmptyCommand), Stream.Null) as EmptyCommand;
+            var message = Serializer.Deserialize(typeof(EmptyCommand), default) as EmptyCommand;
             message.ShouldNotBeNull();
         }
 

+ 5 - 5
src/Abc.Zebus.Tests/TestData.cs

@@ -1,7 +1,9 @@
 using System;
 using System.IO;
 using Abc.Zebus.Routing;
+using Abc.Zebus.Serialization;
 using Abc.Zebus.Transport;
+using AutoFixture;
 
 namespace Abc.Zebus.Tests
 {
@@ -16,11 +18,9 @@ namespace Abc.Zebus.Tests
 
         public static TransportMessage TransportMessage<TMessage>()
         {
-            var contentBytes = new byte[1234];
-            new Random().NextBytes(contentBytes);
-
-            var content = new MemoryStream();
-            content.Write(contentBytes, 0, contentBytes.Length);
+            var fixture = new Fixture();
+            var message = fixture.Create<TMessage>();
+            var content = Serializer.Serialize(message);
 
             return new TransportMessage(new MessageTypeId(typeof(TMessage)), content, new PeerId("Abc.Testing.0"), "tcp://testing:1234")
             {

+ 5 - 8
src/Abc.Zebus.Tests/Transport/BackwardCompatibilityTests.cs

@@ -15,7 +15,7 @@ namespace Abc.Zebus.Tests.Transport
         [Test]
         public void should_deserialize_1_4_1_transport_messages()
         {
-            var content = new MemoryStream(new byte[] { 1, 2, 3 });
+            var content = new byte[] { 1, 2, 3 };
             var originatorInfo = new OriginatorInfo(new PeerId("peer"), "endpoint", "MACHINEXXX", "username");
             var messageId = new MessageId(Guid.Parse("ce0ac850-a9c5-e511-932e-d8e94a2d2418"));
             var expectedMessage = new TransportMessage(new MessageTypeId("lol"), content, originatorInfo) { Id = messageId };
@@ -30,7 +30,7 @@ namespace Abc.Zebus.Tests.Transport
         [Test]
         public void should_read_WasPersisted_as_null_for_older_versions()
         {
-            var content = new MemoryStream(new byte[] { 1, 2, 3 });
+            var content = new byte[] { 1, 2, 3 };
             var originatorInfo = new OriginatorInfo(new PeerId("peer"), "endpoint", "MACHINEXXX", "username");
             var messageId = new MessageId(Guid.Parse("ce0ac850-a9c5-e511-932e-d8e94a2d2418"));
             var expectedMessage = new TransportMessage(new MessageTypeId("lol"), content, originatorInfo) { Id = messageId, WasPersisted = false };
@@ -55,12 +55,9 @@ namespace Abc.Zebus.Tests.Transport
             var oldOutput = new MemoryStream();
             ProtoBuf.Serializer.Serialize(oldOutput, oldTransportMessage);
 
-            var newTransportMessage = new TransportMessage()
-            {
-                Id = oldTransportMessage.Id,
-                Originator = new OriginatorInfo(),
-                Content = new MemoryStream(),
-            };
+            var newTransportMessage = TransportMessage.Empty();
+            newTransportMessage.Id = oldTransportMessage.Id;
+
             var bufferWriter = new ProtoBufferWriter();
             bufferWriter.WriteTransportMessage(newTransportMessage);
 

+ 3 - 2
src/Abc.Zebus.Tests/Transport/TransportMessageReaderTests.cs

@@ -1,4 +1,5 @@
-using System.Collections.Generic;
+using System;
+using System.Collections.Generic;
 using System.IO;
 using System.Text;
 using Abc.Zebus.Serialization.Protobuf;
@@ -47,7 +48,7 @@ namespace Abc.Zebus.Tests.Transport
 
             deserialized.Id.ShouldEqual(transportMessage.Id);
             deserialized.MessageTypeId.ShouldEqual(transportMessage.MessageTypeId);
-            deserialized.Content.ShouldEqual(Stream.Null);
+            deserialized.GetContentBytes().ShouldEqual(Array.Empty<byte>());
             deserialized.Originator.ShouldEqualDeeply(transportMessage.Originator);
             deserialized.Environment.ShouldEqual(transportMessage.Environment);
             deserialized.WasPersisted.ShouldEqual(transportMessage.WasPersisted);

+ 64 - 1
src/Abc.Zebus.Tests/Transport/TransportMessageTests.cs

@@ -1,9 +1,18 @@
-using System.IO;
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using Abc.Zebus.Serialization;
+using Abc.Zebus.Testing;
 using Abc.Zebus.Testing.Extensions;
 using Abc.Zebus.Tests.Messages;
 using Abc.Zebus.Transport;
+using Abc.Zebus.Util;
 using NUnit.Framework;
 using ProtoBuf;
+using Serializer = ProtoBuf.Serializer;
 
 namespace Abc.Zebus.Tests.Transport
 {
@@ -43,5 +52,59 @@ namespace Abc.Zebus.Tests.Transport
             // Assert
             transportMessage.ShouldEqualDeeply(expectedTransportMessage);
         }
+
+        [Test, Repeat(10)]
+        public void should_serialize_from_multiple_threads()
+        {
+            // Arrange
+            const int threadCount = 100;
+
+            var serializer = new MessageSerializer();
+            var message = new TestMessage(Guid.NewGuid());
+            var transportMessage = serializer.ToTransportMessage(message, new PeerId("Abc.X.0"), "tcp://abctest:123");
+            var serializedTransportMessages = new List<byte[]>();
+            var signal = new ManualResetEventSlim();
+
+            // Act
+            for (var i = 0; i < threadCount; i++)
+            {
+                Task.Run(() =>
+                {
+                    signal.Wait(10.Seconds()).ShouldBeTrue();
+
+                    var bytes = TransportMessage.Serialize(transportMessage);
+                    lock (serializedTransportMessages)
+                    {
+                        serializedTransportMessages.Add(bytes);
+                    }
+                });
+            }
+
+            signal.Set();
+
+            // Assert
+            Wait.Until(() => serializedTransportMessages.Count == threadCount, 5.Seconds());
+
+            foreach (var serializedTransportMessage in serializedTransportMessages)
+            {
+                var deserializedTransportMessage = TransportMessage.Deserialize(serializedTransportMessage);
+                deserializedTransportMessage.ShouldEqualDeeply(transportMessage);
+
+                var deserializedMessage = serializer.ToMessage(deserializedTransportMessage);
+                deserializedMessage.ShouldEqualDeeply(message);
+            }
+        }
+
+        [ProtoContract]
+        private class TestMessage : IEvent
+        {
+            public TestMessage(Guid id)
+            {
+                Id = id;
+            }
+
+            [ProtoMember(1)]
+            public Guid Id { get; set; }
+        }
     }
 }

+ 2 - 10
src/Abc.Zebus.Tests/Transport/ZmqTransportTests.cs

@@ -500,27 +500,19 @@ namespace Abc.Zebus.Tests.Transport
             var messageBytes = new byte[5000];
             new Random().NextBytes(messageBytes);
 
-            var bigMessage = new TransportMessage(new MessageTypeId(typeof(FakeCommand)), ToMemoryStream(messageBytes), new PeerId("X"), senderTransport.InboundEndPoint);
+            var bigMessage = new TransportMessage(new MessageTypeId(typeof(FakeCommand)), messageBytes, new PeerId("X"), senderTransport.InboundEndPoint);
             senderTransport.Send(bigMessage, new[] { receiver });
 
             Wait.Until(() => receivedMessages.Count == 1, 2.Seconds());
 
             receivedMessages[0].ShouldHaveSamePropertiesAs(bigMessage, "Environment", "WasPersisted");
 
-            var smallMessage = new TransportMessage(new MessageTypeId(typeof(FakeCommand)), ToMemoryStream(new byte[1]), new PeerId("X"), senderTransport.InboundEndPoint);
+            var smallMessage = new TransportMessage(new MessageTypeId(typeof(FakeCommand)), new byte[1], new PeerId("X"), senderTransport.InboundEndPoint);
             senderTransport.Send(smallMessage, new[] { receiver });
 
             Wait.Until(() => receivedMessages.Count == 2, 2.Seconds());
 
             receivedMessages[1].ShouldHaveSamePropertiesAs(smallMessage, "Environment", "WasPersisted");
-
-            MemoryStream ToMemoryStream(byte[] bytes)
-            {
-                var stream = new MemoryStream();
-                stream.Write(bytes, 0, bytes.Length);
-
-                return stream;
-            }
         }
 
         [Test]

+ 9 - 12
src/Abc.Zebus/Core/Bus.cs

@@ -759,7 +759,7 @@ namespace Abc.Zebus.Core
             if (!_messageIdToTaskCompletionSources.TryRemove(message.SourceCommandId, out var taskCompletionSource))
                 return;
 
-            var response = message.PayloadTypeId != null ? ToMessage(message.PayloadTypeId.Value, new MemoryStream(message.Payload ?? Array.Empty<byte>()), transportMessage) : null;
+            var response = message.PayloadTypeId != null ? ToMessage(message.PayloadTypeId.Value, message.Payload, transportMessage) : null;
             var commandResult = new CommandResult(message.ErrorCode, message.ResponseMessage, response);
 
             taskCompletionSource.SetResult(commandResult);
@@ -823,26 +823,23 @@ namespace Abc.Zebus.Core
         private IMessage? ToMessage(TransportMessage transportMessage)
             => ToMessage(transportMessage.MessageTypeId, transportMessage.Content, transportMessage);
 
-        private IMessage? ToMessage(MessageTypeId messageTypeId, Stream? messageStream, TransportMessage transportMessage)
+        private IMessage? ToMessage(MessageTypeId messageTypeId, ReadOnlyMemory<byte> messageContent, TransportMessage transportMessage)
         {
-            if (messageStream is null)
-                return null;
-
             try
             {
-                return _serializer.ToMessage(transportMessage, messageTypeId, messageStream);
+                return _serializer.ToMessage(transportMessage, messageTypeId, messageContent);
             }
             catch (Exception exception)
             {
-                HandleDeserializationError(messageTypeId, messageStream, transportMessage.Originator, exception, transportMessage);
+                HandleDeserializationError(messageTypeId, messageContent, transportMessage.Originator, exception, transportMessage);
             }
 
             return null;
         }
 
-        private void HandleDeserializationError(MessageTypeId messageTypeId, Stream messageStream, OriginatorInfo originator, Exception exception, TransportMessage transportMessage)
+        private void HandleDeserializationError(MessageTypeId messageTypeId, ReadOnlyMemory<byte> messageContent, OriginatorInfo originator, Exception exception, TransportMessage transportMessage)
         {
-            var dumpLocation = DumpMessageOnDisk(messageTypeId, messageStream);
+            var dumpLocation = DumpMessageOnDisk(messageTypeId, messageContent);
             var errorMessage = $"Unable to deserialize message {messageTypeId.FullName}. Originator: {originator.SenderId}. Message dumped at: {dumpLocation}\r\n{exception}";
             _logger.LogError(errorMessage);
 
@@ -879,7 +876,7 @@ namespace Abc.Zebus.Core
         private MessageContext GetMessageContextForSubscriptionsUpdated()
             => MessageContext.Current ?? MessageContext.CreateOverride(PeerId, EndPoint);
 
-        private string DumpMessageOnDisk(MessageTypeId messageTypeId, Stream messageStream)
+        private string DumpMessageOnDisk(MessageTypeId messageTypeId, ReadOnlyMemory<byte> messageContent)
         {
             try
             {
@@ -888,10 +885,10 @@ namespace Abc.Zebus.Core
 
                 var dumpFileName = $"{_deserializationFailureTimestampProvider.NextUtcTimestamp():yyyyMMdd-HH-mm-ss.fffffff}_{messageTypeId.FullName}";
                 var dumpFilePath = Path.Combine(DeserializationFailureDumpDirectoryPath, dumpFileName);
-                messageStream.Seek(0, SeekOrigin.Begin);
                 using (var fileStream = new FileStream(dumpFilePath, FileMode.Create))
                 {
-                    messageStream.CopyTo(fileStream);
+                    var messageBytes = messageContent.ToArray();
+                    fileStream.Write(messageBytes, 0, messageBytes.Length);
                 }
 
                 return dumpFilePath;

+ 2 - 2
src/Abc.Zebus/Core/BusMessageLogger.cs

@@ -80,7 +80,7 @@ namespace Abc.Zebus.Core
                 return;
 
             var messageText = logHelper.GetMessageText(message);
-            _logger.LogDebug($"RECV remote: {messageText} from {transportMessage.SenderId} ({transportMessage.Content?.Length} bytes). [{transportMessage.Id}]");
+            _logger.LogDebug($"RECV remote: {messageText} from {transportMessage.SenderId} ({transportMessage.Content.Length} bytes). [{transportMessage.Id}]");
         }
 
         public void LogSendMessage(IMessage message, IList<Peer> peers)
@@ -102,7 +102,7 @@ namespace Abc.Zebus.Core
             var messageText = logHelper.GetMessageText(message);
             var targetPeersText = GetTargetPeersText(peers);
 
-            _logger.LogInformation($"SEND: {messageText} to {targetPeersText} ({transportMessage.Content?.Length} bytes) [{transportMessage.Id}]");
+            _logger.LogInformation($"SEND: {messageText} to {targetPeersText} ({transportMessage.Content.Length} bytes) [{transportMessage.Id}]");
         }
 
         private static string GetTargetPeersText(IList<Peer> peers)

+ 11 - 15
src/Abc.Zebus/Core/MessageExecutionCompleted.cs

@@ -5,6 +5,7 @@ using System.IO;
 using System.Linq;
 using Abc.Zebus.Dispatch;
 using Abc.Zebus.Serialization;
+using Newtonsoft.Json;
 using ProtoBuf;
 
 namespace Abc.Zebus.Core
@@ -25,7 +26,14 @@ namespace Abc.Zebus.Core
         public MessageTypeId? PayloadTypeId { get; private set; }
 
         [ProtoMember(4, IsRequired = false)]
-        public byte[]? Payload { get; private set; }
+        private byte[]? PayloadBytes
+        {
+            get => Payload.ToArray();
+            set => Payload = value;
+        }
+
+        [ProtoIgnore, JsonIgnore]
+        public ReadOnlyMemory<byte> Payload { get; private set; }
 
         [ProtoMember(5, IsRequired = false)]
         public string? ResponseMessage { get; private set; } = string.Empty;
@@ -43,7 +51,7 @@ namespace Abc.Zebus.Core
             ResponseMessage = responseMessage ?? string.Empty;
         }
 
-        public MessageExecutionCompleted(MessageId sourceCommandId, MessageTypeId payloadTypeId, byte[]? payload)
+        public MessageExecutionCompleted(MessageId sourceCommandId, MessageTypeId payloadTypeId, ReadOnlyMemory<byte> payload)
         {
             SourceCommandId = sourceCommandId;
             ErrorCode = 0;
@@ -71,8 +79,7 @@ namespace Abc.Zebus.Core
 
         public static MessageExecutionCompleted Success(MessageId sourceCommandId, IMessage payload, IMessageSerializer serializer)
         {
-            var payloadStream = serializer.Serialize(payload);
-            var payloadBytes = ToBytes(payloadStream);
+            var payloadBytes = serializer.Serialize(payload).ToArray();
 
             return new MessageExecutionCompleted(sourceCommandId, payload.TypeId(), payloadBytes);
         }
@@ -83,16 +90,5 @@ namespace Abc.Zebus.Core
 
             return new MessageExecutionCompleted(sourceCommandId, errorStatus.Code, errorStatus.Message);
         }
-
-        private static byte[] ToBytes(Stream payloadStream)
-        {
-            var memoryStream = payloadStream as MemoryStream;
-            if (memoryStream == null)
-            {
-                memoryStream = new MemoryStream();
-                payloadStream.CopyTo(memoryStream);
-            }
-            return memoryStream.ToArray();
-        }
     }
 }

+ 1 - 1
src/Abc.Zebus/Persistence/PersistentTransport.cs

@@ -253,7 +253,7 @@ namespace Abc.Zebus.Persistence
             {
                 _persistenceIsDown = true;
 
-                var ackMessage = new TransportMessage(MessageTypeId.PersistenceStoppingAck, new MemoryStream(), _innerTransport.PeerId, _innerTransport.InboundEndPoint);
+                var ackMessage = new TransportMessage(MessageTypeId.PersistenceStoppingAck, default, _innerTransport.PeerId, _innerTransport.InboundEndPoint);
 
                 _logger.LogInformation($"Sending PersistenceStoppingAck to {transportMessage.Originator.SenderId}");
                 _innerTransport.Send(ackMessage, new[] { new Peer(transportMessage.Originator.SenderId, transportMessage.Originator.SenderEndPoint) }, new SendContext());

+ 4 - 4
src/Abc.Zebus/Serialization/IMessageSerializer.cs

@@ -1,12 +1,12 @@
-using System.Diagnostics.CodeAnalysis;
-using System.IO;
+using System;
+using System.Diagnostics.CodeAnalysis;
 
 namespace Abc.Zebus.Serialization
 {
     public interface IMessageSerializer
     {
-        Stream Serialize(IMessage message);
-        IMessage? Deserialize(MessageTypeId messageTypeId, Stream stream);
+        ReadOnlyMemory<byte> Serialize(IMessage message);
+        IMessage? Deserialize(MessageTypeId messageTypeId, ReadOnlyMemory<byte> stream);
         bool TryClone(IMessage message, [MaybeNullWhen(false)] out IMessage clone);
     }
 }

+ 4 - 8
src/Abc.Zebus/Serialization/MessageSerializer.cs

@@ -1,4 +1,4 @@
-using System.IO;
+using System;
 using Microsoft.Extensions.Logging;
 
 namespace Abc.Zebus.Serialization
@@ -7,7 +7,7 @@ namespace Abc.Zebus.Serialization
     {
         private static readonly ILogger _log = ZebusLogManager.GetLogger(typeof(MessageSerializer));
 
-        public IMessage? Deserialize(MessageTypeId messageTypeId, Stream stream)
+        public IMessage? Deserialize(MessageTypeId messageTypeId, ReadOnlyMemory<byte> stream)
         {
             var messageType = messageTypeId.GetMessageType();
             if (messageType != null)
@@ -17,13 +17,9 @@ namespace Abc.Zebus.Serialization
             return null;
         }
 
-        public Stream Serialize(IMessage message)
+        public ReadOnlyMemory<byte> Serialize(IMessage message)
         {
-            var stream = new MemoryStream();
-            Serializer.Serialize(stream, message);
-            stream.Position = 0;
-
-            return stream;
+            return Serializer.Serialize(message);
         }
 
         public bool TryClone(IMessage message, out IMessage clone)

+ 3 - 5
src/Abc.Zebus/Serialization/MessageSerializerExtensions.cs

@@ -1,4 +1,5 @@
-using System.IO;
+using System;
+using System.IO;
 using Abc.Zebus.Persistence;
 using Abc.Zebus.Transport;
 
@@ -16,13 +17,10 @@ namespace Abc.Zebus.Serialization
 
         public static IMessage? ToMessage(this IMessageSerializer serializer, TransportMessage transportMessage)
         {
-            if (transportMessage.Content is null)
-                return null;
-
             return ToMessage(serializer, transportMessage, transportMessage.MessageTypeId, transportMessage.Content);
         }
 
-        public static IMessage? ToMessage(this IMessageSerializer serializer, TransportMessage transportMessage, MessageTypeId messageTypeId, Stream content)
+        public static IMessage? ToMessage(this IMessageSerializer serializer, TransportMessage transportMessage, MessageTypeId messageTypeId, ReadOnlyMemory<byte> content)
         {
             if (transportMessage.IsPersistTransportMessage)
                 return ToPersistMessageCommand(transportMessage);

+ 6 - 32
src/Abc.Zebus/Serialization/Protobuf/ProtoBufferWriter.cs

@@ -79,7 +79,7 @@ namespace Abc.Zebus.Serialization.Protobuf
             else
             {
                 byte[] bytes = Utf8Encoding.GetBytes(value);
-                WriteRawBytes(bytes);
+                WriteRawBytes(bytes.AsSpan());
             }
         }
 
@@ -147,46 +147,20 @@ namespace Abc.Zebus.Serialization.Protobuf
             }
         }
 
-        internal void WriteRawBytes(byte[] value)
+        public void WriteRawBytes(ReadOnlyMemory<byte> bytes)
         {
-            WriteRawBytes(value, 0, value.Length);
+            WriteRawBytes(bytes.Span);
         }
 
-        internal void WriteRawBytes(byte[] value, int offset, int length)
+        public void WriteRawBytes(ReadOnlySpan<byte> bytes)
         {
+            var length = bytes.Length;
             EnsureCapacity(length);
 
-            value.AsSpan(offset, length).CopyTo(_buffer.AsSpan(_position));
+            bytes.CopyTo(_buffer.AsSpan(_position));
             _position += length;
         }
 
-        public void WriteRawStream(Stream stream)
-        {
-            var length = (int)stream.Length;
-            EnsureCapacity(length);
-
-            stream.Position = 0;
-
-            var memoryStream = stream as MemoryStream;
-            if (memoryStream != null)
-                _position += memoryStream.Read(_buffer, _position, length);
-            else
-                WriteRawStreamSlow(stream);
-        }
-
-        private void WriteRawStreamSlow(Stream stream)
-        {
-            const int blockSize = 4096;
-
-            while (true)
-            {
-                var readCount = stream.Read(_buffer, _position, blockSize);
-                _position += readCount;
-                if (readCount != blockSize)
-                    break;
-            }
-        }
-
         private void EnsureCapacity(int length)
         {
             if (_buffer.Length - _position >= length)

+ 24 - 8
src/Abc.Zebus/Serialization/Serializer.cs

@@ -12,14 +12,15 @@ namespace Abc.Zebus.Serialization
     {
         private static readonly ConcurrentDictionary<Type, bool> _hasParameterLessConstructorByType = new ConcurrentDictionary<Type, bool>();
 
-        public static MemoryStream Serialize(object message)
+        public static ReadOnlyMemory<byte> Serialize(object message)
         {
             var stream = new MemoryStream();
             Serialize(stream, message);
-            return stream;
+
+            return new ReadOnlyMemory<byte>(stream.GetBuffer(), 0, (int)stream.Position);
         }
 
-        public static void Serialize(Stream stream, object message)
+        private static void Serialize(Stream stream, object message)
         {
             try
             {
@@ -32,18 +33,33 @@ namespace Abc.Zebus.Serialization
         }
 
         [return: NotNullIfNotNull("messageType")]
-        public static object? Deserialize(Type? messageType, Stream stream)
+        public static object? Deserialize(Type? messageType, ReadOnlyMemory<byte> bytes)
+        {
+            if (messageType is null)
+                return null;
+
+            var obj = CreateMessageIfRequired(messageType);
+
+            return RuntimeTypeModel.Default.Deserialize(bytes, type: messageType, value: obj);
+        }
+
+        [return: NotNullIfNotNull("messageType")]
+        private static object? Deserialize(Type? messageType, Stream stream)
         {
             if (messageType is null)
                 return null;
 
-            stream.Position = 0; // Reset position
+            var obj = CreateMessageIfRequired(messageType);
 
-            object? obj = null;
+            return RuntimeTypeModel.Default.Deserialize(stream, value: obj, type: messageType);
+        }
+
+        private static object? CreateMessageIfRequired(Type messageType)
+        {
             if (!HasParameterLessConstructor(messageType) && messageType != typeof(string))
-                obj = FormatterServices.GetUninitializedObject(messageType);
+                return FormatterServices.GetUninitializedObject(messageType);
 
-            return RuntimeTypeModel.Default.Deserialize(stream, obj, messageType);
+            return null;
         }
 
         public static bool TryClone<T>(T? message, [NotNullWhen(true)] out T? clone)

+ 28 - 34
src/Abc.Zebus/Transport/TransportMessage.cs

@@ -1,6 +1,5 @@
 using System;
 using System.Collections.Generic;
-using System.IO;
 using Abc.Zebus.Serialization.Protobuf;
 using JetBrains.Annotations;
 using Newtonsoft.Json;
@@ -20,12 +19,12 @@ namespace Abc.Zebus.Transport
         [ProtoMember(3, IsRequired = true)]
         private byte[] ContentBytes
         {
-            get => GetContentBytes();
-            set => Content = new MemoryStream(value, 0, value.Length, false, true);
+            get => Content.ToArray();
+            set => Content = value.AsMemory();
         }
 
         [ProtoIgnore, JsonIgnore]
-        public Stream? Content { get; set; }
+        public ReadOnlyMemory<byte> Content { get; set; }
 
         [ProtoMember(4, IsRequired = true)]
         public OriginatorInfo Originator { get; set; }
@@ -45,22 +44,19 @@ namespace Abc.Zebus.Transport
         [JsonIgnore]
         public PeerId SenderId => Originator.SenderId;
 
-        public TransportMessage(MessageTypeId messageTypeId, Stream content, Peer sender)
+        public TransportMessage(MessageTypeId messageTypeId, ReadOnlyMemory<byte> content, Peer sender)
             : this(messageTypeId, content, sender.Id, sender.EndPoint)
         {
         }
 
-        public TransportMessage(MessageTypeId messageTypeId, Stream content, PeerId senderId, string senderEndPoint)
+        public TransportMessage(MessageTypeId messageTypeId, ReadOnlyMemory<byte> content, PeerId senderId, string senderEndPoint)
             : this(messageTypeId, content, CreateOriginator(senderId, senderEndPoint))
         {
         }
 
-        public TransportMessage(MessageTypeId messageTypeId, Stream content, OriginatorInfo originator)
+        public TransportMessage(MessageTypeId messageTypeId, ReadOnlyMemory<byte> content, OriginatorInfo originator)
+            : this(MessageId.NextId(), messageTypeId, content, originator)
         {
-            Id = MessageId.NextId();
-            MessageTypeId = messageTypeId;
-            Content = content;
-            Originator = originator;
         }
 
         [UsedImplicitly]
@@ -69,6 +65,22 @@ namespace Abc.Zebus.Transport
             Originator = default!;
         }
 
+        public TransportMessage(MessageId id, MessageTypeId messageTypeId, ReadOnlyMemory<byte> content, OriginatorInfo originator)
+            : this(id, messageTypeId, content, originator, null, null, null)
+        {
+        }
+
+        internal TransportMessage(MessageId id, MessageTypeId messageTypeId, ReadOnlyMemory<byte> content, OriginatorInfo originator, string? environment, bool? wasPersisted, List<PeerId>? persistentPeerIds)
+        {
+            Id = id;
+            MessageTypeId = messageTypeId;
+            Content = content;
+            Originator = originator;
+            Environment = environment;
+            WasPersisted = wasPersisted;
+            PersistentPeerIds = persistentPeerIds;
+        }
+
         private static OriginatorInfo CreateOriginator(PeerId peerId, string peerEndPoint)
         {
             return new OriginatorInfo(peerId, peerEndPoint, MessageContext.CurrentMachineName, MessageContext.GetInitiatorUserName());
@@ -76,31 +88,11 @@ namespace Abc.Zebus.Transport
 
         public byte[] GetContentBytes()
         {
-            if (Content == null)
-                return Array.Empty<byte>();
-
-            var position = Content.Position;
-            var buffer = new byte[Content.Length];
-            Content.Position = 0;
-            Content.Read(buffer, 0, buffer.Length);
-            Content.Position = position;
-
-            return buffer;
+            return Content.ToArray();
         }
 
-        internal TransportMessage ToPersistTransportMessage(List<PeerId> peerIds) => CloneWithPeerIds(peerIds);
-        internal TransportMessage UnpackPersistTransportMessage() => CloneWithPeerIds(null);
-
-        private TransportMessage CloneWithPeerIds(List<PeerId>? peerIds) => new TransportMessage
-        {
-            Id = Id,
-            MessageTypeId = MessageTypeId,
-            Content = Content,
-            Originator = Originator,
-            Environment = Environment,
-            WasPersisted = WasPersisted,
-            PersistentPeerIds = peerIds,
-        };
+        internal TransportMessage ToPersistTransportMessage(List<PeerId> peerIds) => new TransportMessage(Id, MessageTypeId, Content, Originator, Environment, WasPersisted, peerIds);
+        internal TransportMessage UnpackPersistTransportMessage() => new TransportMessage(Id, MessageTypeId, Content, Originator, Environment, WasPersisted, null);
 
         public static byte[] Serialize(TransportMessage transportMessage)
         {
@@ -115,5 +107,7 @@ namespace Abc.Zebus.Transport
             var reader = new ProtoBufferReader(bytes, bytes.Length);
             return reader.ReadTransportMessage();
         }
+
+        internal static TransportMessage Empty() => new TransportMessage { Originator = new OriginatorInfo() };
     }
 }

+ 10 - 9
src/Abc.Zebus/Transport/TransportMessageReader.cs

@@ -10,16 +10,15 @@ namespace Abc.Zebus.Transport
     {
         internal static TransportMessage ReadTransportMessage(this ProtoBufferReader reader)
         {
-            return reader.TryReadTransportMessage(out var transportMessage) ? transportMessage : new TransportMessage();
+            if (reader.TryReadTransportMessage(out var transportMessage))
+                return transportMessage;
+
+            throw new InvalidOperationException("Unable to read transport message from buffer");
         }
 
         internal static bool TryReadTransportMessage(this ProtoBufferReader reader, out TransportMessage transportMessage)
         {
-            transportMessage = new TransportMessage
-            {
-                Content = Stream.Null,
-                Originator = new OriginatorInfo(),
-            };
+            transportMessage = TransportMessage.Empty();
 
             while (reader.CanRead(1))
             {
@@ -48,7 +47,9 @@ namespace Abc.Zebus.Transport
                     transportMessage.MessageTypeId = new MessageTypeId(messageTypeId);
                     break;
                 case 3:
-                    if (!reader.TryReadStream(out var content))
+                    // ReadOnlyMemory<byte> could be used to get a zero-copy reference to the underlying buffer, but
+                    // TransportMessage is assumed to own its underlying buffer.
+                    if (!reader.TryReadByteArray(out var content))
                         return false;
                     transportMessage.Content = content;
                     break;
@@ -123,7 +124,7 @@ namespace Abc.Zebus.Transport
             return true;
         }
 
-        private static bool TryReadStream(this ProtoBufferReader reader, out Stream? value)
+        private static bool TryReadByteArray(this ProtoBufferReader reader, out byte[]? value)
         {
             if (!reader.TryReadLength(out var length) || !reader.TryReadRawBytes(length, out var bytes))
             {
@@ -131,7 +132,7 @@ namespace Abc.Zebus.Transport
                 return false;
             }
 
-            value = new MemoryStream(bytes);
+            value = bytes;
             return true;
         }
 

+ 2 - 2
src/Abc.Zebus/Transport/TransportMessageWriter.cs

@@ -16,10 +16,10 @@ namespace Abc.Zebus.Transport
             writer.WriteRawTag(2 << 3 | 2);
             Write(writer, transportMessage.MessageTypeId);
 
-            var transportMessageContent = transportMessage.Content ?? _emptyStream;
+            var transportMessageContent = transportMessage.Content;
             writer.WriteRawTag(3 << 3 | 2);
             writer.WriteLength((int)transportMessageContent.Length);
-            writer.WriteRawStream(transportMessageContent);
+            writer.WriteRawBytes(transportMessageContent);
 
             writer.WriteRawTag(4 << 3 | 2);
             Write(writer, transportMessage.Originator);

+ 2 - 2
src/Abc.Zebus/Transport/ZmqTransport.cs

@@ -328,7 +328,7 @@ namespace Abc.Zebus.Transport
         {
             _logger.LogInformation("Sending EndOfStreamAck to {0}", transportMessage.Originator.SenderEndPoint);
 
-            var endOfStreamAck = new TransportMessage(MessageTypeId.EndOfStreamAck, new MemoryStream(), PeerId, InboundEndPoint);
+            var endOfStreamAck = new TransportMessage(MessageTypeId.EndOfStreamAck, default, PeerId, InboundEndPoint);
             var closingPeer = new Peer(transportMessage.Originator.SenderId, transportMessage.Originator.SenderEndPoint);
 
             SafeAdd(_outboundSocketActions!, OutboundSocketAction.Send(endOfStreamAck, new[] { closingPeer }, new SendContext()));
@@ -453,7 +453,7 @@ namespace Abc.Zebus.Transport
             {
                 _logger.LogInformation($"Sending EndOfStream to {outboundSocket.EndPoint}");
 
-                var endOfStreamMessage = new TransportMessage(MessageTypeId.EndOfStream, new MemoryStream(), PeerId, InboundEndPoint) { WasPersisted = false };
+                var endOfStreamMessage = new TransportMessage(MessageTypeId.EndOfStream, default, PeerId, InboundEndPoint) { WasPersisted = false };
                 bufferWriter.Reset();
                 bufferWriter.WriteTransportMessage(endOfStreamMessage, _environment);
                 outboundSocket.Send(bufferWriter.Buffer, bufferWriter.Position, endOfStreamMessage);