|
|
@@ -240,10 +240,10 @@ namespace Abc.Zebus.Transport
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
- if (!inputStream.TryReadTransportMessage(out var transportMessage))
|
|
|
+ if (!TryDeserializeTransportMessage(inputStream, out var transportMessage))
|
|
|
return;
|
|
|
|
|
|
- if (!IsValidTransportMessage(transportMessage) || !IsFromCurrentEnvironment(transportMessage))
|
|
|
+ if (!ValidateTransportMessage(transportMessage))
|
|
|
return;
|
|
|
|
|
|
if (transportMessage.MessageTypeId == MessageTypeId.EndOfStream)
|
|
|
@@ -263,17 +263,46 @@ namespace Abc.Zebus.Transport
|
|
|
}
|
|
|
catch (Exception ex)
|
|
|
{
|
|
|
- _logger.ErrorFormat("Failed to process inbound transport message: {0}", ex);
|
|
|
+ _logger.ErrorFormat("Failed to process inbound transport message, PeerId: {0}: {0}", ex);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private bool IsValidTransportMessage(TransportMessage transportMessage)
|
|
|
+ private bool TryDeserializeTransportMessage(ProtoBufferReader reader, out TransportMessage transportMessage)
|
|
|
{
|
|
|
- return transportMessage.Id.Value != default
|
|
|
- && transportMessage.MessageTypeId.FullName != default
|
|
|
- && transportMessage.Originator != default;
|
|
|
+ if (reader.TryReadTransportMessage(out transportMessage))
|
|
|
+ return true;
|
|
|
+
|
|
|
+ _logger.Debug($"Unable to read transport message, Length: {reader.Length}, Bytes: {reader.ToDebugString(50)}");
|
|
|
+
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ private bool ValidateTransportMessage(TransportMessage transportMessage)
|
|
|
+ {
|
|
|
+ var isValid = transportMessage.Id.Value != default
|
|
|
+ && transportMessage.MessageTypeId.FullName != default
|
|
|
+ && transportMessage.Originator.SenderId != default;
|
|
|
+
|
|
|
+ if (!isValid)
|
|
|
+ {
|
|
|
+ _logger.Debug($"Invalid transport message received, Id: {transportMessage.Id}, MessageTypeId: {transportMessage.MessageTypeId}, Originator: {transportMessage.Originator.SenderId}");
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (transportMessage.Environment == null) // TODO: treat as invalid message
|
|
|
+ {
|
|
|
+ _logger.Info($"Receiving message with null environment from {transportMessage.Originator.SenderId}");
|
|
|
+ }
|
|
|
+ else if (transportMessage.Environment != _environment)
|
|
|
+ {
|
|
|
+ _logger.Error($"Receiving messages from wrong environment: {transportMessage.Environment} from {transportMessage.Originator.SenderEndPoint}, discarding message type {transportMessage.MessageTypeId}");
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
+
|
|
|
private void OnEndOfStreamAck(TransportMessage transportMessage)
|
|
|
{
|
|
|
var senderId = transportMessage.Originator.SenderId;
|
|
|
@@ -303,11 +332,7 @@ namespace Abc.Zebus.Transport
|
|
|
|
|
|
private bool IsFromCurrentEnvironment(TransportMessage transportMessage)
|
|
|
{
|
|
|
- if (transportMessage.Environment == null)
|
|
|
- {
|
|
|
- _logger.DebugFormat("Receiving message with null environment from {0}", transportMessage.Originator.SenderId);
|
|
|
- }
|
|
|
- else if (transportMessage.Environment != _environment)
|
|
|
+ if (transportMessage.Environment != _environment)
|
|
|
{
|
|
|
_logger.ErrorFormat("Receiving messages from wrong environment: {0} from {1}, discarding message type {2}", transportMessage.Environment, transportMessage.Originator.SenderEndPoint, transportMessage.MessageTypeId);
|
|
|
return false;
|