RedisLock.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. using System.Timers;
  6. using StackExchange.Redis;
  7. namespace Masuit.Tools.Core.Systems
  8. {
  9. public class RedisLock : IDisposable
  10. {
  11. #region Property
  // Set to true at the end of Dispose(bool) so that later calls return immediately.
  private bool isDisposed;
  /// <summary>
  /// Finalizer: forwards to <c>Dispose(false)</c> when the instance is collected
  /// without <c>Dispose()</c> having suppressed finalization.
  /// </summary>
  ~RedisLock()
  {
      Dispose(false);
  }
  /// <summary>
  /// Lua script that acquires (or re-enters) the lock.
  /// KEYS[1]: the key to lock; the script stores it as a hash whose field is the lock id
  ///          and whose value is the re-entry counter.
  /// ARGV[1]: expiry of the lock in milliseconds (passed to PEXPIRE) so a crashed owner cannot deadlock others.
  /// ARGV[2]: unique identifier of the lock owner.
  /// Returns nil when the lock was acquired or re-entered by the same id; otherwise the
  /// remaining time-to-live (PTTL) of the key held by another owner.
  /// </summary>
  private const String LockScript = @"
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);
";
  /// <summary>
  /// Lua script that releases one level of the lock and notifies waiters.
  /// KEYS[1]: the locked key (a hash, see the lock script).
  /// KEYS[2]: name of the Redis pub/sub channel for this lock; each lock has its own channel,
  ///          "redisson_lock__channel__{" + lock name + "}".
  /// ARGV[1]: message body to publish; it only marks that the key was unlocked so that
  ///          subscribed clients can wake up and try to acquire the lock.
  /// ARGV[2]: expiry in milliseconds re-applied when the lock is still held after the decrement.
  /// ARGV[3]: unique identifier of the lock owner.
  /// Returns 1 when the key no longer exists or was deleted by this call (a message is published
  /// in both cases), 0 when the re-entry counter is still positive, and nil when the key is
  /// held under a different identifier.
  /// </summary>
  private const String UnLockScript = @"
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;
";
  // Fraction of the requested wait time that TryLock subtracts (plus 2 ms) as a drift allowance.
  private const double ClockDriveFactor = 0.01;
  /// <summary>
  /// Default lease (key expiry) of 30 seconds, used when no explicit lease is supplied.
  /// </summary>
  private readonly TimeSpan _leaseTimeSpan = new TimeSpan(0, 0, 30);
  // Pending lease-renewal cancellation sources, keyed by the lock's resource name.
  private readonly ConcurrentDictionary<string, CancellationTokenSource> _expirationRenewalMap = new ConcurrentDictionary<string, CancellationTokenSource>();
  // Redis connection created in the constructor and released in Dispose(bool).
  private ConnectionMultiplexer _server;
  68. #endregion
  69. #region Constructor
  /// <summary>
  /// Creates the lock manager and connects to Redis.
  /// </summary>
  /// <param name="connstr">StackExchange.Redis connection string; defaults to a local server with a 20 s sync timeout.</param>
  public RedisLock(string connstr = "127.0.0.1:6379,synctimeout=20000")
  {
      _server = ConnectionMultiplexer.Connect(connstr);
      // NOTE(review): presumably lets callbacks complete out of order; confirm against the
      // StackExchange.Redis version in use (the property may be obsolete in newer releases).
      _server.PreserveAsyncOrder = false;
  }
  75. #endregion
  76. #region Public
  /// <summary>
  /// Tries to acquire the distributed lock for <paramref name="resource"/>. When the lock is held
  /// by someone else, waits for an unlock notification (or for the holder's TTL to elapse) and
  /// retries until <paramref name="waitTimeSpan"/>, minus a small drift allowance, has passed.
  /// </summary>
  /// <param name="resource">Name of the lock (the Redis key).</param>
  /// <param name="waitTimeSpan">Maximum time to keep retrying when the lock cannot be acquired immediately.</param>
  /// <param name="leaseTimeSpan">Expiry applied to the key once the lock is acquired.</param>
  /// <param name="lockObject">Receives the lock handle on success; <c>null</c> otherwise.</param>
  /// <returns><c>true</c> when the lock was acquired; <c>false</c> on timeout or when any exception occurred.</returns>
  public bool TryLock(RedisKey resource, TimeSpan waitTimeSpan, TimeSpan leaseTimeSpan, out Lock lockObject)
  {
      lockObject = null;

      // Per-call subscription state: only this call's handler is ever removed, so other
      // threads waiting on the same resource keep their own notifications.
      ISubscriber subscriber = null;
      Action<RedisChannel, RedisValue> onUnlock = null;
      var gate = new object();
      var unlockSeen = false;

      try
      {
          // Stopwatch is monotonic, unlike DateTime.Now which jumps on clock/DST changes.
          var clock = System.Diagnostics.Stopwatch.StartNew();
          var val = CreateUniqueLockId();
          var drift = new TimeSpan(0, 0, 0, 0, Convert.ToInt32((waitTimeSpan.TotalMilliseconds * ClockDriveFactor) + 2));

          // Ask for the lock; a non-null result is the remaining TTL of the current holder.
          var ttl = TryAcquire(resource, val, leaseTimeSpan);

          // NOTE(review): the value handed to Lock is derived from the wait time, not from the
          // lease; kept as in the original, confirm what the Lock type expects.
          var validityTime = waitTimeSpan - clock.Elapsed - drift;
          if (ttl.IsNull)
          {
              lockObject = new Lock(resource, val, validityTime);
              ScheduleExpirationRenewal(leaseTimeSpan, lockObject);
              return true;
          }

          // Subscribe to the unlock channel so a release wakes this thread up.
          onUnlock = (channel, message) =>
          {
              lock (gate)
              {
                  unlockSeen = true;
                  Monitor.PulseAll(gate);
              }
          };
          subscriber = _server.GetSubscriber();
          subscriber.Subscribe(GetChannelName(resource), onUnlock);

          // The wait budget is counted from the moment the subscription is in place.
          clock.Restart();
          while (true)
          {
              // Clear the flag BEFORE trying, so a release that lands between the attempt
              // and the wait below is not missed.
              lock (gate)
              {
                  unlockSeen = false;
              }

              ttl = TryAcquire(resource, val, leaseTimeSpan);
              if (ttl.IsNull)
              {
                  lockObject = new Lock(resource, val, validityTime);
                  ScheduleExpirationRenewal(leaseTimeSpan, lockObject);
                  return true;
              }

              validityTime = waitTimeSpan - clock.Elapsed - drift;
              if (validityTime.TotalMilliseconds < 0)
              {
                  // The caller's maximum wait time is exhausted: give up.
                  return false;
              }

              // Sleep until an unlock is published, the holder's TTL runs out, or the wait
              // budget is spent, whichever comes first, instead of spinning on Redis.
              var waitMs = validityTime.TotalMilliseconds;
              var ttlMs = (long)ttl;
              if (ttlMs > 0 && ttlMs < waitMs)
              {
                  waitMs = ttlMs;
              }

              var timeout = (int)Math.Min(Math.Ceiling(waitMs), int.MaxValue);
              lock (gate)
              {
                  if (!unlockSeen)
                  {
                      Monitor.Wait(gate, timeout);
                  }
              }
          }
      }
      catch (Exception)
      {
          // Deliberate best-effort contract kept from the original: any failure reads as "not acquired".
          return false;
      }
      finally
      {
          // Remove only the handler registered by this call, and only if one was registered.
          if (subscriber != null)
          {
              subscriber.Unsubscribe(GetChannelName(resource), onUnlock);
          }
      }
  }
  /// <summary>
  /// Releases a lock previously returned by <c>TryLock</c>: stops its lease renewal and runs
  /// the unlock script against Redis.
  /// </summary>
  /// <param name="lockObject">Handle produced by a successful lock; <c>null</c> is ignored.</param>
  /// <returns>The raw result of the unlock script, or <c>null</c> when <paramref name="lockObject"/> is <c>null</c>.</returns>
  public RedisResult UnLock(Lock lockObject)
  {
      if (lockObject == null)
      {
          return null;
      }

      CancelExpirationRenewal(lockObject);

      // KEYS: the locked key and its notification channel.
      var scriptKeys = new RedisKey[] { lockObject.Resource, GetChannelName(lockObject.Resource) };
      // ARGV: published message (current thread id), expiry in ms for a still-held lock, lock id.
      var scriptArgs = new RedisValue[] { Thread.CurrentThread.ManagedThreadId, 10000, lockObject.Value };

      var database = _server.GetDatabase();
      return database.ScriptEvaluate(UnLockScript, scriptKeys, scriptArgs);
  }
  151. #endregion
  152. #region Private
  /// <summary>
  /// Subscribes to the unlock channel of <paramref name="resource"/>. The registered handler
  /// intentionally does nothing with the message.
  /// </summary>
  /// <param name="resource">Lock name whose channel is subscribed.</param>
  private void Subscriber(RedisKey resource)
  {
      var connection = _server.GetSubscriber();
      var channelName = GetChannelName(resource);
      connection.Subscribe(channelName, (channel, message) => { });
  }
  /// <summary>
  /// Unsubscribes from the unlock channel of <paramref name="resource"/>.
  /// </summary>
  /// <param name="resource">Lock name whose channel is unsubscribed.</param>
  private void UnSubscriber(RedisKey resource)
  {
      // NOTE(review): no handler is passed, which presumably drops every handler registered
      // on this channel through this connection; confirm against StackExchange.Redis.
      _server.GetSubscriber().Unsubscribe(GetChannelName(resource));
  }
  /// <summary>
  /// Builds the pub/sub channel name used to announce that <paramref name="resource"/> was unlocked.
  /// </summary>
  /// <param name="resource">Lock name.</param>
  /// <returns>The lock name wrapped as <c>redisson_lock__channel__{name}</c>.</returns>
  private string GetChannelName(RedisKey resource)
  {
      var lockName = resource.ToString();
      return string.Concat("redisson_lock__channel__{", lockName, "}");
  }
  /// <summary>
  /// Runs the lock script once for <paramref name="resource"/>, falling back to the default
  /// lease when <paramref name="leaseTimeSpan"/> is <c>null</c>.
  /// </summary>
  /// <param name="resource">Lock name (Redis key).</param>
  /// <param name="value">Unique identifier of the caller.</param>
  /// <param name="leaseTimeSpan">Requested key expiry, or <c>null</c> for the default.</param>
  /// <returns>The lock script result: null on success, otherwise the holder's remaining TTL.</returns>
  private RedisResult TryAcquire(RedisKey resource, string value, TimeSpan? leaseTimeSpan)
  {
      var effectiveLease = leaseTimeSpan ?? _leaseTimeSpan;
      return LockInnerAsync(resource, effectiveLease, value);
  }
  /// <summary>
  /// Evaluates the lock script for <paramref name="resource"/> (synchronously, despite the name).
  /// </summary>
  /// <param name="resource">Lock name (Redis key).</param>
  /// <param name="waitTime">Expiry to apply to the key; sent to the script as ARGV[1].</param>
  /// <param name="threadId">Unique identifier of the caller; sent to the script as ARGV[2].</param>
  /// <returns>The lock script result: null on success, otherwise the holder's remaining TTL.</returns>
  private RedisResult LockInnerAsync(RedisKey resource, TimeSpan waitTime, string threadId)
  {
      // PEXPIRE only accepts an integer; TotalMilliseconds is a double and a fractional value
      // (e.g. a TimeSpan built from ticks) would make the script fail. Round up so the lease
      // is never shorter than requested.
      long leaseMilliseconds = (long)Math.Ceiling(waitTime.TotalMilliseconds);

      RedisKey[] key = { resource };
      RedisValue[] values = { leaseMilliseconds, threadId };
      return _server.GetDatabase().ScriptEvaluate(LockScript, key, values);
  }
  /// <summary>
  /// Evaluates the lock script for <paramref name="resource"/> using the default lease.
  /// </summary>
  /// <param name="resource">Lock name (Redis key).</param>
  /// <param name="threadId">Unique identifier of the caller.</param>
  /// <returns>The lock script result.</returns>
  private RedisResult LockInnerAsync(RedisKey resource, string threadId)
      => LockInnerAsync(resource, _leaseTimeSpan, threadId);
  /// <summary>
  /// Produces a fresh lock identifier: a new GUID immediately followed by the managed id of
  /// the calling thread.
  /// </summary>
  /// <returns>The identifier string.</returns>
  protected static string CreateUniqueLockId()
  {
      var guidPart = Guid.NewGuid().ToString();
      var threadPart = Thread.CurrentThread.ManagedThreadId;
      return guidPart + threadPart;
  }
  /// <summary>
  /// Invokes <paramref name="doWork"/> once, after <paramref name="time"/> milliseconds.
  /// </summary>
  /// <param name="doWork">Callback raised when the interval elapses; may be <c>null</c>.</param>
  /// <param name="time">Delay in milliseconds; must be greater than zero (the timer rejects other values).</param>
  protected void SetTimeOut(ElapsedEventHandler doWork, int time)
  {
      // AutoReset = false makes the timer genuinely one-shot; stopping it from inside an
      // auto-resetting timer's handler could let a second tick through for short intervals.
      var timer = new System.Timers.Timer { Interval = time, AutoReset = false };
      timer.Elapsed += (sender, args) =>
      {
          try
          {
              doWork?.Invoke(sender, args);
          }
          finally
          {
              // The timer is never reused, so release it once the callback has run.
              timer.Dispose();
          }
      };
      timer.Start();
  }
  /// <summary>
  /// Starts a background loop that calls <paramref name="action"/> every <paramref name="time"/>
  /// milliseconds until the returned source is cancelled or the action reports failure.
  /// </summary>
  /// <param name="action">Renewal callback; returning <c>false</c> ends the loop.</param>
  /// <param name="lockObj">Lock handle passed to <paramref name="action"/> on every tick.</param>
  /// <param name="time">Interval between calls in milliseconds; values below 1 are treated as 1.</param>
  /// <returns>The source whose cancellation stops the loop.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
  protected CancellationTokenSource TaskTimeOut(Func<Lock, bool> action, Lock lockObj, int time)
  {
      if (action == null)
      {
          throw new ArgumentNullException(nameof(action));
      }

      var timeoutCancellationTokenSource = new CancellationTokenSource();
      // Capture the token now: reading .Token later would throw once the source is disposed.
      var token = timeoutCancellationTokenSource.Token;
      // Guard against a zero/negative interval, which would spin or wait forever.
      var interval = Math.Max(1, time);

      // The previous implementation waited on a condition that was already true, so the
      // action ran exactly once and the lease was never renewed again.
      Task.Run(async () =>
      {
          while (true)
          {
              // Throws on cancellation, which ends the task in the Canceled state.
              await Task.Delay(interval, token).ConfigureAwait(false);
              if (!action(lockObj))
              {
                  // The action reported that nothing was renewed; stop trying.
                  return;
              }
          }
      }, token);

      return timeoutCancellationTokenSource;
  }
  /// <summary>
  /// Schedules renewal of the key expiry for <paramref name="lockObject"/>, using one third of
  /// the lease as the renewal interval.
  /// </summary>
  /// <param name="leaseTimeSpan">Expiry re-applied to the key by the renewal.</param>
  /// <param name="lockObject">Lock whose key is renewed.</param>
  private void ScheduleExpirationRenewal(TimeSpan leaseTimeSpan, Lock lockObject)
  {
      var renewalIntervalMs = Convert.ToInt32(leaseTimeSpan.TotalMilliseconds) / 3;
      Func<Lock, bool> renew = heldLock => _server.GetDatabase().KeyExpire(heldLock.Resource, leaseTimeSpan);
      ScheduleExpirationRenewal(renew, lockObject, renewalIntervalMs);
  }
  /// <summary>
  /// Registers a renewal for the lock's resource unless one is already registered.
  /// </summary>
  /// <param name="action">Renewal callback handed to <c>TaskTimeOut</c>.</param>
  /// <param name="lockObj">Lock whose resource name keys the registration.</param>
  /// <param name="time">Interval in milliseconds handed to <c>TaskTimeOut</c>.</param>
  private void ScheduleExpirationRenewal(Func<Lock, bool> action, Lock lockObj, int time)
  {
      // Fast path: nothing to do when a renewal already exists for this resource.
      if (!_expirationRenewalMap.ContainsKey(lockObj.Resource))
      {
          var renewalSource = TaskTimeOut(action, lockObj, time);

          // Another thread may have registered in the meantime; if so, cancel ours so that
          // only one renewal stays active per resource.
          var registered = _expirationRenewalMap.TryAdd(lockObj.Resource, renewalSource);
          if (!registered)
          {
              renewalSource.Cancel();
          }
      }
  }
  /// <summary>
  /// Stops and releases the renewal registered for the lock's resource, if any.
  /// </summary>
  /// <param name="lockObj">Lock whose renewal is cancelled.</param>
  private void CancelExpirationRenewal(Lock lockObj)
  {
      CancellationTokenSource renewal;
      if (!_expirationRenewalMap.TryRemove(lockObj.Resource, out renewal) || renewal == null)
      {
          return;
      }

      try
      {
          renewal.Cancel();
      }
      finally
      {
          // The source has left the map and will never be used again; previously it was
          // cancelled but never disposed.
          renewal.Dispose();
      }
  }
  250. #endregion
  /// <summary>
  /// Stops pending lease renewals and closes the Redis connection.
  /// </summary>
  public void Dispose()
  {
      Dispose(true);
      GC.SuppressFinalize(this);
  }

  /// <summary>
  /// Releases the resources held by this instance; safe to call more than once.
  /// </summary>
  /// <param name="disposing">
  /// <c>true</c> when called from <see cref="Dispose()"/>; <c>false</c> when called from the
  /// finalizer, in which case managed objects are left alone.
  /// </param>
  public virtual void Dispose(bool disposing)
  {
      if (isDisposed)
      {
          return;
      }

      if (disposing)
      {
          // Stop every outstanding renewal so nothing keeps using the connection after it closes.
          foreach (var resourceName in _expirationRenewalMap.Keys)
          {
              CancellationTokenSource renewal;
              if (_expirationRenewalMap.TryRemove(resourceName, out renewal) && renewal != null)
              {
                  renewal.Cancel();
                  renewal.Dispose();
              }
          }

          // Managed objects must only be touched on the explicit Dispose path: during
          // finalization they may already have been finalized themselves.
          _server?.Dispose();
      }

      isDisposed = true;
  }
  267. }
  268. }