// RedisLock.cs
using StackExchange.Redis;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
  7. namespace Masuit.Tools.Systems
  8. {
  9. /// <summary>
  10. /// Redis分布式锁
  11. /// </summary>
  12. public class RedisLock : IDisposable
  13. {
  14. #region Property
  15. private bool _isDisposed;
  16. /// <summary>
  17. /// 终结器
  18. /// </summary>
  19. ~RedisLock()
  20. {
  21. Dispose(false);
  22. }
  23. /// <summary>
  24. /// KEYS[1] :需要加锁的key,这里需要是字符串类型。
  25. /// ARGV[1] :锁的超时时间,防止死锁
  26. /// ARGV[2] :锁的唯一标识
  27. /// </summary>
  28. private const String LockScript = @"
  29. if (redis.call('exists', KEYS[1]) == 0) then
  30. redis.call('hset', KEYS[1], ARGV[2], 1);
  31. redis.call('pexpire', KEYS[1], ARGV[1]);
  32. return nil;
  33. end;
  34. if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
  35. redis.call('hincrby', KEYS[1], ARGV[2], 1);
  36. redis.call('pexpire', KEYS[1], ARGV[1]);
  37. return nil;
  38. end;
  39. return redis.call('pttl', KEYS[1]);
  40. ";
  41. /// <summary>
  42. /// – KEYS[1] :需要加锁的key,这里需要是字符串类型。
  43. /// – KEYS[2] :redis消息的ChannelName,一个分布式锁对应唯一的一个channelName:“redisson_lock__channel__{” + getName() + “}”
  44. /// – ARGV[1] :reids消息体,这里只需要一个字节的标记就可以,主要标记redis的key已经解锁,再结合redis的Subscribe,能唤醒其他订阅解锁消息的客户端线程申请锁。
  45. /// – ARGV[2] :锁的超时时间,防止死锁
  46. /// – ARGV[3] :锁的唯一标识
  47. /// </summary>
  48. private const String UnLockScript = @"
  49. if (redis.call('exists', KEYS[1]) == 0) then
  50. redis.call('publish', KEYS[2], ARGV[1]);
  51. return 1;
  52. end;
  53. if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
  54. return nil;
  55. end;
  56. local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
  57. if (counter > 0) then
  58. redis.call('pexpire', KEYS[1], ARGV[2]);
  59. return 0;
  60. else
  61. redis.call('del', KEYS[1]);
  62. redis.call('publish', KEYS[2], ARGV[1]);
  63. return 1;
  64. end;
  65. return nil;
  66. ";
  67. private const double ClockDriveFactor = 0.01;
  68. /// <summary>
  69. /// 默认的30秒过期时间
  70. /// </summary>
  71. private readonly TimeSpan _leaseTimeSpan = new TimeSpan(0, 0, 30);
  72. private readonly ConcurrentDictionary<string, CancellationTokenSource> _expirationRenewalMap = new ConcurrentDictionary<string, CancellationTokenSource>();
  73. private ConnectionMultiplexer _server;
  74. #endregion
  75. #region Constructor
  76. /// <summary>
  77. /// 默认连接127.0.0.1:6379,synctimeout=20000
  78. /// </summary>
  79. /// <param name="connstr"></param>
  80. public RedisLock(string connstr = "127.0.0.1:6379,synctimeout=20000")
  81. {
  82. _server = ConnectionMultiplexer.Connect(connstr);
  83. _server.PreserveAsyncOrder = false;
  84. }
  85. #endregion
  86. #region Public
  87. /// <summary>
  88. /// 加锁
  89. /// </summary>
  90. /// <param name="resource">锁名</param>
  91. /// <param name="waitTimeSpan">如果没有锁成功,允许动重试申请锁的最大时长</param>
  92. /// <param name="leaseTimeSpan">如果锁成功,对于锁(key)的过期时间</param>
  93. /// <param name="lockObject">锁成功信息包装成对象返回</param>
  94. /// <returns>true:成功</returns>
  95. public bool TryLock(RedisKey resource, TimeSpan waitTimeSpan, TimeSpan leaseTimeSpan, out Lock lockObject)
  96. {
  97. lockObject = null;
  98. try
  99. {
  100. var startTime = DateTime.Now;
  101. var val = CreateUniqueLockId();
  102. //申请锁,返回还剩余的锁过期时间
  103. var ttl = TryAcquire(resource, val, leaseTimeSpan);
  104. var drift = Convert.ToInt32((waitTimeSpan.TotalMilliseconds * ClockDriveFactor) + 2);
  105. var validityTime = waitTimeSpan - (DateTime.Now - startTime) - new TimeSpan(0, 0, 0, 0, drift);
  106. // 如果为空,表示申请锁成功
  107. if (ttl.IsNull)
  108. {
  109. lockObject = new Lock(resource, val, validityTime);
  110. //开始一个调度程序
  111. ScheduleExpirationRenewal(leaseTimeSpan, lockObject);
  112. return true;
  113. }
  114. // 订阅监听redis消息
  115. Subscriber(resource);
  116. startTime = DateTime.Now;
  117. while (true)
  118. {
  119. // 再次尝试一次申请锁
  120. ttl = TryAcquire(resource, val, leaseTimeSpan);
  121. // 获得锁,返回
  122. if (ttl.IsNull)
  123. {
  124. lockObject = new Lock(resource, val, validityTime);
  125. ScheduleExpirationRenewal(leaseTimeSpan, lockObject);
  126. return true;
  127. }
  128. drift = Convert.ToInt32((waitTimeSpan.TotalMilliseconds * ClockDriveFactor) + 2);
  129. validityTime = waitTimeSpan - (DateTime.Now - startTime) - new TimeSpan(0, 0, 0, 0, drift);
  130. if (validityTime.TotalMilliseconds < 0)
  131. {
  132. //说明已经超过了客户端设置的最大wait time,则直接返回false,取消订阅,不再继续申请锁了。
  133. //Console.WriteLine("已经超过了客户端设置的最大wait time,Thread ID:" + Thread.CurrentThread.ManagedThreadId);
  134. return false;
  135. }
  136. }
  137. }
  138. catch (Exception)
  139. {
  140. return false;
  141. }
  142. finally
  143. {
  144. // 无论是否获得锁,都要取消订阅解锁消息
  145. UnSubscriber(resource);
  146. }
  147. }
  148. /// <summary>
  149. /// 解锁
  150. /// </summary>
  151. /// <param name="lockObject">锁成功的返回对象</param>
  152. /// <returns></returns>
  153. public RedisResult UnLock(Lock lockObject)
  154. {
  155. if (lockObject == null)
  156. {
  157. return null;
  158. }
  159. CancelExpirationRenewal(lockObject);
  160. RedisKey[] key =
  161. {
  162. lockObject.Resource,
  163. GetChannelName(lockObject.Resource)
  164. };
  165. RedisValue[] values =
  166. {
  167. Thread.CurrentThread.ManagedThreadId,
  168. 10000,
  169. lockObject.Value
  170. };
  171. return _server.GetDatabase().ScriptEvaluate(UnLockScript, key, values);
  172. }
  173. #endregion
  174. #region Private
  175. private void Subscriber(RedisKey resource)
  176. {
  177. //Console.WriteLine("Thread ID:" + Thread.CurrentThread.ManagedThreadId + " 订阅广播");
  178. //var aa = Thread.CurrentThread.ManagedThreadId;
  179. ISubscriber sub = _server.GetSubscriber();
  180. sub.Subscribe(GetChannelName(resource), (channel, message) =>
  181. {
  182. //Console.WriteLine("Thread ID:" + aa + ",收到广播:Thread ID:" + message + " 已解锁");
  183. });
  184. }
  185. private void UnSubscriber(RedisKey resource)
  186. {
  187. ISubscriber sub = _server.GetSubscriber();
  188. sub.Unsubscribe(GetChannelName(resource));
  189. }
  190. private string GetChannelName(RedisKey resource)
  191. {
  192. return "redisson_lock__channel__{" + resource.ToString() + "}";
  193. }
  194. private RedisResult TryAcquire(RedisKey resource, string value, TimeSpan? leaseTimeSpan)
  195. {
  196. if (leaseTimeSpan != null)
  197. {
  198. return LockInnerAsync(resource, leaseTimeSpan.Value, value);
  199. }
  200. return LockInnerAsync(resource, value);
  201. }
  202. private RedisResult LockInnerAsync(RedisKey resource, TimeSpan waitTime, string threadId)
  203. {
  204. RedisKey[] key =
  205. {
  206. resource
  207. };
  208. RedisValue[] values =
  209. {
  210. waitTime.TotalMilliseconds,
  211. threadId
  212. };
  213. return _server.GetDatabase().ScriptEvaluate(LockScript, key, values);
  214. }
  215. private RedisResult LockInnerAsync(RedisKey resource, string threadId)
  216. {
  217. var task = LockInnerAsync(resource, _leaseTimeSpan, threadId);
  218. return task;
  219. }
  220. /// <summary>
  221. /// 创建唯一锁id
  222. /// </summary>
  223. /// <returns></returns>
  224. protected static string CreateUniqueLockId()
  225. {
  226. return string.Concat(Guid.NewGuid().ToString(), Thread.CurrentThread.ManagedThreadId);
  227. }
  228. /// <summary>
  229. /// 设置超时
  230. /// </summary>
  231. /// <param name="doWork"></param>
  232. /// <param name="time"></param>
  233. protected void SetTimeOut(ElapsedEventHandler doWork, int time)
  234. {
  235. System.Timers.Timer timer = new System.Timers.Timer();
  236. timer.Interval = time;
  237. timer.Elapsed += (sender, args) => timer.Stop();
  238. timer.Elapsed += doWork;
  239. timer.Start();
  240. }
  241. /// <summary>
  242. /// 任务超时
  243. /// </summary>
  244. /// <param name="action"></param>
  245. /// <param name="lockObj"></param>
  246. /// <param name="time"></param>
  247. /// <returns></returns>
  248. protected CancellationTokenSource TaskTimeOut(Func<Lock, bool> action, Lock lockObj, int time)
  249. {
  250. var timeoutCancellationTokenSource = new CancellationTokenSource();
  251. Task.Run(() =>
  252. {
  253. SpinWait.SpinUntil(() => !timeoutCancellationTokenSource.IsCancellationRequested);
  254. }, timeoutCancellationTokenSource.Token);
  255. return timeoutCancellationTokenSource;
  256. }
  257. private void ScheduleExpirationRenewal(TimeSpan leaseTimeSpan, Lock lockObject)
  258. {
  259. ScheduleExpirationRenewal((lockObj) => _server.GetDatabase().KeyExpire(lockObj.Resource, leaseTimeSpan), lockObject, Convert.ToInt32(leaseTimeSpan.TotalMilliseconds) / 3);
  260. }
  261. private void ScheduleExpirationRenewal(Func<Lock, bool> action, Lock lockObj, int time)
  262. {
  263. // 保证任务不会被重复创建
  264. if (_expirationRenewalMap.ContainsKey(lockObj.Resource))
  265. {
  266. return;
  267. }
  268. var task = TaskTimeOut(action, lockObj, time);
  269. //如果已经存在,停止任务,也是为了在极端的并发情况下,保证任务不会被重复创建
  270. if (!_expirationRenewalMap.TryAdd(lockObj.Resource, task))
  271. {
  272. task.Cancel();
  273. }
  274. }
  275. private void CancelExpirationRenewal(Lock lockObj)
  276. {
  277. CancellationTokenSource task;
  278. if (_expirationRenewalMap.TryRemove(lockObj.Resource, out task))
  279. {
  280. task?.Cancel();
  281. }
  282. }
  283. #endregion
  284. /// <summary>执行与释放或重置非托管资源关联的应用程序定义的任务。</summary>
  285. public void Dispose()
  286. {
  287. Dispose(true);
  288. GC.SuppressFinalize(this);
  289. }
  290. /// <summary>
  291. /// 释放锁
  292. /// </summary>
  293. /// <param name="disposing"></param>
  294. public virtual void Dispose(bool disposing)
  295. {
  296. if (_isDisposed)
  297. {
  298. return;
  299. }
  300. _server?.Dispose();
  301. _isDisposed = true;
  302. //_server = null;
  303. }
  304. }
  305. }