Bläddra i källkod

Adjust variable name to new type names

Olivier Coanet 4 år sedan
förälder
incheckning
bfc9d06612

+ 2 - 2
src/Abc.Zebus.Persistence.RocksDb.Tests/RocksDbStorageTests.cs

@@ -263,8 +263,8 @@ namespace Abc.Zebus.Persistence.RocksDb.Tests
         {
             public static TransportMessage Deserialize(byte[] bytes)
             {
-                var inputStream = new ProtoBufferReader(bytes, bytes.Length);
-                var readTransportMessage = inputStream.ReadTransportMessage();
+                var bufferReader = new ProtoBufferReader(bytes, bytes.Length);
+                var readTransportMessage = bufferReader.ReadTransportMessage();
                 return readTransportMessage;
             }
         }

+ 2 - 2
src/Abc.Zebus.Persistence/Storage/TransportMessageDeserializer.cs

@@ -7,8 +7,8 @@ namespace Abc.Zebus.Persistence.Storage
     {
         public static TransportMessage Deserialize(byte[] bytes)
         {
-            var inputStream = new ProtoBufferReader(bytes, bytes.Length);
-            return inputStream.ReadTransportMessage();
+            var bufferReader = new ProtoBufferReader(bytes, bytes.Length);
+            return bufferReader.ReadTransportMessage();
         }
     }
 }

+ 8 - 8
src/Abc.Zebus.Persistence/Storage/TransportMessageSerializer.cs

@@ -10,25 +10,25 @@ namespace Abc.Zebus.Persistence.Storage
     internal class TransportMessageSerializer
     {
         private readonly int _maximumCapacity;
-        private ProtoBufferWriter _outputStream;
+        private ProtoBufferWriter _bufferWriter;
 
         public TransportMessageSerializer(int maximumCapacity = 50 * 1024)
         {
             _maximumCapacity = maximumCapacity;
-            _outputStream = new ProtoBufferWriter();
+            _bufferWriter = new ProtoBufferWriter();
         }
 
         public byte[] Serialize(TransportMessage transportMessage)
         {
-            _outputStream.Reset();
-            _outputStream.WriteTransportMessage(transportMessage);
+            _bufferWriter.Reset();
+            _bufferWriter.WriteTransportMessage(transportMessage);
 
-            var bytes = new byte[_outputStream.Position];
-            Buffer.BlockCopy(_outputStream.Buffer, 0, bytes, 0, _outputStream.Position);
+            var bytes = new byte[_bufferWriter.Position];
+            Buffer.BlockCopy(_bufferWriter.Buffer, 0, bytes, 0, _bufferWriter.Position);
 
             // prevent service from leaking after fat transport message serializations
-            if (_outputStream.Position > _maximumCapacity)
-                _outputStream = new ProtoBufferWriter(new byte[_maximumCapacity]);
+            if (_bufferWriter.Position > _maximumCapacity)
+                _bufferWriter = new ProtoBufferWriter(new byte[_maximumCapacity]);
 
             return bytes;
         }

+ 7 - 7
src/Abc.Zebus.Tests/Transport/BackwardCompatibilityTests.cs

@@ -21,9 +21,9 @@ namespace Abc.Zebus.Tests.Transport
             var expectedMessage = new TransportMessage(new MessageTypeId("lol"), content, originatorInfo) { Id = messageId };
 
             var stream = GetTransportMessageStream_1_4_1();
-            var codedInputStream = new ProtoBufferReader(stream.GetBuffer(), (int)stream.Length);
+            var bufferReader = new ProtoBufferReader(stream.GetBuffer(), (int)stream.Length);
 
-            var message = codedInputStream.ReadTransportMessage();
+            var message = bufferReader.ReadTransportMessage();
             message.ShouldHaveSamePropertiesAs(expectedMessage);
         }
 
@@ -36,9 +36,9 @@ namespace Abc.Zebus.Tests.Transport
             var expectedMessage = new TransportMessage(new MessageTypeId("lol"), content, originatorInfo) { Id = messageId, WasPersisted = false };
 
             var stream = GetTransportMessageStream_1_4_1();
-            var codedInputStream = new ProtoBufferReader(stream.GetBuffer(), (int)stream.Length);
+            var bufferReader = new ProtoBufferReader(stream.GetBuffer(), (int)stream.Length);
 
-            var message = codedInputStream.ReadTransportMessage();
+            var message = bufferReader.ReadTransportMessage();
             message.ShouldHaveSamePropertiesAs(expectedMessage, "WasPersisted");
             message.WasPersisted.ShouldBeNull();
         }
@@ -61,10 +61,10 @@ namespace Abc.Zebus.Tests.Transport
                 Originator = new OriginatorInfo(),
                 Content = new MemoryStream(),
             };
-            var newOutput = new ProtoBufferWriter();
-            newOutput.WriteTransportMessage(newTransportMessage);
+            var bufferWriter = new ProtoBufferWriter();
+            bufferWriter.WriteTransportMessage(newTransportMessage);
 
-            newOutput.ToArray().SequenceEqual(oldOutput.ToArray()).ShouldBeTrue();
+            bufferWriter.ToArray().SequenceEqual(oldOutput.ToArray()).ShouldBeTrue();
         }
 
         private MemoryStream GetTransportMessageStream_1_4_1()

+ 23 - 23
src/Abc.Zebus.Tests/Transport/TransportMessageReaderTests.cs

@@ -20,11 +20,11 @@ namespace Abc.Zebus.Tests.Transport
         {
             var transportMessage = TestDataBuilder.CreateTransportMessage<FakeCommand>();
 
-            var outputStream = new ProtoBufferWriter();
-            outputStream.WriteTransportMessage(transportMessage);
+            var bufferWriter = new ProtoBufferWriter();
+            bufferWriter.WriteTransportMessage(transportMessage);
 
-            var inputStream = new ProtoBufferReader(outputStream.Buffer, outputStream.Position);
-            var deserialized = inputStream.ReadTransportMessage();
+            var bufferReader = new ProtoBufferReader(bufferWriter.Buffer, bufferWriter.Position);
+            var deserialized = bufferReader.ReadTransportMessage();
 
             deserialized.Id.ShouldEqual(transportMessage.Id);
             deserialized.MessageTypeId.ShouldEqual(transportMessage.MessageTypeId);
@@ -39,11 +39,11 @@ namespace Abc.Zebus.Tests.Transport
         {
             var transportMessage = new EmptyCommand().ToTransportMessage();
 
-            var outputStream = new ProtoBufferWriter();
-            outputStream.WriteTransportMessage(transportMessage);
+            var bufferWriter = new ProtoBufferWriter();
+            bufferWriter.WriteTransportMessage(transportMessage);
 
-            var inputStream = new ProtoBufferReader(outputStream.Buffer, outputStream.Position);
-            var deserialized = inputStream.ReadTransportMessage();
+            var bufferReader = new ProtoBufferReader(bufferWriter.Buffer, bufferWriter.Position);
+            var deserialized = bufferReader.ReadTransportMessage();
 
             deserialized.Id.ShouldEqual(transportMessage.Id);
             deserialized.MessageTypeId.ShouldEqual(transportMessage.MessageTypeId);
@@ -65,11 +65,11 @@ namespace Abc.Zebus.Tests.Transport
         {
             var buffer = Encoding.ASCII.GetBytes(content);
 
-            var inputStream = new ProtoBufferReader(buffer, buffer.Length);
+            var bufferReader = new ProtoBufferReader(buffer, buffer.Length);
             TransportMessage transportMessage = null;
             bool? result = null;
 
-            Assert.DoesNotThrow(() => result = inputStream.TryReadTransportMessage(out transportMessage));
+            Assert.DoesNotThrow(() => result = bufferReader.TryReadTransportMessage(out transportMessage));
 
             result.ShouldNotBeNull();
             result.ShouldEqual(false);
@@ -88,12 +88,12 @@ namespace Abc.Zebus.Tests.Transport
                 new PeerId("Abc.Testing.B"),
             };
 
-            var outputStream = new ProtoBufferWriter();
-            outputStream.WriteTransportMessage(transportMessage);
-            outputStream.WritePersistentPeerIds(transportMessage, transportMessage.PersistentPeerIds);
+            var bufferWriter = new ProtoBufferWriter();
+            bufferWriter.WriteTransportMessage(transportMessage);
+            bufferWriter.WritePersistentPeerIds(transportMessage, transportMessage.PersistentPeerIds);
 
-            var inputStream = new ProtoBufferReader(outputStream.Buffer, outputStream.Position);
-            var deserialized = inputStream.ReadTransportMessage();
+            var bufferReader = new ProtoBufferReader(bufferWriter.Buffer, bufferWriter.Position);
+            var deserialized = bufferReader.ReadTransportMessage();
 
             deserialized.Id.ShouldEqual(transportMessage.Id);
             deserialized.MessageTypeId.ShouldEqual(transportMessage.MessageTypeId);
@@ -108,8 +108,8 @@ namespace Abc.Zebus.Tests.Transport
             var stream = new MemoryStream();
             Serializer.Serialize(stream, transportMessage);
 
-            var inputStream = new ProtoBufferReader(stream.GetBuffer(), (int)stream.Length);
-            var deserialized = inputStream.ReadTransportMessage();
+            var bufferReader = new ProtoBufferReader(stream.GetBuffer(), (int)stream.Length);
+            var deserialized = bufferReader.ReadTransportMessage();
 
             deserialized.Id.ShouldEqual(transportMessage.Id);
             deserialized.MessageTypeId.ShouldEqual(transportMessage.MessageTypeId);
@@ -124,19 +124,19 @@ namespace Abc.Zebus.Tests.Transport
         {
             var transportMessage = TestDataBuilder.CreateTransportMessage<FakeCommand>();
 
-            var outputStream = new ProtoBufferWriter();
-            outputStream.WriteTransportMessage(transportMessage);
+            var bufferWriter = new ProtoBufferWriter();
+            bufferWriter.WriteTransportMessage(transportMessage);
 
-            var inputStream = new ProtoBufferReader(outputStream.Buffer, outputStream.Position);
-            inputStream.ReadTransportMessage();
+            var bufferReader = new ProtoBufferReader(bufferWriter.Buffer, bufferWriter.Position);
+            bufferReader.ReadTransportMessage();
 
             const int count = 100_000_000;
             using (Measure.Throughput(count))
             {
                 for (var i = 0; i < count; i++)
                 {
-                    inputStream.Reset();
-                    inputStream.ReadTransportMessage();
+                    bufferReader.Reset();
+                    bufferReader.ReadTransportMessage();
                 }
             }
         }

+ 28 - 28
src/Abc.Zebus.Tests/Transport/TransportMessageWriterTests.cs

@@ -20,10 +20,10 @@ namespace Abc.Zebus.Tests.Transport
         {
             var transportMessage = TestDataBuilder.CreateTransportMessage<FakeCommand>();
 
-            var stream = new ProtoBufferWriter();
-            stream.WriteTransportMessage(transportMessage);
+            var writer = new ProtoBufferWriter();
+            writer.WriteTransportMessage(transportMessage);
 
-            var deserialized = Serializer.Deserialize<TransportMessage>(new MemoryStream(stream.Buffer, 0, stream.Position));
+            var deserialized = Serializer.Deserialize<TransportMessage>(new MemoryStream(writer.Buffer, 0, writer.Position));
             deserialized.Id.ShouldEqual(transportMessage.Id);
             deserialized.MessageTypeId.ShouldEqual(transportMessage.MessageTypeId);
             deserialized.GetContentBytes().ShouldEqual(transportMessage.GetContentBytes());
@@ -38,10 +38,10 @@ namespace Abc.Zebus.Tests.Transport
         {
             var transportMessage = TestDataBuilder.CreateTransportMessage<FakeCommand>();
 
-            var stream = new ProtoBufferWriter();
-            stream.WriteTransportMessage(transportMessage);
+            var writer = new ProtoBufferWriter();
+            writer.WriteTransportMessage(transportMessage);
 
-            var deserialized = Serializer.Deserialize<TransportMessage_1_5_0>(new MemoryStream(stream.Buffer, 0, stream.Position));
+            var deserialized = Serializer.Deserialize<TransportMessage_1_5_0>(new MemoryStream(writer.Buffer, 0, writer.Position));
             deserialized.Id.ShouldEqual(transportMessage.Id);
             deserialized.MessageTypeId.ShouldEqual(transportMessage.MessageTypeId);
             deserialized.Content.ShouldEqual(transportMessage.GetContentBytes());
@@ -63,11 +63,11 @@ namespace Abc.Zebus.Tests.Transport
                 new PeerId("Abc.Testing.B"),
             };
 
-            var stream = new ProtoBufferWriter();
-            stream.WriteTransportMessage(transportMessage);
-            stream.WritePersistentPeerIds(transportMessage, transportMessage.PersistentPeerIds);
+            var writer = new ProtoBufferWriter();
+            writer.WriteTransportMessage(transportMessage);
+            writer.WritePersistentPeerIds(transportMessage, transportMessage.PersistentPeerIds);
 
-            var deserialized = Serializer.Deserialize<TransportMessage>(new MemoryStream(stream.Buffer, 0, stream.Position));
+            var deserialized = Serializer.Deserialize<TransportMessage>(new MemoryStream(writer.Buffer, 0, writer.Position));
             deserialized.Id.ShouldEqual(transportMessage.Id);
             deserialized.MessageTypeId.ShouldEqual(transportMessage.MessageTypeId);
             deserialized.PersistentPeerIds.ShouldBeEquivalentTo(transportMessage.PersistentPeerIds);
@@ -80,10 +80,10 @@ namespace Abc.Zebus.Tests.Transport
             transportMessage.Environment = null;
             transportMessage.Originator = new OriginatorInfo(new PeerId(null), null, null, null);
 
-            var stream = new ProtoBufferWriter();
-            stream.WriteTransportMessage(transportMessage);
+            var writer = new ProtoBufferWriter();
+            writer.WriteTransportMessage(transportMessage);
 
-            var deserializedTransportMessage1 = Serializer.Deserialize<TransportMessage>(new MemoryStream(stream.Buffer, 0, stream.Position));
+            var deserializedTransportMessage1 = Serializer.Deserialize<TransportMessage>(new MemoryStream(writer.Buffer, 0, writer.Position));
             deserializedTransportMessage1.Id.ShouldEqual(transportMessage.Id);
             deserializedTransportMessage1.MessageTypeId.ShouldEqual(transportMessage.MessageTypeId);
             deserializedTransportMessage1.GetContentBytes().ShouldEqual(transportMessage.GetContentBytes());
@@ -101,11 +101,11 @@ namespace Abc.Zebus.Tests.Transport
             var transportMessage = TestDataBuilder.CreateTransportMessage<FakeCommand>();
             transportMessage.WasPersisted = previousWasPersistedValue;
 
-            var stream = new ProtoBufferWriter();
-            stream.WriteTransportMessage(transportMessage);
-            stream.SetWasPersisted(newWasPersistedValue);
+            var writer = new ProtoBufferWriter();
+            writer.WriteTransportMessage(transportMessage);
+            writer.SetWasPersisted(newWasPersistedValue);
 
-            var deserializedTransportMessage1 = Serializer.Deserialize<TransportMessage>(new MemoryStream(stream.Buffer, 0, stream.Position));
+            var deserializedTransportMessage1 = Serializer.Deserialize<TransportMessage>(new MemoryStream(writer.Buffer, 0, writer.Position));
             deserializedTransportMessage1.Id.ShouldEqual(transportMessage.Id);
             deserializedTransportMessage1.Environment.ShouldEqual(transportMessage.Environment);
             deserializedTransportMessage1.WasPersisted.ShouldEqual(newWasPersistedValue);
@@ -116,17 +116,17 @@ namespace Abc.Zebus.Tests.Transport
         {
             var transportMessage = TestDataBuilder.CreateTransportMessage<FakeCommand>();
 
-            var stream = new ProtoBufferWriter();
-            stream.WriteTransportMessage(transportMessage);
+            var writer = new ProtoBufferWriter();
+            writer.WriteTransportMessage(transportMessage);
 
-            var deserialized1 = Serializer.Deserialize<TransportMessage_1_5_0>(new MemoryStream(stream.Buffer, 0, stream.Position));
+            var deserialized1 = Serializer.Deserialize<TransportMessage_1_5_0>(new MemoryStream(writer.Buffer, 0, writer.Position));
             deserialized1.WasPersisted.ShouldEqual(true);
 
-            stream.Reset();
+            writer.Reset();
             transportMessage.WasPersisted = false;
-            stream.WriteTransportMessage(transportMessage);
+            writer.WriteTransportMessage(transportMessage);
 
-            var deserialized2 = Serializer.Deserialize<TransportMessage_1_5_0>(new MemoryStream(stream.Buffer, 0, stream.Position));
+            var deserialized2 = Serializer.Deserialize<TransportMessage_1_5_0>(new MemoryStream(writer.Buffer, 0, writer.Position));
             deserialized2.WasPersisted.ShouldEqual(false);
         }
 
@@ -134,19 +134,19 @@ namespace Abc.Zebus.Tests.Transport
         public void MeasureWritePerformance()
         {
             var transportMessage = TestDataBuilder.CreateTransportMessage<FakeCommand>();
-            var stream = new ProtoBufferWriter();
+            var writer = new ProtoBufferWriter();
 
-            stream.WriteTransportMessage(transportMessage);
+            writer.WriteTransportMessage(transportMessage);
 
             const int count = 10 * 1000 * 1000;
             using (Measure.Throughput(count))
             {
                 for (var i = 0; i < count; i++)
                 {
-                    stream.Reset();
-                    stream.WriteTransportMessage(transportMessage);
+                    writer.Reset();
+                    writer.WriteTransportMessage(transportMessage);
                 }
             }
         }
     }
-}
+}

+ 42 - 42
src/Abc.Zebus/Transport/TransportMessageReader.cs

@@ -8,12 +8,12 @@ namespace Abc.Zebus.Transport
 {
     internal static class TransportMessageReader
     {
-        internal static TransportMessage ReadTransportMessage(this ProtoBufferReader input)
+        internal static TransportMessage ReadTransportMessage(this ProtoBufferReader reader)
         {
-            return input.TryReadTransportMessage(out var transportMessage) ? transportMessage : new TransportMessage();
+            return reader.TryReadTransportMessage(out var transportMessage) ? transportMessage : new TransportMessage();
         }
 
-        internal static bool TryReadTransportMessage(this ProtoBufferReader input, out TransportMessage transportMessage)
+        internal static bool TryReadTransportMessage(this ProtoBufferReader reader, out TransportMessage transportMessage)
         {
             transportMessage = new TransportMessage
             {
@@ -21,60 +21,60 @@ namespace Abc.Zebus.Transport
                 Originator = new OriginatorInfo(),
             };
 
-            while (input.CanRead(1))
+            while (reader.CanRead(1))
             {
-                if (!input.TryReadMember(transportMessage))
+                if (!reader.TryReadMember(transportMessage))
                     return false;
             }
 
             return true;
         }
 
-        private static bool TryReadMember(this ProtoBufferReader input, TransportMessage transportMessage)
+        private static bool TryReadMember(this ProtoBufferReader reader, TransportMessage transportMessage)
         {
-            if (!input.TryReadTag(out var number, out var wireType))
+            if (!reader.TryReadTag(out var number, out var wireType))
                 return false;
 
             switch (number)
             {
                 case 1:
-                    if (!input.TryReadSingleGuid(out var id))
+                    if (!reader.TryReadSingleGuid(out var id))
                         return false;
                     transportMessage.Id = new MessageId(id);
                     break;
                 case 2:
-                    if (!input.TryReadSingleString(out var messageTypeId))
+                    if (!reader.TryReadSingleString(out var messageTypeId))
                         return false;
                     transportMessage.MessageTypeId = new MessageTypeId(messageTypeId);
                     break;
                 case 3:
-                    if (!input.TryReadStream(out var content))
+                    if (!reader.TryReadStream(out var content))
                         return false;
                     transportMessage.Content = content;
                     break;
                 case 4:
-                    if (!input.TryReadOriginatorInfo(out var originator) || originator == null)
+                    if (!reader.TryReadOriginatorInfo(out var originator) || originator == null)
                         return false;
                     transportMessage.Originator = originator;
                     break;
                 case 5:
-                    if (!input.TryReadString(out var environment))
+                    if (!reader.TryReadString(out var environment))
                         return false;
                     transportMessage.Environment = environment;
                     break;
                 case 6:
-                    if (!input.TryReadBool(out var wasPersisted))
+                    if (!reader.TryReadBool(out var wasPersisted))
                         return false;
                     transportMessage.WasPersisted = wasPersisted;
                     break;
                 case 7:
-                    if (!input.TryReadSingleString(out var persistentPeerId))
+                    if (!reader.TryReadSingleString(out var persistentPeerId))
                         return false;
                     transportMessage.PersistentPeerIds ??= new List<PeerId>();
                     transportMessage.PersistentPeerIds.Add(new PeerId(persistentPeerId));
                     break;
                 default:
-                    if (!input.TrySkipUnknown(wireType))
+                    if (!reader.TrySkipUnknown(wireType))
                         return false;
                     break;
             }
@@ -82,38 +82,38 @@ namespace Abc.Zebus.Transport
             return true;
         }
 
-        private static bool TryReadOriginatorInfo(this ProtoBufferReader input, out OriginatorInfo? originatorInfo)
+        private static bool TryReadOriginatorInfo(this ProtoBufferReader reader, out OriginatorInfo? originatorInfo)
         {
             originatorInfo = default;
 
-            if (!input.TryReadLength(out var length))
+            if (!reader.TryReadLength(out var length))
                 return false;
 
-            var endPosition = input.Position + length;
+            var endPosition = reader.Position + length;
 
             var senderId = new PeerId();
             string? senderEndPoint = null;
             string? initiatorUserName = null;
 
-            while (input.Position < endPosition && input.TryReadTag(out var number, out var wireType))
+            while (reader.Position < endPosition && reader.TryReadTag(out var number, out var wireType))
             {
                 switch (number)
                 {
                     case 1:
-                        if (!input.TryReadSingleString(out var peerId))
+                        if (!reader.TryReadSingleString(out var peerId))
                             return false;
                         senderId = new PeerId(peerId);
                         break;
                     case 2:
-                        if (!input.TryReadString(out senderEndPoint))
+                        if (!reader.TryReadString(out senderEndPoint))
                             return false;
                         break;
                     case 5:
-                        if (!input.TryReadString(out initiatorUserName))
+                        if (!reader.TryReadString(out initiatorUserName))
                             return false;
                         break;
                     default:
-                        if (!input.TrySkipUnknown(wireType))
+                        if (!reader.TrySkipUnknown(wireType))
                             return false;
                         break;
                 }
@@ -123,9 +123,9 @@ namespace Abc.Zebus.Transport
             return true;
         }
 
-        private static bool TryReadStream(this ProtoBufferReader input, out Stream? value)
+        private static bool TryReadStream(this ProtoBufferReader reader, out Stream? value)
         {
-            if (!input.TryReadLength(out var length) || !input.TryReadRawBytes(length, out var bytes))
+            if (!reader.TryReadLength(out var length) || !reader.TryReadRawBytes(length, out var bytes))
             {
                 value = default;
                 return false;
@@ -135,24 +135,24 @@ namespace Abc.Zebus.Transport
             return true;
         }
 
-        private static bool TryReadSingleString(this ProtoBufferReader input, out string? value)
+        private static bool TryReadSingleString(this ProtoBufferReader reader, out string? value)
         {
             value = default;
 
-            if (!input.TryReadLength(out var length))
+            if (!reader.TryReadLength(out var length))
                 return false;
 
-            var endPosition = input.Position + length;
-            while (input.Position < endPosition && input.TryReadTag(out var number, out var wireType))
+            var endPosition = reader.Position + length;
+            while (reader.Position < endPosition && reader.TryReadTag(out var number, out var wireType))
             {
                 switch (number)
                 {
                     case 1:
-                        if (!input.TryReadString(out value))
+                        if (!reader.TryReadString(out value))
                             return false;
                         break;
                     default:
-                        if (!input.TrySkipUnknown(wireType))
+                        if (!reader.TrySkipUnknown(wireType))
                             return false;
                         break;
                 }
@@ -161,24 +161,24 @@ namespace Abc.Zebus.Transport
             return true;
         }
 
-        private static bool TryReadSingleGuid(this ProtoBufferReader input, out Guid value)
+        private static bool TryReadSingleGuid(this ProtoBufferReader reader, out Guid value)
         {
             value = default;
 
-            if (!input.TryReadLength(out var length))
+            if (!reader.TryReadLength(out var length))
                 return false;
 
-            var endPosition = input.Position + length;
-            while (input.Position < endPosition && input.TryReadTag(out var number, out var wireType))
+            var endPosition = reader.Position + length;
+            while (reader.Position < endPosition && reader.TryReadTag(out var number, out var wireType))
             {
                 switch (number)
                 {
                     case 1:
-                        if (!input.TryReadGuid(out value))
+                        if (!reader.TryReadGuid(out value))
                             return false;
                         break;
                     default:
-                        if (!input.TrySkipUnknown(wireType))
+                        if (!reader.TrySkipUnknown(wireType))
                             return false;
                         break;
                 }
@@ -187,7 +187,7 @@ namespace Abc.Zebus.Transport
             return true;
         }
 
-        private static bool TrySkipUnknown(this ProtoBufferReader input, WireType wireType)
+        private static bool TrySkipUnknown(this ProtoBufferReader reader, WireType wireType)
         {
             switch (wireType)
             {
@@ -195,13 +195,13 @@ namespace Abc.Zebus.Transport
                     return false;
 
                 case WireType.Variant:
-                    return input.TryReadRawVariant(out _);
+                    return reader.TryReadRawVariant(out _);
 
                 case WireType.Fixed64:
-                    return input.TryReadFixed64(out _);
+                    return reader.TryReadFixed64(out _);
 
                 case WireType.String:
-                    return input.TrySkipString();
+                    return reader.TrySkipString();
 
                 case WireType.StartGroup:
                     return false;
@@ -210,7 +210,7 @@ namespace Abc.Zebus.Transport
                     return false;
 
                 case WireType.Fixed32:
-                    return input.TryReadFixed32(out _);
+                    return reader.TryReadFixed32(out _);
 
                 default:
                     return false;

+ 46 - 46
src/Abc.Zebus/Transport/TransportMessageWriter.cs

@@ -8,43 +8,43 @@ namespace Abc.Zebus.Transport
     {
         private static readonly MemoryStream _emptyStream = new MemoryStream(new byte[0]);
 
-        internal static void WriteTransportMessage(this ProtoBufferWriter output, TransportMessage transportMessage, string? environmentOverride = null)
+        internal static void WriteTransportMessage(this ProtoBufferWriter writer, TransportMessage transportMessage, string? environmentOverride = null)
         {
-            output.WriteRawTag(1 << 3 | 2);
-            Write(output, transportMessage.Id);
+            writer.WriteRawTag(1 << 3 | 2);
+            Write(writer, transportMessage.Id);
 
-            output.WriteRawTag(2 << 3 | 2);
-            Write(output, transportMessage.MessageTypeId);
+            writer.WriteRawTag(2 << 3 | 2);
+            Write(writer, transportMessage.MessageTypeId);
 
             var transportMessageContent = transportMessage.Content ?? _emptyStream;
-            output.WriteRawTag(3 << 3 | 2);
-            output.WriteLength((int)transportMessageContent.Length);
-            output.WriteRawStream(transportMessageContent);
+            writer.WriteRawTag(3 << 3 | 2);
+            writer.WriteLength((int)transportMessageContent.Length);
+            writer.WriteRawStream(transportMessageContent);
 
-            output.WriteRawTag(4 << 3 | 2);
-            Write(output, transportMessage.Originator);
+            writer.WriteRawTag(4 << 3 | 2);
+            Write(writer, transportMessage.Originator);
 
             var environment = environmentOverride ?? transportMessage.Environment;
             if (environment != null)
             {
-                output.WriteRawTag(5 << 3 | 2);
+                writer.WriteRawTag(5 << 3 | 2);
                 var environmentLength = GetUtf8ByteCount(environment);
-                output.WriteString(environment, environmentLength);
+                writer.WriteString(environment, environmentLength);
             }
 
             if (transportMessage.WasPersisted != null)
-                WriteWasPersisted(output, transportMessage.WasPersisted.Value);
+                WriteWasPersisted(writer, transportMessage.WasPersisted.Value);
         }
 
-        internal static void SetWasPersisted(this ProtoBufferWriter output, bool wasPersisted)
+        internal static void SetWasPersisted(this ProtoBufferWriter writer, bool wasPersisted)
         {
-            if (output.TryWriteBoolAtSavedPosition(wasPersisted))
+            if (writer.TryWriteBoolAtSavedPosition(wasPersisted))
                 return;
 
-            WriteWasPersisted(output, wasPersisted);
+            WriteWasPersisted(writer, wasPersisted);
         }
 
-        internal static void WritePersistentPeerIds(this ProtoBufferWriter output, TransportMessage transportMessage, List<PeerId>? persistentPeerIdOverride)
+        internal static void WritePersistentPeerIds(this ProtoBufferWriter writer, TransportMessage transportMessage, List<PeerId>? persistentPeerIdOverride)
         {
             var peerIds = persistentPeerIdOverride ?? transportMessage.PersistentPeerIds;
             if (peerIds == null)
@@ -56,43 +56,43 @@ namespace Abc.Zebus.Transport
                 if (string.IsNullOrEmpty(peerIdString))
                     continue;
 
-                output.WriteRawTag(7 << 3 | 2);
+                writer.WriteRawTag(7 << 3 | 2);
 
                 var peerIdStringLength = GetUtf8ByteCount(peerIdString);
                 var peerIdLength = 1 + ProtoBufferWriter.ComputeStringSize(peerIdStringLength);
 
-                output.WriteLength(peerIdLength);
-                output.WriteRawTag(1 << 3 | 2);
-                output.WriteString(peerIdString, peerIdStringLength);
+                writer.WriteLength(peerIdLength);
+                writer.WriteRawTag(1 << 3 | 2);
+                writer.WriteString(peerIdString, peerIdStringLength);
             }
         }
 
-        private static void Write(ProtoBufferWriter output, MessageId messageId)
+        private static void Write(ProtoBufferWriter writer, MessageId messageId)
         {
             var size = 1 + GetMessageSizeWithLength(ProtoBufferWriter.GuidSize);
-            output.WriteLength(size);
-            output.WriteRawTag(1 << 3 | 2);
+            writer.WriteLength(size);
+            writer.WriteRawTag(1 << 3 | 2);
 
-            output.WriteGuid(messageId.Value);
+            writer.WriteGuid(messageId.Value);
         }
 
-        private static void Write(ProtoBufferWriter output, MessageTypeId messageTypeId)
+        private static void Write(ProtoBufferWriter writer, MessageTypeId messageTypeId)
         {
             if (messageTypeId.FullName == null)
             {
-                output.WriteLength(0);
+                writer.WriteLength(0);
             }
             else
             {
                 var fullNameLength = GetUtf8ByteCount(messageTypeId.FullName);
                 var size = 1 + ProtoBufferWriter.ComputeStringSize(fullNameLength);
-                output.WriteLength(size);
-                output.WriteRawTag(1 << 3 | 2);
-                output.WriteString(messageTypeId.FullName, fullNameLength);
+                writer.WriteLength(size);
+                writer.WriteRawTag(1 << 3 | 2);
+                writer.WriteString(messageTypeId.FullName, fullNameLength);
             }
         }
 
-        private static void Write(ProtoBufferWriter output, OriginatorInfo originatorInfo)
+        private static void Write(ProtoBufferWriter writer, OriginatorInfo originatorInfo)
         {
             var size = 0;
 
@@ -149,31 +149,31 @@ namespace Abc.Zebus.Transport
                 size += 1 + ProtoBufferWriter.ComputeStringSize(initiatorUserNameLength);
             }
 
-            output.WriteLength(size);
+            writer.WriteLength(size);
 
-            output.WriteRawTag(1 << 3 | 2);
-            output.WriteLength(senderIdLength);
+            writer.WriteRawTag(1 << 3 | 2);
+            writer.WriteLength(senderIdLength);
 
             if (!string.IsNullOrEmpty(senderIdString))
             {
-                output.WriteRawTag(1 << 3 | 2);
-                output.WriteString(senderIdString, senderIdStringLength);
+                writer.WriteRawTag(1 << 3 | 2);
+                writer.WriteString(senderIdString, senderIdStringLength);
             }
 
             if (originatorInfo.SenderEndPoint != null)
             {
-                output.WriteRawTag(2 << 3 | 2);
-                output.WriteString(originatorInfo.SenderEndPoint, senderEndPointLength);
+                writer.WriteRawTag(2 << 3 | 2);
+                writer.WriteString(originatorInfo.SenderEndPoint, senderEndPointLength);
             }
             if (originatorInfo.SenderMachineName != null)
             {
-                output.WriteRawTag(3 << 3 | 2);
-                output.WriteString(originatorInfo.SenderMachineName, senderMachineNameLength);
+                writer.WriteRawTag(3 << 3 | 2);
+                writer.WriteString(originatorInfo.SenderMachineName, senderMachineNameLength);
             }
             if (originatorInfo.InitiatorUserName != null)
             {
-                output.WriteRawTag(5 << 3 | 2);
-                output.WriteString(originatorInfo.InitiatorUserName, initiatorUserNameLength);
+                writer.WriteRawTag(5 << 3 | 2);
+                writer.WriteString(originatorInfo.InitiatorUserName, initiatorUserNameLength);
             }
         }
 
@@ -195,11 +195,11 @@ namespace Abc.Zebus.Transport
             return s.Length;
         }
 
-        private static void WriteWasPersisted(ProtoBufferWriter output, bool value)
+        private static void WriteWasPersisted(ProtoBufferWriter writer, bool value)
         {
-            output.WriteRawTag(6 << 3 | 0);
-            output.SavePosition();
-            output.WriteBool(value);
+            writer.WriteRawTag(6 << 3 | 0);
+            writer.SavePosition();
+            writer.WriteBool(value);
         }
     }
 }

+ 1 - 0
src/Abc.Zebus/Transport/ZmqInboundSocket.cs

@@ -50,6 +50,7 @@ namespace Abc.Zebus.Transport
             _socket?.Dispose();
         }
 
+        // TODO: return Span instead of ProtoBufferReader
         public ProtoBufferReader? Receive(TimeSpan? timeout = null)
         {
             var receiveTimeout = timeout ?? _options.ReceiveTimeout;

+ 30 - 30
src/Abc.Zebus/Transport/ZmqTransport.cs

@@ -192,11 +192,11 @@ namespace Abc.Zebus.Transport
             {
                 while (_isListening)
                 {
-                    var inputStream = inboundSocket.Receive();
-                    if (inputStream == null)
+                    var bufferReader = inboundSocket.Receive();
+                    if (bufferReader == null)
                         continue;
 
-                    DeserializeAndForwardTransportMessage(inputStream);
+                    DeserializeAndForwardTransportMessage(bufferReader);
                 }
 
                 GracefullyDisconnectInboundSocket(inboundSocket);
@@ -231,16 +231,16 @@ namespace Abc.Zebus.Transport
         {
             inboundSocket.Disconnect();
 
-            ProtoBufferReader? inputStream;
-            while ((inputStream = inboundSocket.Receive(100.Milliseconds())) != null)
-                DeserializeAndForwardTransportMessage(inputStream);
+            ProtoBufferReader? bufferReader;
+            while ((bufferReader = inboundSocket.Receive(100.Milliseconds())) != null)
+                DeserializeAndForwardTransportMessage(bufferReader);
         }
 
-        private void DeserializeAndForwardTransportMessage(ProtoBufferReader inputStream)
+        private void DeserializeAndForwardTransportMessage(ProtoBufferReader bufferReader)
         {
             try
             {
-                if (!TryDeserializeTransportMessage(inputStream, out var transportMessage))
+                if (!TryDeserializeTransportMessage(bufferReader, out var transportMessage))
                     return;
 
                 if (!ValidateTransportMessage(transportMessage))
@@ -267,12 +267,12 @@ namespace Abc.Zebus.Transport
             }
         }
 
-        private bool TryDeserializeTransportMessage(ProtoBufferReader reader, out TransportMessage transportMessage)
+        private bool TryDeserializeTransportMessage(ProtoBufferReader bufferReader, out TransportMessage transportMessage)
         {
-            if (reader.TryReadTransportMessage(out transportMessage))
+            if (bufferReader.TryReadTransportMessage(out transportMessage))
                 return true;
 
-            _logger.Debug($"Unable to read transport message, Length: {reader.Length}, Bytes: {reader.ToDebugString(50)}");
+            _logger.Debug($"Unable to read transport message, Length: {bufferReader.Length}, Bytes: {bufferReader.ToDebugString(50)}");
 
             return false;
         }
@@ -346,7 +346,7 @@ namespace Abc.Zebus.Transport
             Thread.CurrentThread.Name = "ZmqTransport.OutboundProc";
             _logger.DebugFormat("Starting outbound proc...");
 
-            var outputStream = new ProtoBufferWriter();
+            var bufferWriter = new ProtoBufferWriter();
 
             foreach (var socketAction in _outboundSocketActions!.GetConsumingEnumerable())
             {
@@ -356,42 +356,42 @@ namespace Abc.Zebus.Transport
                 }
                 else
                 {
-                    WriteTransportMessageAndSendToPeers(socketAction.Message, socketAction.Targets, socketAction.Context, outputStream);
+                    WriteTransportMessageAndSendToPeers(socketAction.Message, socketAction.Targets, socketAction.Context, bufferWriter);
                 }
             }
 
-            GracefullyDisconnectOutboundSockets(outputStream);
+            GracefullyDisconnectOutboundSockets(bufferWriter);
 
             _logger.InfoFormat("OutboundProc terminated");
         }
 
-        private void WriteTransportMessageAndSendToPeers(TransportMessage transportMessage, List<Peer> peers, SendContext context, ProtoBufferWriter outputStream)
+        private void WriteTransportMessageAndSendToPeers(TransportMessage transportMessage, List<Peer> peers, SendContext context, ProtoBufferWriter bufferWriter)
         {
-            outputStream.Reset();
-            outputStream.WriteTransportMessage(transportMessage, _environment);
+            bufferWriter.Reset();
+            bufferWriter.WriteTransportMessage(transportMessage, _environment);
 
             if (context.PersistencePeer == null && transportMessage.IsPersistTransportMessage)
             {
-                outputStream.WritePersistentPeerIds(transportMessage, transportMessage.PersistentPeerIds);
+                bufferWriter.WritePersistentPeerIds(transportMessage, transportMessage.PersistentPeerIds);
             }
 
             foreach (var target in peers)
             {
                 var isPersistent = context.WasPersisted(target.Id);
-                outputStream.SetWasPersisted(isPersistent);
+                bufferWriter.SetWasPersisted(isPersistent);
 
-                SendToPeer(transportMessage, outputStream, target);
+                SendToPeer(transportMessage, bufferWriter, target);
             }
 
             if (context.PersistencePeer != null)
             {
-                outputStream.WritePersistentPeerIds(transportMessage, context.PersistentPeerIds);
+                bufferWriter.WritePersistentPeerIds(transportMessage, context.PersistentPeerIds);
 
-                SendToPeer(transportMessage, outputStream, context.PersistencePeer);
+                SendToPeer(transportMessage, bufferWriter, context.PersistencePeer);
             }
         }
 
-        private void SendToPeer(TransportMessage transportMessage, ProtoBufferWriter outputStream, Peer target)
+        private void SendToPeer(TransportMessage transportMessage, ProtoBufferWriter bufferWriter, Peer target)
         {
             var outboundSocket = GetConnectedOutboundSocket(target, transportMessage);
             if (!outboundSocket.IsConnected)
@@ -402,7 +402,7 @@ namespace Abc.Zebus.Transport
 
             try
             {
-                outboundSocket.Send(outputStream.Buffer, outputStream.Position, transportMessage);
+                outboundSocket.Send(bufferWriter.Buffer, bufferWriter.Position, transportMessage);
             }
             catch (Exception ex)
             {
@@ -438,13 +438,13 @@ namespace Abc.Zebus.Transport
             return outboundSocket;
         }
 
-        private void GracefullyDisconnectOutboundSockets(ProtoBufferWriter outputStream)
+        private void GracefullyDisconnectOutboundSockets(ProtoBufferWriter bufferWriter)
         {
             var connectedOutboundSockets = _outboundSockets.Values.Where(x => x.IsConnected).ToList();
 
             _outboundSocketsToStop = new CountdownEvent(connectedOutboundSockets.Count);
 
-            SendEndOfStreamMessages(connectedOutboundSockets, outputStream);
+            SendEndOfStreamMessages(connectedOutboundSockets, bufferWriter);
 
             _logger.InfoFormat("Waiting for {0} outbound socket end of stream acks", _outboundSocketsToStop.InitialCount);
             if (!_outboundSocketsToStop.Wait(_configuration.WaitForEndOfStreamAckTimeout))
@@ -453,16 +453,16 @@ namespace Abc.Zebus.Transport
             DisconnectPeers(connectedOutboundSockets.Select(x => x.PeerId).ToList());
         }
 
-        private void SendEndOfStreamMessages(List<ZmqOutboundSocket> connectedOutboundSockets, ProtoBufferWriter outputStream)
+        private void SendEndOfStreamMessages(List<ZmqOutboundSocket> connectedOutboundSockets, ProtoBufferWriter bufferWriter)
         {
             foreach (var outboundSocket in connectedOutboundSockets)
             {
                 _logger.InfoFormat("Sending EndOfStream to {0}", outboundSocket.EndPoint);
 
                 var endOfStreamMessage = new TransportMessage(MessageTypeId.EndOfStream, new MemoryStream(), PeerId, InboundEndPoint) { WasPersisted = false };
-                outputStream.Reset();
-                outputStream.WriteTransportMessage(endOfStreamMessage, _environment);
-                outboundSocket.Send(outputStream.Buffer, outputStream.Position, endOfStreamMessage);
+                bufferWriter.Reset();
+                bufferWriter.WriteTransportMessage(endOfStreamMessage, _environment);
+                outboundSocket.Send(bufferWriter.Buffer, bufferWriter.Position, endOfStreamMessage);
             }
         }