using StackExchange.Redis;
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
namespace Masuit.Tools.Systems
{
///
/// Redis分布式锁
///
public class RedisLock : IDisposable
{
#region Property
private bool _isDisposed;
///
/// 终结器
///
~RedisLock()
{
Dispose(false);
}
///
/// KEYS[1] :需要加锁的key,这里需要是字符串类型。
/// ARGV[1] :锁的超时时间,防止死锁
/// ARGV[2] :锁的唯一标识
///
private const String LockScript = @"
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
return redis.call('pttl', KEYS[1]);
";
///
/// – KEYS[1] :需要加锁的key,这里需要是字符串类型。
/// – KEYS[2] :redis消息的ChannelName,一个分布式锁对应唯一的一个channelName:“redisson_lock__channel__{” + getName() + “}”
/// – ARGV[1] :reids消息体,这里只需要一个字节的标记就可以,主要标记redis的key已经解锁,再结合redis的Subscribe,能唤醒其他订阅解锁消息的客户端线程申请锁。
/// – ARGV[2] :锁的超时时间,防止死锁
/// – ARGV[3] :锁的唯一标识
///
private const String UnLockScript = @"
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;
";
private const double ClockDriveFactor = 0.01;
///
/// 默认的30秒过期时间
///
private readonly TimeSpan _leaseTimeSpan = new TimeSpan(0, 0, 30);
private readonly ConcurrentDictionary _expirationRenewalMap = new ConcurrentDictionary();
private ConnectionMultiplexer _server;
#endregion
#region Constructor
///
/// 默认连接127.0.0.1:6379,synctimeout=20000
///
///
public RedisLock(string connstr = "127.0.0.1:6379,synctimeout=20000")
{
_server = ConnectionMultiplexer.Connect(connstr);
_server.PreserveAsyncOrder = false;
}
#endregion
#region Public
///
/// 加锁
///
/// 锁名
/// 如果没有锁成功,允许动重试申请锁的最大时长
/// 如果锁成功,对于锁(key)的过期时间
/// 锁成功信息包装成对象返回
/// true:成功
public bool TryLock(RedisKey resource, TimeSpan waitTimeSpan, TimeSpan leaseTimeSpan, out Lock lockObject)
{
lockObject = null;
try
{
var startTime = DateTime.Now;
var val = CreateUniqueLockId();
//申请锁,返回还剩余的锁过期时间
var ttl = TryAcquire(resource, val, leaseTimeSpan);
var drift = Convert.ToInt32((waitTimeSpan.TotalMilliseconds * ClockDriveFactor) + 2);
var validityTime = waitTimeSpan - (DateTime.Now - startTime) - new TimeSpan(0, 0, 0, 0, drift);
// 如果为空,表示申请锁成功
if (ttl.IsNull)
{
lockObject = new Lock(resource, val, validityTime);
//开始一个调度程序
ScheduleExpirationRenewal(leaseTimeSpan, lockObject);
return true;
}
// 订阅监听redis消息
Subscriber(resource);
startTime = DateTime.Now;
while (true)
{
// 再次尝试一次申请锁
ttl = TryAcquire(resource, val, leaseTimeSpan);
// 获得锁,返回
if (ttl.IsNull)
{
lockObject = new Lock(resource, val, validityTime);
ScheduleExpirationRenewal(leaseTimeSpan, lockObject);
return true;
}
drift = Convert.ToInt32((waitTimeSpan.TotalMilliseconds * ClockDriveFactor) + 2);
validityTime = waitTimeSpan - (DateTime.Now - startTime) - new TimeSpan(0, 0, 0, 0, drift);
if (validityTime.TotalMilliseconds < 0)
{
//说明已经超过了客户端设置的最大wait time,则直接返回false,取消订阅,不再继续申请锁了。
//Console.WriteLine("已经超过了客户端设置的最大wait time,Thread ID:" + Thread.CurrentThread.ManagedThreadId);
return false;
}
}
}
catch (Exception)
{
return false;
}
finally
{
// 无论是否获得锁,都要取消订阅解锁消息
UnSubscriber(resource);
}
}
///
/// 解锁
///
/// 锁成功的返回对象
///
public RedisResult UnLock(Lock lockObject)
{
if (lockObject == null)
{
return null;
}
CancelExpirationRenewal(lockObject);
RedisKey[] key =
{
lockObject.Resource,
GetChannelName(lockObject.Resource)
};
RedisValue[] values =
{
Thread.CurrentThread.ManagedThreadId,
10000,
lockObject.Value
};
return _server.GetDatabase().ScriptEvaluate(UnLockScript, key, values);
}
#endregion
#region Private
private void Subscriber(RedisKey resource)
{
//Console.WriteLine("Thread ID:" + Thread.CurrentThread.ManagedThreadId + " 订阅广播");
//var aa = Thread.CurrentThread.ManagedThreadId;
ISubscriber sub = _server.GetSubscriber();
sub.Subscribe(GetChannelName(resource), (channel, message) =>
{
//Console.WriteLine("Thread ID:" + aa + ",收到广播:Thread ID:" + message + " 已解锁");
});
}
private void UnSubscriber(RedisKey resource)
{
ISubscriber sub = _server.GetSubscriber();
sub.Unsubscribe(GetChannelName(resource));
}
private string GetChannelName(RedisKey resource)
{
return "redisson_lock__channel__{" + resource.ToString() + "}";
}
private RedisResult TryAcquire(RedisKey resource, string value, TimeSpan? leaseTimeSpan)
{
if (leaseTimeSpan != null)
{
return LockInnerAsync(resource, leaseTimeSpan.Value, value);
}
return LockInnerAsync(resource, value);
}
private RedisResult LockInnerAsync(RedisKey resource, TimeSpan waitTime, string threadId)
{
RedisKey[] key =
{
resource
};
RedisValue[] values =
{
waitTime.TotalMilliseconds,
threadId
};
return _server.GetDatabase().ScriptEvaluate(LockScript, key, values);
}
private RedisResult LockInnerAsync(RedisKey resource, string threadId)
{
var task = LockInnerAsync(resource, _leaseTimeSpan, threadId);
return task;
}
///
/// 创建唯一锁id
///
///
protected static string CreateUniqueLockId()
{
return string.Concat(Guid.NewGuid().ToString(), Thread.CurrentThread.ManagedThreadId);
}
///
/// 设置超时
///
///
///
protected void SetTimeOut(ElapsedEventHandler doWork, int time)
{
System.Timers.Timer timer = new System.Timers.Timer();
timer.Interval = time;
timer.Elapsed += (sender, args) => timer.Stop();
timer.Elapsed += doWork;
timer.Start();
}
///
/// 任务超时
///
///
///
///
///
protected CancellationTokenSource TaskTimeOut(Func action, Lock lockObj, int time)
{
var timeoutCancellationTokenSource = new CancellationTokenSource();
Task.Run(() =>
{
SpinWait.SpinUntil(() => !timeoutCancellationTokenSource.IsCancellationRequested);
}, timeoutCancellationTokenSource.Token);
return timeoutCancellationTokenSource;
}
private void ScheduleExpirationRenewal(TimeSpan leaseTimeSpan, Lock lockObject)
{
ScheduleExpirationRenewal((lockObj) => _server.GetDatabase().KeyExpire(lockObj.Resource, leaseTimeSpan), lockObject, Convert.ToInt32(leaseTimeSpan.TotalMilliseconds) / 3);
}
private void ScheduleExpirationRenewal(Func action, Lock lockObj, int time)
{
// 保证任务不会被重复创建
if (_expirationRenewalMap.ContainsKey(lockObj.Resource))
{
return;
}
var task = TaskTimeOut(action, lockObj, time);
//如果已经存在,停止任务,也是为了在极端的并发情况下,保证任务不会被重复创建
if (!_expirationRenewalMap.TryAdd(lockObj.Resource, task))
{
task.Cancel();
}
}
private void CancelExpirationRenewal(Lock lockObj)
{
CancellationTokenSource task;
if (_expirationRenewalMap.TryRemove(lockObj.Resource, out task))
{
task?.Cancel();
}
}
#endregion
/// 执行与释放或重置非托管资源关联的应用程序定义的任务。
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
///
/// 释放锁
///
///
public virtual void Dispose(bool disposing)
{
if (_isDisposed)
{
return;
}
_server?.Dispose();
_isDisposed = true;
//_server = null;
}
}
}