RedisConnectionManager.cs 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Threading.Tasks;
  4. using Masuit.Tools.Core.Systems;
  5. using StackExchange.Redis;
  6. namespace Masuit.Tools.Core.NoSQL
  7. {
  8. /// <summary>
  9. /// ConnectionMultiplexer对象管理帮助类
  10. /// </summary>
  11. public static class RedisConnectionManager
  12. {
  13. /// <summary>
  14. /// Redis服务器连接字符串,默认为:127.0.0.1:6379,allowadmin=true<br/>
  15. /// </summary>
  16. public static string RedisConnectionString
  17. {
  18. get => "127.0.0.1:6379,allowadmin=true";
  19. set { }
  20. }
  21. //private static readonly object Locker = new object();
  22. //private static ConnectionMultiplexer _instance;
  23. private static readonly ConcurrentDictionary<string, ConcurrentLimitedQueue<ConnectionMultiplexer>> ConnectionCache = new ConcurrentDictionary<string, ConcurrentLimitedQueue<ConnectionMultiplexer>>();
  24. /// <summary>
  25. /// 对象池获取线程内唯一对象
  26. /// </summary>
  27. public static ConnectionMultiplexer Instance
  28. {
  29. get
  30. {
  31. var queue = ConnectionCache.GetOrAdd(RedisConnectionString, new ConcurrentLimitedQueue<ConnectionMultiplexer>(32));
  32. if (queue.IsEmpty)
  33. {
  34. Parallel.For(0, queue.Limit, i =>
  35. {
  36. queue.Enqueue(GetManager(RedisConnectionString));
  37. });
  38. }
  39. queue.TryDequeue(out var multiplexer);
  40. return multiplexer;
  41. }
  42. }
  43. /// <summary>
  44. /// 缓存获取
  45. /// </summary>
  46. /// <param name="connectionString">连接字符串</param>
  47. /// <returns>连接对象</returns>
  48. public static ConnectionMultiplexer GetConnectionMultiplexer(string connectionString)
  49. {
  50. var queue = ConnectionCache.GetOrAdd(connectionString, new ConcurrentLimitedQueue<ConnectionMultiplexer>(32));
  51. if (queue.IsEmpty)
  52. {
  53. Parallel.For(0, queue.Limit, i =>
  54. {
  55. queue.Enqueue(GetManager(connectionString));
  56. });
  57. }
  58. queue.TryDequeue(out var multiplexer);
  59. return multiplexer;
  60. }
  61. private static ConnectionMultiplexer GetManager(string connectionString = null)
  62. {
  63. connectionString = connectionString ?? RedisConnectionString;
  64. var connect = ConnectionMultiplexer.Connect(ConfigurationOptions.Parse(connectionString, true));
  65. //注册如下事件
  66. connect.ConnectionFailed += MuxerConnectionFailed;
  67. connect.ConnectionRestored += MuxerConnectionRestored;
  68. connect.ErrorMessage += MuxerErrorMessage;
  69. connect.ConfigurationChanged += MuxerConfigurationChanged;
  70. connect.HashSlotMoved += MuxerHashSlotMoved;
  71. connect.InternalError += MuxerInternalError;
  72. return connect;
  73. }
  74. #region 事件
  75. /// <summary>
  76. /// 配置更改时
  77. /// </summary>
  78. /// <param name="sender">触发者</param>
  79. /// <param name="e">事件参数</param>
  80. private static void MuxerConfigurationChanged(object sender, EndPointEventArgs e)
  81. {
  82. Console.WriteLine("Configuration changed: " + e.EndPoint);
  83. }
  84. /// <summary>
  85. /// 发生错误时
  86. /// </summary>
  87. /// <param name="sender"></param>
  88. /// <param name="e"></param>
  89. private static void MuxerErrorMessage(object sender, RedisErrorEventArgs e)
  90. {
  91. Console.WriteLine("ErrorMessage: " + e.Message);
  92. }
  93. /// <summary>
  94. /// 重新建立连接之前的错误
  95. /// </summary>
  96. /// <param name="sender"></param>
  97. /// <param name="e"></param>
  98. private static void MuxerConnectionRestored(object sender, ConnectionFailedEventArgs e)
  99. {
  100. Console.WriteLine("ConnectionRestored: " + e.EndPoint);
  101. }
  102. /// <summary>
  103. /// 连接失败 , 如果重新连接成功你将不会收到这个通知
  104. /// </summary>
  105. /// <param name="sender"></param>
  106. /// <param name="e"></param>
  107. private static void MuxerConnectionFailed(object sender, ConnectionFailedEventArgs e)
  108. {
  109. Console.WriteLine("重新连接:Endpoint failed: " + e.EndPoint + ", " + e.FailureType + (e.Exception == null ? "" : (", " + e.Exception.Message)));
  110. }
  111. /// <summary>
  112. /// 更改集群
  113. /// </summary>
  114. /// <param name="sender"></param>
  115. /// <param name="e"></param>
  116. private static void MuxerHashSlotMoved(object sender, HashSlotMovedEventArgs e)
  117. {
  118. Console.WriteLine("HashSlotMoved:NewEndPoint" + e.NewEndPoint + ", OldEndPoint" + e.OldEndPoint);
  119. }
  120. /// <summary>
  121. /// redis类库错误
  122. /// </summary>
  123. /// <param name="sender"></param>
  124. /// <param name="e"></param>
  125. private static void MuxerInternalError(object sender, InternalErrorEventArgs e)
  126. {
  127. Console.WriteLine("InternalError:Message" + e.Exception.Message);
  128. }
  129. #endregion 事件
  130. }
  131. }