using Apq.Cfg.Sources; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration.Memory; using StackExchange.Redis; namespace Apq.Cfg.Redis; /// /// Redis 配置源 /// internal sealed class RedisCfgSource : IWritableCfgSource, IDisposable { private const int MinScanPageSize = 10; private const int MaxScanPageSize = 1000; private readonly ConnectionMultiplexer _multiplexer; private readonly RedisOptions _options; private volatile bool _disposed; /// /// 初始化 RedisCfgSource 实例 /// /// Redis 连接选项 /// 配置层级,数值越大优先级越高 /// 是否为主要写入源 public RedisCfgSource(RedisOptions options, int level, bool isPrimaryWriter) { _options = options; Level = level; IsPrimaryWriter = isPrimaryWriter; var conn = EnsureAllowAdmin(options.ConnectionString!); if (_options.ConnectTimeoutMs > 0) conn += $",connectTimeout={_options.ConnectTimeoutMs}"; if (_options.OperationTimeoutMs > 0) conn += $",syncTimeout={_options.OperationTimeoutMs}"; conn += ",abortConnect=false"; _multiplexer = ConnectionMultiplexer.Connect(conn); } /// /// 获取配置层级,数值越大优先级越高 /// public int Level { get; } /// /// 获取是否可写,Redis 支持通过 API 写入配置,因此始终为 true /// public bool IsWriteable => true; /// /// 获取是否为主要写入源,用于标识当多个可写源存在时的主要写入目标 /// public bool IsPrimaryWriter { get; } /// /// 释放资源,关闭 Redis 连接 /// public void Dispose() { if (_disposed) return; _disposed = true; try { _multiplexer?.Dispose(); } catch { } } /// /// 构建 Microsoft.Extensions.Configuration 的内存配置源,从 Redis 加载数据 /// /// Microsoft.Extensions.Configuration.Memory.MemoryConfigurationSource 实例 /// 当对象已释放时抛出 public IConfigurationSource BuildSource() { ThrowIfDisposed(); var data = new List>(); if (string.IsNullOrWhiteSpace(_options.ConnectionString)) return new MemoryConfigurationSource { InitialData = data }; try { var db = _multiplexer.GetDatabase(_options.Database ?? -1); var endpoints = _multiplexer.GetEndPoints(); var server = endpoints.Length > 0 ? _multiplexer.GetServer(endpoints[0]) : null; if (server != null) { var pattern = string.IsNullOrEmpty(_options.KeyPrefix) ? "*" : _options.KeyPrefix + "*"; var pageSize = Math.Clamp(_options.ScanPageSize, MinScanPageSize, MaxScanPageSize); var prefixLen = _options.KeyPrefix?.Length ?? 0; foreach (var key in server.Keys(db.Database, pattern, pageSize)) { var val = db.StringGet(key); var keyStr = key.ToString(); if (!string.IsNullOrEmpty(keyStr)) { // 去掉前缀,还原为原始配置 key var configKey = prefixLen > 0 ? keyStr.Substring(prefixLen) : keyStr; data.Add(new KeyValuePair(configKey, val.HasValue ? val.ToString() : null)); } } } } catch { } return new MemoryConfigurationSource { InitialData = data }; } /// /// 应用配置更改到 Redis /// /// 要应用的配置更改 /// 取消令牌 /// 表示异步操作的任务 /// 当对象已释放时抛出 public async Task ApplyChangesAsync(IReadOnlyDictionary changes, CancellationToken cancellationToken) { ThrowIfDisposed(); if (string.IsNullOrWhiteSpace(_options.ConnectionString)) return; var db = _multiplexer.GetDatabase(_options.Database ?? -1); var batch = db.CreateBatch(); var tasks = new List(); foreach (var (key, value) in changes) { var k = string.IsNullOrEmpty(_options.KeyPrefix) ? key : _options.KeyPrefix + key; tasks.Add(value is null ? batch.KeyDeleteAsync(k) : batch.StringSetAsync(k, value)); } batch.Execute(); await Task.WhenAll(tasks).ConfigureAwait(false); if (!string.IsNullOrWhiteSpace(_options.Channel)) { try { var sub = _multiplexer.GetSubscriber(); await sub.PublishAsync(RedisChannel.Literal(_options.Channel!), "update").ConfigureAwait(false); } catch { } } } /// /// 检查对象是否已释放,如果已释放则抛出异常 /// /// 当对象已释放时抛出 private void ThrowIfDisposed() { if (_disposed) throw new ObjectDisposedException(nameof(RedisCfgSource)); } /// /// 确保连接字符串包含 allowAdmin 选项 /// /// 原始连接字符串 /// 包含 allowAdmin 选项的连接字符串 private static string EnsureAllowAdmin(string connectionString) { return connectionString.Contains("allowAdmin", StringComparison.OrdinalIgnoreCase) ? connectionString : connectionString + ",allowAdmin=true"; } }