// RedisLock.cs - Redis-backed distributed lock (Masuit.Tools.Systems).
using System;
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using StackExchange.Redis;
  7. namespace Masuit.Tools.Systems
  8. {
  9. public class RedisLock : IDisposable
  10. {
  11. #region Property
  12. private bool _isDisposed;
  13. ~RedisLock()
  14. {
  15. Dispose(false);
  16. }
  17. /// <summary>
  18. /// KEYS[1] :需要加锁的key,这里需要是字符串类型。
  19. /// ARGV[1] :锁的超时时间,防止死锁
  20. /// ARGV[2] :锁的唯一标识
  21. /// </summary>
  22. private const String LockScript = @"
  23. if (redis.call('exists', KEYS[1]) == 0) then
  24. redis.call('hset', KEYS[1], ARGV[2], 1);
  25. redis.call('pexpire', KEYS[1], ARGV[1]);
  26. return nil;
  27. end;
  28. if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
  29. redis.call('hincrby', KEYS[1], ARGV[2], 1);
  30. redis.call('pexpire', KEYS[1], ARGV[1]);
  31. return nil;
  32. end;
  33. return redis.call('pttl', KEYS[1]);
  34. ";
  35. /// <summary>
  36. /// – KEYS[1] :需要加锁的key,这里需要是字符串类型。
  37. /// – KEYS[2] :redis消息的ChannelName,一个分布式锁对应唯一的一个channelName:“redisson_lock__channel__{” + getName() + “}”
  38. /// – ARGV[1] :reids消息体,这里只需要一个字节的标记就可以,主要标记redis的key已经解锁,再结合redis的Subscribe,能唤醒其他订阅解锁消息的客户端线程申请锁。
  39. /// – ARGV[2] :锁的超时时间,防止死锁
  40. /// – ARGV[3] :锁的唯一标识
  41. /// </summary>
  42. private const String UnLockScript = @"
  43. if (redis.call('exists', KEYS[1]) == 0) then
  44. redis.call('publish', KEYS[2], ARGV[1]);
  45. return 1;
  46. end;
  47. if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
  48. return nil;
  49. end;
  50. local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
  51. if (counter > 0) then
  52. redis.call('pexpire', KEYS[1], ARGV[2]);
  53. return 0;
  54. else
  55. redis.call('del', KEYS[1]);
  56. redis.call('publish', KEYS[2], ARGV[1]);
  57. return 1;
  58. end;
  59. return nil;
  60. ";
  61. private const double ClockDriveFactor = 0.01;
  62. /// <summary>
  63. /// 默认的30秒过期时间
  64. /// </summary>
  65. private readonly TimeSpan _leaseTimeSpan = new TimeSpan(0, 0, 30);
  66. private readonly ConcurrentDictionary<string, CancellationTokenSource> _expirationRenewalMap = new ConcurrentDictionary<string, CancellationTokenSource>();
  67. private ConnectionMultiplexer _server;
  68. #endregion
  69. #region Constructor
  70. /// <summary>
  71. /// 默认连接127.0.0.1:6379,synctimeout=20000
  72. /// </summary>
  73. /// <param name="connstr"></param>
  74. public RedisLock(string connstr = "127.0.0.1:6379,synctimeout=20000")
  75. {
  76. _server = ConnectionMultiplexer.Connect(connstr);
  77. _server.PreserveAsyncOrder = false;
  78. }
  79. #endregion
  80. #region Public
  81. /// <summary>
  82. /// 加锁
  83. /// </summary>
  84. /// <param name="resource">锁名</param>
  85. /// <param name="waitTimeSpan">如果没有锁成功,允许动重试申请锁的最大时长</param>
  86. /// <param name="leaseTimeSpan">如果锁成功,对于锁(key)的过期时间</param>
  87. /// <param name="lockObject">锁成功信息包装成对象返回</param>
  88. /// <returns>true:成功</returns>
  89. public bool TryLock(RedisKey resource, TimeSpan waitTimeSpan, TimeSpan leaseTimeSpan, out Lock lockObject)
  90. {
  91. lockObject = null;
  92. try
  93. {
  94. var startTime = DateTime.Now;
  95. var val = CreateUniqueLockId();
  96. //申请锁,返回还剩余的锁过期时间
  97. var ttl = TryAcquire(resource, val, leaseTimeSpan);
  98. var drift = Convert.ToInt32((waitTimeSpan.TotalMilliseconds * ClockDriveFactor) + 2);
  99. var validityTime = waitTimeSpan - (DateTime.Now - startTime) - new TimeSpan(0, 0, 0, 0, drift);
  100. // 如果为空,表示申请锁成功
  101. if (ttl.IsNull)
  102. {
  103. lockObject = new Lock(resource, val, validityTime);
  104. //开始一个调度程序
  105. ScheduleExpirationRenewal(leaseTimeSpan, lockObject);
  106. return true;
  107. }
  108. // 订阅监听redis消息
  109. Subscriber(resource);
  110. startTime = DateTime.Now;
  111. while (true)
  112. {
  113. // 再次尝试一次申请锁
  114. ttl = TryAcquire(resource, val, leaseTimeSpan);
  115. // 获得锁,返回
  116. if (ttl.IsNull)
  117. {
  118. lockObject = new Lock(resource, val, validityTime);
  119. ScheduleExpirationRenewal(leaseTimeSpan, lockObject);
  120. return true;
  121. }
  122. drift = Convert.ToInt32((waitTimeSpan.TotalMilliseconds * ClockDriveFactor) + 2);
  123. validityTime = waitTimeSpan - (DateTime.Now - startTime) - new TimeSpan(0, 0, 0, 0, drift);
  124. if (validityTime.TotalMilliseconds < 0)
  125. {
  126. //说明已经超过了客户端设置的最大wait time,则直接返回false,取消订阅,不再继续申请锁了。
  127. //Console.WriteLine("已经超过了客户端设置的最大wait time,Thread ID:" + Thread.CurrentThread.ManagedThreadId);
  128. return false;
  129. }
  130. }
  131. }
  132. catch (Exception)
  133. {
  134. return false;
  135. }
  136. finally
  137. {
  138. // 无论是否获得锁,都要取消订阅解锁消息
  139. UnSubscriber(resource);
  140. }
  141. }
  142. /// <summary>
  143. /// 解锁
  144. /// </summary>
  145. /// <param name="lockObject">锁成功的返回对象</param>
  146. /// <returns></returns>
  147. public RedisResult UnLock(Lock lockObject)
  148. {
  149. if (lockObject == null)
  150. {
  151. return null;
  152. }
  153. CancelExpirationRenewal(lockObject);
  154. RedisKey[] key =
  155. {
  156. lockObject.Resource,
  157. GetChannelName(lockObject.Resource)
  158. };
  159. RedisValue[] values =
  160. {
  161. Thread.CurrentThread.ManagedThreadId,
  162. 10000,
  163. lockObject.Value
  164. };
  165. return _server.GetDatabase().ScriptEvaluate(UnLockScript, key, values);
  166. }
  167. #endregion
  168. #region Private
  169. private void Subscriber(RedisKey resource)
  170. {
  171. //Console.WriteLine("Thread ID:" + Thread.CurrentThread.ManagedThreadId + " 订阅广播");
  172. //var aa = Thread.CurrentThread.ManagedThreadId;
  173. ISubscriber sub = _server.GetSubscriber();
  174. sub.Subscribe(GetChannelName(resource), (channel, message) =>
  175. {
  176. //Console.WriteLine("Thread ID:" + aa + ",收到广播:Thread ID:" + message + " 已解锁");
  177. });
  178. }
  179. private void UnSubscriber(RedisKey resource)
  180. {
  181. ISubscriber sub = _server.GetSubscriber();
  182. sub.Unsubscribe(GetChannelName(resource));
  183. }
  184. private string GetChannelName(RedisKey resource)
  185. {
  186. return "redisson_lock__channel__{" + resource.ToString() + "}";
  187. }
  188. private RedisResult TryAcquire(RedisKey resource, string value, TimeSpan? leaseTimeSpan)
  189. {
  190. if (leaseTimeSpan != null)
  191. {
  192. return LockInnerAsync(resource, leaseTimeSpan.Value, value);
  193. }
  194. return LockInnerAsync(resource, value);
  195. }
  196. private RedisResult LockInnerAsync(RedisKey resource, TimeSpan waitTime, string threadId)
  197. {
  198. RedisKey[] key =
  199. {
  200. resource
  201. };
  202. RedisValue[] values =
  203. {
  204. waitTime.TotalMilliseconds,
  205. threadId
  206. };
  207. return _server.GetDatabase().ScriptEvaluate(LockScript, key, values);
  208. }
  209. private RedisResult LockInnerAsync(RedisKey resource, string threadId)
  210. {
  211. var task = LockInnerAsync(resource, _leaseTimeSpan, threadId);
  212. return task;
  213. }
  214. protected static string CreateUniqueLockId()
  215. {
  216. return string.Concat(Guid.NewGuid().ToString(), Thread.CurrentThread.ManagedThreadId);
  217. }
  218. protected void SetTimeOut(ElapsedEventHandler doWork, int time)
  219. {
  220. System.Timers.Timer timer = new System.Timers.Timer();
  221. timer.Interval = time;
  222. timer.Elapsed += (sender, args) => timer.Stop();
  223. timer.Elapsed += doWork;
  224. timer.Start();
  225. }
  226. protected CancellationTokenSource TaskTimeOut(Func<Lock, bool> action, Lock lockObj, int time)
  227. {
  228. var timeoutCancellationTokenSource = new CancellationTokenSource();
  229. Task.Run(() =>
  230. {
  231. SpinWait.SpinUntil(() => !timeoutCancellationTokenSource.IsCancellationRequested);
  232. }, timeoutCancellationTokenSource.Token);
  233. return timeoutCancellationTokenSource;
  234. }
  235. private void ScheduleExpirationRenewal(TimeSpan leaseTimeSpan, Lock lockObject)
  236. {
  237. ScheduleExpirationRenewal((lockObj) => _server.GetDatabase().KeyExpire(lockObj.Resource, leaseTimeSpan), lockObject, Convert.ToInt32(leaseTimeSpan.TotalMilliseconds) / 3);
  238. }
  239. private void ScheduleExpirationRenewal(Func<Lock, bool> action, Lock lockObj, int time)
  240. {
  241. // 保证任务不会被重复创建
  242. if (_expirationRenewalMap.ContainsKey(lockObj.Resource))
  243. {
  244. return;
  245. }
  246. var task = TaskTimeOut(action, lockObj, time);
  247. //如果已经存在,停止任务,也是为了在极端的并发情况下,保证任务不会被重复创建
  248. if (!_expirationRenewalMap.TryAdd(lockObj.Resource, task))
  249. {
  250. task.Cancel();
  251. }
  252. }
  253. private void CancelExpirationRenewal(Lock lockObj)
  254. {
  255. CancellationTokenSource task;
  256. if (_expirationRenewalMap.TryRemove(lockObj.Resource, out task))
  257. {
  258. task?.Cancel();
  259. }
  260. }
  261. #endregion
  262. /// <summary>执行与释放或重置非托管资源关联的应用程序定义的任务。</summary>
  263. public void Dispose()
  264. {
  265. Dispose(true);
  266. GC.SuppressFinalize(this);
  267. }
  268. public virtual void Dispose(bool disposing)
  269. {
  270. if (_isDisposed)
  271. {
  272. return;
  273. }
  274. _server?.Dispose();
  275. _isDisposed = true;
  276. //_server = null;
  277. }
  278. }
  279. }