|
@@ -8,7 +8,7 @@ using Abc.Zebus.Directory;
|
|
|
using Abc.Zebus.Serialization.Protobuf;
|
|
|
using Abc.Zebus.Transport.Zmq;
|
|
|
using Abc.Zebus.Util;
|
|
|
-using log4net;
|
|
|
+using Microsoft.Extensions.Logging;
|
|
|
|
|
|
namespace Abc.Zebus.Transport
|
|
|
{
|
|
@@ -18,7 +18,7 @@ namespace Abc.Zebus.Transport
|
|
|
private readonly ZmqSocketOptions _socketOptions;
|
|
|
private readonly IZmqOutboundSocketErrorHandler _errorHandler;
|
|
|
private readonly ZmqEndPoint _configuredInboundEndPoint;
|
|
|
- private ILog _logger = LogManager.GetLogger(typeof(ZmqTransport));
|
|
|
+ private ILogger _logger = ZebusLogManager.GetLogger(typeof(ZmqTransport));
|
|
|
private ConcurrentDictionary<PeerId, ZmqOutboundSocket> _outboundSockets = new ConcurrentDictionary<PeerId, ZmqOutboundSocket>();
|
|
|
private BlockingCollection<OutboundSocketAction>? _outboundSocketActions;
|
|
|
private BlockingCollection<PendingDisconnect> _pendingDisconnects = new BlockingCollection<PendingDisconnect>();
|
|
@@ -58,7 +58,7 @@ namespace Abc.Zebus.Transport
|
|
|
|
|
|
internal void SetLogId(int logId)
|
|
|
{
|
|
|
- _logger = LogManager.GetLogger(typeof(ZmqTransport).Assembly, typeof(ZmqTransport).FullName + "#" + logId);
|
|
|
+ _logger = ZebusLogManager.GetLogger(typeof(ZmqTransport).FullName + "#" + logId);
|
|
|
}
|
|
|
|
|
|
public void Configure(PeerId peerId, string environment)
|
|
@@ -87,7 +87,7 @@ namespace Abc.Zebus.Transport
|
|
|
public void Start()
|
|
|
{
|
|
|
var zmqVersion = ZmqUtil.GetVersion();
|
|
|
- _logger.InfoFormat("Loaded ZMQ v{0}", zmqVersion.ToString(3));
|
|
|
+ _logger.LogInformation($"Loaded ZMQ v{zmqVersion.ToString(3)}");
|
|
|
|
|
|
if (zmqVersion.Major != 4)
|
|
|
throw new InvalidOperationException($"Expected ZMQ v4.*, loaded ZMQ v{zmqVersion.ToString(3)}");
|
|
@@ -128,7 +128,7 @@ namespace Abc.Zebus.Transport
|
|
|
DiscardItems(_pendingDisconnects);
|
|
|
|
|
|
if (!_disconnectThread!.Join(30.Seconds()))
|
|
|
- _logger.Error("Unable to terminate disconnect thread");
|
|
|
+ _logger.LogError("Unable to terminate disconnect thread");
|
|
|
|
|
|
_outboundSocketActions!.CompleteAdding();
|
|
|
|
|
@@ -136,17 +136,17 @@ namespace Abc.Zebus.Transport
|
|
|
DiscardItems(_outboundSocketActions!);
|
|
|
|
|
|
if (!_outboundThread!.Join(30.Seconds()))
|
|
|
- _logger.Error("Unable to terminate outbound thread");
|
|
|
+ _logger.LogError("Unable to terminate outbound thread");
|
|
|
|
|
|
_isListening = false;
|
|
|
if (!_inboundThread!.Join(30.Seconds()))
|
|
|
- _logger.Error("Unable to terminate inbound thread");
|
|
|
+ _logger.LogError("Unable to terminate inbound thread");
|
|
|
|
|
|
_outboundSocketActions!.Dispose();
|
|
|
_outboundSocketActions = null;
|
|
|
|
|
|
_context!.Dispose();
|
|
|
- _logger.InfoFormat("{0} Stopped", PeerId);
|
|
|
+ _logger.LogInformation($"{PeerId} Stopped");
|
|
|
}
|
|
|
|
|
|
private static void DiscardItems<T>(BlockingCollection<T> collection)
|
|
@@ -169,7 +169,7 @@ namespace Abc.Zebus.Transport
|
|
|
private void Disconnect(PeerId peerId, bool delayed = true)
|
|
|
{
|
|
|
if (_outboundSockets.ContainsKey(peerId))
|
|
|
- _logger.InfoFormat("Queueing disconnect, PeerId: {0}, Delayed: {1}", peerId, delayed);
|
|
|
+ _logger.LogInformation($"Queueing disconnect, PeerId: {peerId}, Delayed: {delayed}");
|
|
|
|
|
|
if (delayed)
|
|
|
{
|
|
@@ -188,7 +188,7 @@ namespace Abc.Zebus.Transport
|
|
|
private void InboundProc(InboundProcStartSequenceState state)
|
|
|
{
|
|
|
Thread.CurrentThread.Name = "ZmqTransport.InboundProc";
|
|
|
- _logger.DebugFormat("Starting inbound proc...");
|
|
|
+ _logger.LogDebug("Starting inbound proc...");
|
|
|
|
|
|
var inboundSocket = CreateInboundSocket(state);
|
|
|
if (inboundSocket == null)
|
|
@@ -208,7 +208,7 @@ namespace Abc.Zebus.Transport
|
|
|
GracefullyDisconnectInboundSocket(inboundSocket);
|
|
|
}
|
|
|
|
|
|
- _logger.InfoFormat("InboundProc terminated");
|
|
|
+ _logger.LogInformation("InboundProc terminated");
|
|
|
}
|
|
|
|
|
|
private ZmqInboundSocket? CreateInboundSocket(InboundProcStartSequenceState state)
|
|
@@ -269,7 +269,7 @@ namespace Abc.Zebus.Transport
|
|
|
}
|
|
|
catch (Exception ex)
|
|
|
{
|
|
|
- _logger.ErrorFormat("Failed to process inbound transport message, PeerId: {0}: {0}", ex);
|
|
|
+ _logger.LogError(ex, "Failed to process inbound transport message");
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -278,7 +278,7 @@ namespace Abc.Zebus.Transport
|
|
|
if (bufferReader.TryReadTransportMessage(out transportMessage))
|
|
|
return true;
|
|
|
|
|
|
- _logger.Debug($"Unable to read transport message, Length: {bufferReader.Length}, Bytes: {bufferReader.ToDebugString(50)}");
|
|
|
+ _logger.LogDebug($"Unable to read transport message, Length: {bufferReader.Length}, Bytes: {bufferReader.ToDebugString(50)}");
|
|
|
|
|
|
return false;
|
|
|
}
|
|
@@ -291,17 +291,17 @@ namespace Abc.Zebus.Transport
|
|
|
|
|
|
if (!isValid)
|
|
|
{
|
|
|
- _logger.Debug($"Invalid transport message received, Id: {transportMessage.Id}, MessageTypeId: {transportMessage.MessageTypeId}, Originator: {transportMessage.Originator.SenderId}");
|
|
|
+ _logger.LogDebug($"Invalid transport message received, Id: {transportMessage.Id}, MessageTypeId: {transportMessage.MessageTypeId}, Originator: {transportMessage.Originator.SenderId}");
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
if (transportMessage.Environment == null) // TODO: treat as invalid message
|
|
|
{
|
|
|
- _logger.Info($"Receiving message with null environment from {transportMessage.Originator.SenderId}");
|
|
|
+ _logger.LogInformation($"Receiving message with null environment from {transportMessage.Originator.SenderId}");
|
|
|
}
|
|
|
else if (transportMessage.Environment != _environment)
|
|
|
{
|
|
|
- _logger.Error($"Receiving messages from wrong environment: {transportMessage.Environment} from {transportMessage.Originator.SenderEndPoint}, discarding message type {transportMessage.MessageTypeId}");
|
|
|
+ _logger.LogError($"Receiving messages from wrong environment: {transportMessage.Environment} from {transportMessage.Originator.SenderEndPoint}, discarding message type {transportMessage.MessageTypeId}");
|
|
|
return false;
|
|
|
}
|
|
|
|
|
@@ -316,18 +316,18 @@ namespace Abc.Zebus.Transport
|
|
|
|
|
|
if (!_outboundSockets.ContainsKey(senderId))
|
|
|
{
|
|
|
- _logger.ErrorFormat("Received EndOfStreamAck for an unknown socket ({0}) PeerId: {1} (Known peers: {2})", senderEndPoint, senderId, string.Join(", ", _outboundSockets.Keys));
|
|
|
+ _logger.LogError("Received EndOfStreamAck for an unknown socket ({0}) PeerId: {1} (Known peers: {2})", senderEndPoint, senderId, string.Join(", ", _outboundSockets.Keys));
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- _logger.InfoFormat("Received EndOfStreamAck for {0}, {1}", senderId, senderEndPoint);
|
|
|
+ _logger.LogInformation("Received EndOfStreamAck for {0}, {1}", senderId, senderEndPoint);
|
|
|
|
|
|
_outboundSocketsToStop!.Signal();
|
|
|
}
|
|
|
|
|
|
private void SendEndOfStreamAck(TransportMessage transportMessage)
|
|
|
{
|
|
|
- _logger.InfoFormat("Sending EndOfStreamAck to {0}", transportMessage.Originator.SenderEndPoint);
|
|
|
+ _logger.LogInformation("Sending EndOfStreamAck to {0}", transportMessage.Originator.SenderEndPoint);
|
|
|
|
|
|
var endOfStreamAck = new TransportMessage(MessageTypeId.EndOfStreamAck, new MemoryStream(), PeerId, InboundEndPoint);
|
|
|
var closingPeer = new Peer(transportMessage.Originator.SenderId, transportMessage.Originator.SenderEndPoint);
|
|
@@ -339,7 +339,7 @@ namespace Abc.Zebus.Transport
|
|
|
private void OutboundProc()
|
|
|
{
|
|
|
Thread.CurrentThread.Name = "ZmqTransport.OutboundProc";
|
|
|
- _logger.DebugFormat("Starting outbound proc...");
|
|
|
+ _logger.LogDebug("Starting outbound proc...");
|
|
|
|
|
|
var bufferWriter = new ProtoBufferWriter();
|
|
|
|
|
@@ -357,7 +357,7 @@ namespace Abc.Zebus.Transport
|
|
|
|
|
|
GracefullyDisconnectOutboundSockets(bufferWriter);
|
|
|
|
|
|
- _logger.InfoFormat("OutboundProc terminated");
|
|
|
+ _logger.LogInformation("OutboundProc terminated");
|
|
|
}
|
|
|
|
|
|
private void WriteTransportMessageAndSendToPeers(TransportMessage transportMessage, List<Peer> peers, SendContext context, ProtoBufferWriter bufferWriter)
|
|
@@ -391,7 +391,7 @@ namespace Abc.Zebus.Transport
|
|
|
var outboundSocket = GetConnectedOutboundSocket(target, transportMessage);
|
|
|
if (!outboundSocket.IsConnected)
|
|
|
{
|
|
|
- _logger.Error($"Could not send message of type {transportMessage.MessageTypeId.FullName} to peer {target.Id} because outbound socket was not connected");
|
|
|
+ _logger.LogError($"Could not send message of type {transportMessage.MessageTypeId.FullName} to peer {target.Id} because outbound socket was not connected");
|
|
|
return;
|
|
|
}
|
|
|
|
|
@@ -401,7 +401,7 @@ namespace Abc.Zebus.Transport
|
|
|
}
|
|
|
catch (Exception ex)
|
|
|
{
|
|
|
- _logger.Error($"Failed to send message, PeerId: {target.Id}, EndPoint: {target.EndPoint}, Exception: {ex}");
|
|
|
+ _logger.LogError(ex, $"Failed to send message, PeerId: {target.Id}, EndPoint: {target.EndPoint}");
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -441,9 +441,9 @@ namespace Abc.Zebus.Transport
|
|
|
|
|
|
SendEndOfStreamMessages(connectedOutboundSockets, bufferWriter);
|
|
|
|
|
|
- _logger.InfoFormat("Waiting for {0} outbound socket end of stream acks", _outboundSocketsToStop.InitialCount);
|
|
|
+ _logger.LogInformation($"Waiting for {_outboundSocketsToStop.InitialCount} outbound socket end of stream acks");
|
|
|
if (!_outboundSocketsToStop.Wait(_configuration.WaitForEndOfStreamAckTimeout))
|
|
|
- _logger.WarnFormat("{0} peers did not respond to end of stream", _outboundSocketsToStop.CurrentCount);
|
|
|
+ _logger.LogWarning($"{_outboundSocketsToStop.CurrentCount} peers did not respond to end of stream");
|
|
|
|
|
|
DisconnectPeers(connectedOutboundSockets.Select(x => x.PeerId).ToList());
|
|
|
}
|
|
@@ -452,7 +452,7 @@ namespace Abc.Zebus.Transport
|
|
|
{
|
|
|
foreach (var outboundSocket in connectedOutboundSockets)
|
|
|
{
|
|
|
- _logger.InfoFormat("Sending EndOfStream to {0}", outboundSocket.EndPoint);
|
|
|
+ _logger.LogInformation($"Sending EndOfStream to {outboundSocket.EndPoint}");
|
|
|
|
|
|
var endOfStreamMessage = new TransportMessage(MessageTypeId.EndOfStream, new MemoryStream(), PeerId, InboundEndPoint) { WasPersisted = false };
|
|
|
bufferWriter.Reset();
|
|
@@ -487,7 +487,7 @@ namespace Abc.Zebus.Transport
|
|
|
}
|
|
|
catch (Exception ex)
|
|
|
{
|
|
|
- _logger.WarnFormat("Unable to enqueue item, Type: {0}, Exception: {1}", typeof(T).Name, ex);
|
|
|
+ _logger.LogWarning(ex, $"Unable to enqueue item, Type: {typeof(T).Name}");
|
|
|
}
|
|
|
}
|
|
|
|