| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- using System;
- using System.Diagnostics;
- using System.Text;
- using Abc.Zebus.Transport.Zmq;
- using log4net;
- namespace Abc.Zebus.Transport
- {
- internal class ZmqOutboundSocket
- {
- private static readonly ILog _logger = LogManager.GetLogger(typeof(ZmqOutboundSocket));
- private readonly Stopwatch _closedStateStopwatch = new Stopwatch();
- private readonly ZmqContext _context;
- private readonly ZmqSocketOptions _options;
- private readonly IZmqOutboundSocketErrorHandler _errorHandler;
- private ZmqSocket? _socket;
- private int _failedSendCount;
- private bool _isInClosedState;
- private TimeSpan _closedStateDuration;
- public ZmqOutboundSocket(ZmqContext context, PeerId peerId, string endPoint, ZmqSocketOptions options, IZmqOutboundSocketErrorHandler errorHandler)
- {
- _context = context;
- _options = options;
- _errorHandler = errorHandler;
- PeerId = peerId;
- EndPoint = endPoint;
- }
- public PeerId PeerId { get; }
- public bool IsConnected { get; private set; }
- public string EndPoint { get; private set; }
- public void ConnectFor(TransportMessage message)
- {
- if (!CanSendOrConnect(message))
- return;
- try
- {
- _socket = CreateSocket();
- _socket.Connect(EndPoint);
- IsConnected = true;
- _logger.InfoFormat("Socket connected, Peer: {0}, EndPoint: {1}", PeerId, EndPoint);
- }
- catch (Exception ex)
- {
- _socket?.Dispose();
- _socket = null;
- IsConnected = false;
- _logger.ErrorFormat("Unable to connect socket, Peer: {0}, EndPoint: {1}, Exception: {2}", PeerId, EndPoint, ex);
- _errorHandler.OnConnectException(PeerId, EndPoint, ex);
- SwitchToClosedState(_options.ClosedStateDurationAfterConnectFailure);
- }
- }
- private ZmqSocket CreateSocket()
- {
- var socket = new ZmqSocket(_context, ZmqSocketType.PUSH);
- socket.SetOption(ZmqSocketOption.SNDHWM, _options.SendHighWaterMark);
- socket.SetOption(ZmqSocketOption.SNDTIMEO, (int)_options.SendTimeout.TotalMilliseconds);
- if (_options.KeepAlive != null)
- {
- socket.SetOption(ZmqSocketOption.TCP_KEEPALIVE, _options.KeepAlive.Enabled ? 1 : 0);
- if (_options.KeepAlive.KeepAliveTimeout != null)
- socket.SetOption(ZmqSocketOption.TCP_KEEPALIVE_IDLE, (int)_options.KeepAlive.KeepAliveTimeout.Value.TotalSeconds);
- if (_options.KeepAlive.KeepAliveInterval != null)
- socket.SetOption(ZmqSocketOption.TCP_KEEPALIVE_INTVL, (int)_options.KeepAlive.KeepAliveInterval.Value.TotalSeconds);
- }
- socket.SetOption(ZmqSocketOption.ROUTING_ID, Encoding.ASCII.GetBytes(PeerId.ToString()));
- return socket;
- }
- public void ReconnectFor(string endPoint, TransportMessage message)
- {
- Disconnect();
- EndPoint = endPoint;
- ConnectFor(message);
- }
- public void Disconnect()
- {
- if (!IsConnected)
- return;
- try
- {
- _socket!.SetOption(ZmqSocketOption.LINGER, 0);
- _socket!.Dispose();
- _logger.InfoFormat("Socket disconnected, Peer: {0}", PeerId);
- }
- catch (Exception ex)
- {
- _logger.ErrorFormat("Unable to disconnect socket, Peer: {0}, Exception: {1}", PeerId, ex);
- _errorHandler.OnDisconnectException(PeerId, EndPoint, ex);
- }
- IsConnected = false;
- }
- public void Send(byte[] buffer, int length, TransportMessage message)
- {
- if (!CanSendOrConnect(message))
- return;
- if (_socket!.TrySend(buffer, 0, length, out var error))
- {
- _failedSendCount = 0;
- return;
- }
- var hasReachedHighWaterMark = error == ZmqErrorCode.EAGAIN;
- var errorMesage = hasReachedHighWaterMark ? "High water mark reached" : error.ToErrorMessage();
- _logger.ErrorFormat("Unable to send message, destination peer: {0}, MessageTypeId: {1}, MessageId: {2}, Error: {3}", PeerId, message.MessageTypeId, message.Id, errorMesage);
- _errorHandler.OnSendFailed(PeerId, EndPoint, message.MessageTypeId, message.Id);
- if (_failedSendCount >= _options.SendRetriesBeforeSwitchingToClosedState)
- SwitchToClosedState(_options.ClosedStateDurationAfterSendFailure);
- ++_failedSendCount;
- }
- private bool CanSendOrConnect(TransportMessage message)
- {
- if (_isInClosedState)
- {
- if (_closedStateStopwatch.Elapsed < _closedStateDuration)
- {
- _logger.WarnFormat("Send or connect ignored in closed state, Peer: {0}, MessageTypeId: {1}, MessageId: {2}", PeerId, message.MessageTypeId, message.Id);
- return false;
- }
- SwitchToOpenState();
- }
- return true;
- }
- private void SwitchToClosedState(TimeSpan duration)
- {
- _logger.ErrorFormat("Switching to closed state, Peer: {0}, Duration: {1}", PeerId, duration);
- _closedStateStopwatch.Start();
- _closedStateDuration = duration;
- _isInClosedState = true;
- }
- private void SwitchToOpenState()
- {
- _logger.InfoFormat("Switching back to open state, Peer: {0}", PeerId);
- _isInClosedState = false;
- _closedStateStopwatch.Reset();
- }
- }
- }
|