|
@@ -18,17 +18,17 @@ namespace Abc.Zebus.Transport
|
|
|
private readonly IZmqOutboundSocketErrorHandler _errorHandler;
|
|
|
private readonly ZmqEndPoint _configuredInboundEndPoint;
|
|
|
private ILog _logger = LogManager.GetLogger(typeof(ZmqTransport));
|
|
|
- private ConcurrentDictionary<PeerId, ZmqOutboundSocket> _outboundSockets;
|
|
|
- private BlockingCollection<OutboundSocketAction> _outboundSocketActions;
|
|
|
- private BlockingCollection<PendingDisconnect> _pendingDisconnects;
|
|
|
- private ZmqContext _context;
|
|
|
- private Thread _inboundThread;
|
|
|
- private Thread _outboundThread;
|
|
|
- private Thread _disconnectThread;
|
|
|
+ private ConcurrentDictionary<PeerId, ZmqOutboundSocket> _outboundSockets = new ConcurrentDictionary<PeerId, ZmqOutboundSocket>();
|
|
|
+ private BlockingCollection<OutboundSocketAction>? _outboundSocketActions;
|
|
|
+ private BlockingCollection<PendingDisconnect> _pendingDisconnects = new BlockingCollection<PendingDisconnect>();
|
|
|
+ private ZmqContext? _context;
|
|
|
+ private Thread? _inboundThread;
|
|
|
+ private Thread? _outboundThread;
|
|
|
+ private Thread? _disconnectThread;
|
|
|
private volatile bool _isListening;
|
|
|
- private ZmqEndPoint _realInboundEndPoint;
|
|
|
- private string _environment;
|
|
|
- private CountdownEvent _outboundSocketsToStop;
|
|
|
+ private ZmqEndPoint? _realInboundEndPoint;
|
|
|
+ private string _environment = string.Empty;
|
|
|
+ private CountdownEvent? _outboundSocketsToStop;
|
|
|
private bool _isRunning;
|
|
|
|
|
|
public ZmqTransport(IZmqTransportConfiguration configuration, ZmqSocketOptions socketOptions, IZmqOutboundSocketErrorHandler errorHandler)
|
|
@@ -39,7 +39,7 @@ namespace Abc.Zebus.Transport
|
|
|
SocketOptions = socketOptions;
|
|
|
}
|
|
|
|
|
|
- public event Action<TransportMessage> MessageReceived;
|
|
|
+ public event Action<TransportMessage>? MessageReceived;
|
|
|
|
|
|
public bool IsListening => _isListening;
|
|
|
|
|
@@ -121,25 +121,25 @@ namespace Abc.Zebus.Transport
|
|
|
if (discardPendingMessages)
|
|
|
DiscardItems(_pendingDisconnects);
|
|
|
|
|
|
- if (!_disconnectThread.Join(30.Seconds()))
|
|
|
+ if (!_disconnectThread!.Join(30.Seconds()))
|
|
|
_logger.Error("Unable to terminate disconnect thread");
|
|
|
|
|
|
- _outboundSocketActions.CompleteAdding();
|
|
|
+ _outboundSocketActions!.CompleteAdding();
|
|
|
|
|
|
if (discardPendingMessages)
|
|
|
- DiscardItems(_outboundSocketActions);
|
|
|
+ DiscardItems(_outboundSocketActions!);
|
|
|
|
|
|
- if (!_outboundThread.Join(30.Seconds()))
|
|
|
+ if (!_outboundThread!.Join(30.Seconds()))
|
|
|
_logger.Error("Unable to terminate outbound thread");
|
|
|
|
|
|
_isListening = false;
|
|
|
- if (!_inboundThread.Join(30.Seconds()))
|
|
|
+ if (!_inboundThread!.Join(30.Seconds()))
|
|
|
_logger.Error("Unable to terminate inbound thread");
|
|
|
|
|
|
- _outboundSocketActions.Dispose();
|
|
|
+ _outboundSocketActions!.Dispose();
|
|
|
_outboundSocketActions = null;
|
|
|
|
|
|
- _context.Dispose();
|
|
|
+ _context!.Dispose();
|
|
|
_logger.InfoFormat("{0} Stopped", PeerId);
|
|
|
}
|
|
|
|
|
@@ -157,7 +157,7 @@ namespace Abc.Zebus.Transport
|
|
|
|
|
|
public void Send(TransportMessage message, IEnumerable<Peer> peers, SendContext context)
|
|
|
{
|
|
|
- _outboundSocketActions.Add(OutboundSocketAction.Send(message, peers, context));
|
|
|
+ _outboundSocketActions!.Add(OutboundSocketAction.Send(message, peers, context));
|
|
|
}
|
|
|
|
|
|
private void Disconnect(PeerId peerId, bool delayed = true)
|
|
@@ -171,7 +171,7 @@ namespace Abc.Zebus.Transport
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- SafeAdd(_outboundSocketActions, OutboundSocketAction.Disconnect(peerId));
|
|
|
+ SafeAdd(_outboundSocketActions!, OutboundSocketAction.Disconnect(peerId));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -205,12 +205,12 @@ namespace Abc.Zebus.Transport
|
|
|
_logger.InfoFormat("InboundProc terminated");
|
|
|
}
|
|
|
|
|
|
- private ZmqInboundSocket CreateInboundSocket(InboundProcStartSequenceState state)
|
|
|
+ private ZmqInboundSocket? CreateInboundSocket(InboundProcStartSequenceState state)
|
|
|
{
|
|
|
- ZmqInboundSocket inboundSocket = null;
|
|
|
+ ZmqInboundSocket? inboundSocket = null;
|
|
|
try
|
|
|
{
|
|
|
- inboundSocket = new ZmqInboundSocket(_context, PeerId, _configuredInboundEndPoint, SocketOptions, _environment);
|
|
|
+ inboundSocket = new ZmqInboundSocket(_context!, PeerId, _configuredInboundEndPoint, SocketOptions, _environment);
|
|
|
_realInboundEndPoint = inboundSocket.Bind();
|
|
|
return inboundSocket;
|
|
|
}
|
|
@@ -231,7 +231,7 @@ namespace Abc.Zebus.Transport
|
|
|
{
|
|
|
inboundSocket.Disconnect();
|
|
|
|
|
|
- CodedInputStream inputStream;
|
|
|
+ CodedInputStream? inputStream;
|
|
|
while ((inputStream = inboundSocket.Receive(100.Milliseconds())) != null)
|
|
|
DeserializeAndForwardTransportMessage(inputStream);
|
|
|
}
|
|
@@ -279,7 +279,7 @@ namespace Abc.Zebus.Transport
|
|
|
|
|
|
_logger.InfoFormat("Received EndOfStreamAck for {0}, {1}", senderId, senderEndPoint);
|
|
|
|
|
|
- _outboundSocketsToStop.Signal();
|
|
|
+ _outboundSocketsToStop!.Signal();
|
|
|
}
|
|
|
|
|
|
private void SendEndOfStreamAck(TransportMessage transportMessage)
|
|
@@ -289,8 +289,8 @@ namespace Abc.Zebus.Transport
|
|
|
var endOfStreamAck = new TransportMessage(MessageTypeId.EndOfStreamAck, new MemoryStream(), PeerId, InboundEndPoint);
|
|
|
var closingPeer = new Peer(transportMessage.Originator.SenderId, transportMessage.Originator.SenderEndPoint);
|
|
|
|
|
|
- SafeAdd(_outboundSocketActions, OutboundSocketAction.Send(endOfStreamAck, new[] { closingPeer }, new SendContext()));
|
|
|
- SafeAdd(_pendingDisconnects, new PendingDisconnect(closingPeer.Id, SystemDateTime.UtcNow.Add(_configuration.WaitForEndOfStreamAckTimeout)));
|
|
|
+ SafeAdd(_outboundSocketActions!, OutboundSocketAction.Send(endOfStreamAck, new[] { closingPeer }, new SendContext()));
|
|
|
+ SafeAdd(_pendingDisconnects!, new PendingDisconnect(closingPeer.Id, SystemDateTime.UtcNow.Add(_configuration.WaitForEndOfStreamAckTimeout)));
|
|
|
}
|
|
|
|
|
|
private bool IsFromCurrentEnvironment(TransportMessage transportMessage)
|
|
@@ -315,7 +315,7 @@ namespace Abc.Zebus.Transport
|
|
|
|
|
|
var outputStream = new CodedOutputStream();
|
|
|
|
|
|
- foreach (var socketAction in _outboundSocketActions.GetConsumingEnumerable())
|
|
|
+ foreach (var socketAction in _outboundSocketActions!.GetConsumingEnumerable())
|
|
|
{
|
|
|
if (socketAction.IsDisconnect)
|
|
|
{
|
|
@@ -389,7 +389,7 @@ namespace Abc.Zebus.Transport
|
|
|
{
|
|
|
if (!_outboundSockets.TryGetValue(peer.Id, out var outboundSocket))
|
|
|
{
|
|
|
- outboundSocket = new ZmqOutboundSocket(_context, peer.Id, peer.EndPoint, SocketOptions, _errorHandler);
|
|
|
+ outboundSocket = new ZmqOutboundSocket(_context!, peer.Id, peer.EndPoint, SocketOptions, _errorHandler);
|
|
|
outboundSocket.ConnectFor(transportMessage);
|
|
|
|
|
|
_outboundSockets.TryAdd(peer.Id, outboundSocket);
|
|
@@ -444,7 +444,7 @@ namespace Abc.Zebus.Transport
|
|
|
Thread.Sleep(500);
|
|
|
}
|
|
|
|
|
|
- SafeAdd(_outboundSocketActions, OutboundSocketAction.Disconnect(pendingDisconnect.PeerId));
|
|
|
+ SafeAdd(_outboundSocketActions!, OutboundSocketAction.Disconnect(pendingDisconnect.PeerId));
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -462,7 +462,7 @@ namespace Abc.Zebus.Transport
|
|
|
|
|
|
private readonly struct OutboundSocketAction
|
|
|
{
|
|
|
- private static readonly TransportMessage _disconnectMessage = new TransportMessage(default, null, new PeerId(), null);
|
|
|
+ private static readonly TransportMessage _disconnectMessage = new TransportMessage(default, null!, new PeerId(), null!);
|
|
|
|
|
|
private OutboundSocketAction(TransportMessage message, IEnumerable<Peer> targets, SendContext context)
|
|
|
{
|
|
@@ -480,7 +480,7 @@ namespace Abc.Zebus.Transport
|
|
|
=> new OutboundSocketAction(message, peers, context);
|
|
|
|
|
|
public static OutboundSocketAction Disconnect(PeerId peerId)
|
|
|
- => new OutboundSocketAction(_disconnectMessage, new List<Peer> { new Peer(peerId, null) }, null);
|
|
|
+ => new OutboundSocketAction(_disconnectMessage, new List<Peer> { new Peer(peerId, null!) }, null!);
|
|
|
}
|
|
|
|
|
|
private class PendingDisconnect
|
|
@@ -497,7 +497,7 @@ namespace Abc.Zebus.Transport
|
|
|
|
|
|
private class InboundProcStartSequenceState
|
|
|
{
|
|
|
- private Exception _inboundProcStartException;
|
|
|
+ private Exception? _inboundProcStartException;
|
|
|
private readonly ManualResetEvent _inboundProcStartedSignal = new ManualResetEvent(false);
|
|
|
|
|
|
public void Wait()
|