// ZmqOutboundSocket.cs
  1. using System;
  2. using System.Diagnostics;
  3. using System.Text;
  4. using Abc.Zebus.Transport.Zmq;
  5. using log4net;
  namespace Abc.Zebus.Transport
  {
      /// <summary>
      /// Outbound ZMQ PUSH socket used to send transport messages to a single peer end point.
      /// </summary>
      /// <remarks>
      /// After a connect failure, or after too many consecutive send failures, the socket enters a
      /// "closed state": until the configured duration has elapsed, every send or connect attempt is
      /// logged and ignored instead of being executed.
      /// </remarks>
      internal class ZmqOutboundSocket
      {
          private static readonly ILog _logger = LogManager.GetLogger(typeof(ZmqOutboundSocket));

          // Measures the time spent in the current closed state; reset when going back to open state.
          private readonly Stopwatch _closedStateStopwatch = new Stopwatch();
          private readonly ZmqContext _context;
          private readonly ZmqSocketOptions _options;
          private readonly IZmqOutboundSocketErrorHandler _errorHandler;

          // Null until the first successful CreateSocket call, and again after a failed connect.
          private ZmqSocket? _socket;

          // Number of send failures since the last successful send.
          private int _failedSendCount;
          private bool _isInClosedState;
          private TimeSpan _closedStateDuration;

          /// <summary>
          /// Stores the dependencies and the target peer; no socket is created until <see cref="ConnectFor"/> is called.
          /// </summary>
          /// <param name="context">ZMQ context the underlying socket is created in.</param>
          /// <param name="peerId">Target peer; also used (via <c>ToString()</c>) as the socket routing id.</param>
          /// <param name="endPoint">End point the socket connects to.</param>
          /// <param name="options">Socket options (high water mark, timeouts, keep-alive, closed state durations).</param>
          /// <param name="errorHandler">Receives connect, disconnect and send failure notifications.</param>
          public ZmqOutboundSocket(ZmqContext context, PeerId peerId, string endPoint, ZmqSocketOptions options, IZmqOutboundSocketErrorHandler errorHandler)
          {
              _context = context;
              _options = options;
              _errorHandler = errorHandler;
              PeerId = peerId;
              EndPoint = endPoint;
          }

          /// <summary>Peer this socket sends to.</summary>
          public PeerId PeerId { get; }

          /// <summary>True after a successful connect, false after a failed connect or a disconnect.</summary>
          public bool IsConnected { get; private set; }

          /// <summary>Current target end point; replaced by <see cref="ReconnectFor"/>.</summary>
          public string EndPoint { get; private set; }

          /// <summary>
          /// Creates the underlying socket and connects it to <see cref="EndPoint"/>, unless the socket is in closed state.
          /// </summary>
          /// <remarks>
          /// Exceptions are not propagated: on failure the socket is disposed, the error is logged and reported
          /// to the error handler, and the closed state is entered for
          /// <c>ClosedStateDurationAfterConnectFailure</c>.
          /// </remarks>
          /// <param name="message">Message that triggered the connection; only used for logging when the attempt is ignored.</param>
          public void ConnectFor(TransportMessage message)
          {
              if (!CanSendOrConnect(message))
                  return;

              try
              {
                  _socket = CreateSocket();
                  _socket.Connect(EndPoint);
                  IsConnected = true;

                  _logger.InfoFormat("Socket connected, Peer: {0}, EndPoint: {1}", PeerId, EndPoint);
              }
              catch (Exception ex)
              {
                  _socket?.Dispose();
                  _socket = null;
                  IsConnected = false;

                  _logger.ErrorFormat("Unable to connect socket, Peer: {0}, EndPoint: {1}, Exception: {2}", PeerId, EndPoint, ex);
                  _errorHandler.OnConnectException(PeerId, EndPoint, ex);

                  SwitchToClosedState(_options.ClosedStateDurationAfterConnectFailure);
              }
          }

          /// <summary>
          /// Creates a PUSH socket and applies the configured send and keep-alive options and the routing id.
          /// </summary>
          /// <returns>A configured, not yet connected socket.</returns>
          private ZmqSocket CreateSocket()
          {
              var socket = new ZmqSocket(_context, ZmqSocketType.PUSH);

              try
              {
                  socket.SetOption(ZmqSocketOption.SNDHWM, _options.SendHighWaterMark);
                  socket.SetOption(ZmqSocketOption.SNDTIMEO, (int)_options.SendTimeout.TotalMilliseconds);

                  var keepAlive = _options.KeepAlive;
                  if (keepAlive != null)
                  {
                      socket.SetOption(ZmqSocketOption.TCP_KEEPALIVE, keepAlive.Enabled ? 1 : 0);

                      if (keepAlive.KeepAliveTimeout != null)
                          socket.SetOption(ZmqSocketOption.TCP_KEEPALIVE_IDLE, (int)keepAlive.KeepAliveTimeout.Value.TotalSeconds);

                      if (keepAlive.KeepAliveInterval != null)
                          socket.SetOption(ZmqSocketOption.TCP_KEEPALIVE_INTVL, (int)keepAlive.KeepAliveInterval.Value.TotalSeconds);
                  }

                  socket.SetOption(ZmqSocketOption.ROUTING_ID, Encoding.ASCII.GetBytes(PeerId.ToString()));

                  return socket;
              }
              catch
              {
                  // The caller never receives the instance when configuration fails,
                  // so it has to be released here to avoid leaking the socket.
                  socket.Dispose();
                  throw;
              }
          }

          /// <summary>
          /// Disconnects the current socket, switches to <paramref name="endPoint"/> and connects again.
          /// </summary>
          /// <param name="endPoint">New end point of the peer.</param>
          /// <param name="message">Message that triggered the reconnection; forwarded to <see cref="ConnectFor"/>.</param>
          public void ReconnectFor(string endPoint, TransportMessage message)
          {
              Disconnect();
              EndPoint = endPoint;
              ConnectFor(message);
          }

          /// <summary>
          /// Disposes the underlying socket if it is connected; does nothing otherwise.
          /// </summary>
          /// <remarks>
          /// Exceptions are logged and reported to the error handler instead of being propagated.
          /// </remarks>
          public void Disconnect()
          {
              if (!IsConnected)
                  return;

              try
              {
                  try
                  {
                      // Linger is set to zero before closing so that closing does not wait for pending messages.
                      _socket!.SetOption(ZmqSocketOption.LINGER, 0);
                  }
                  finally
                  {
                      // Dispose even when the linger option could not be set: IsConnected is cleared below,
                      // so nothing else would ever release this socket.
                      _socket!.Dispose();
                  }

                  _logger.InfoFormat("Socket disconnected, Peer: {0}", PeerId);
              }
              catch (Exception ex)
              {
                  _logger.ErrorFormat("Unable to disconnect socket, Peer: {0}, Exception: {1}", PeerId, ex);
                  _errorHandler.OnDisconnectException(PeerId, EndPoint, ex);
              }

              IsConnected = false;
          }

          /// <summary>
          /// Sends the first <paramref name="length"/> bytes of <paramref name="buffer"/> to the peer,
          /// unless the socket is in closed state.
          /// </summary>
          /// <remarks>
          /// A failed send is logged and reported to the error handler. The failure counter is compared to
          /// <c>SendRetriesBeforeSwitchingToClosedState</c> before being incremented, and is only reset by a
          /// successful send. The underlying socket is used without a null check; callers are presumably
          /// expected to check <see cref="IsConnected"/> first (not verifiable from this file).
          /// </remarks>
          /// <param name="buffer">Buffer containing the serialized message.</param>
          /// <param name="length">Number of bytes to send, starting at offset zero.</param>
          /// <param name="message">Message being sent; only used for logging and error reporting.</param>
          public void Send(byte[] buffer, int length, TransportMessage message)
          {
              if (!CanSendOrConnect(message))
                  return;

              if (_socket!.TrySend(buffer, 0, length, out var error))
              {
                  _failedSendCount = 0;
                  return;
              }

              var errorMessage = error == ZmqErrorCode.EAGAIN
                  ? "High water mark reached"
                  : error.ToErrorMessage();

              _logger.ErrorFormat("Unable to send message, destination peer: {0}, MessageTypeId: {1}, MessageId: {2}, Error: {3}", PeerId, message.MessageTypeId, message.Id, errorMessage);
              _errorHandler.OnSendFailed(PeerId, EndPoint, message.MessageTypeId, message.Id);

              if (_failedSendCount >= _options.SendRetriesBeforeSwitchingToClosedState)
                  SwitchToClosedState(_options.ClosedStateDurationAfterSendFailure);

              ++_failedSendCount;
          }

          /// <summary>
          /// Tells whether a send or connect may be attempted, switching back to open state when the
          /// closed state duration has elapsed.
          /// </summary>
          /// <param name="message">Message being processed; only used for logging when the operation is ignored.</param>
          /// <returns>False while the closed state is still active, true otherwise.</returns>
          private bool CanSendOrConnect(TransportMessage message)
          {
              if (!_isInClosedState)
                  return true;

              if (_closedStateStopwatch.Elapsed < _closedStateDuration)
              {
                  _logger.WarnFormat("Send or connect ignored in closed state, Peer: {0}, MessageTypeId: {1}, MessageId: {2}", PeerId, message.MessageTypeId, message.Id);
                  return false;
              }

              SwitchToOpenState();
              return true;
          }

          /// <summary>
          /// Enters the closed state for <paramref name="duration"/> and starts measuring the elapsed time.
          /// </summary>
          /// <param name="duration">Time during which sends and connects are ignored.</param>
          private void SwitchToClosedState(TimeSpan duration)
          {
              _logger.ErrorFormat("Switching to closed state, Peer: {0}, Duration: {1}", PeerId, duration);

              _closedStateStopwatch.Start();
              _closedStateDuration = duration;
              _isInClosedState = true;
          }

          /// <summary>
          /// Leaves the closed state and resets the stopwatch for the next closed period.
          /// </summary>
          private void SwitchToOpenState()
          {
              _logger.InfoFormat("Switching back to open state, Peer: {0}", PeerId);

              _isInClosedState = false;
              _closedStateStopwatch.Reset();
          }
      }
  }