using Apq.Cfg.Sources; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration.Memory; using SqlSugar; namespace Apq.Cfg.Database; /// /// 数据库配置源 /// internal sealed class DatabaseCfgSource : IWritableCfgSource { private readonly SqlSugarDatabaseProvider _databaseProvider; private readonly DatabaseOptions _options; public DatabaseCfgSource(DatabaseOptions options, int level, bool isPrimaryWriter) { _options = options; Level = level; IsPrimaryWriter = isPrimaryWriter; _databaseProvider = CreateProvider(options.Provider!); } private static SqlSugarDatabaseProvider CreateProvider(string providerName) { var dbType = providerName.ToLowerInvariant() switch { "sqlserver" => DbType.SqlServer, "mysql" => DbType.MySql, "postgresql" or "postgres" => DbType.PostgreSQL, "oracle" => DbType.Oracle, "sqlite" => DbType.Sqlite, _ => throw new ArgumentException($"不支持的数据库提供程序: '{providerName}'", nameof(providerName)) }; return new SqlSugarDatabaseProvider(dbType); } public int Level { get; } public bool IsWriteable => true; public bool IsPrimaryWriter { get; } public IConfigurationSource BuildSource() { var data = new List>(); try { using var cts = new CancellationTokenSource(_options.CommandTimeoutMs); var configData = _databaseProvider.LoadConfigurationAsync( _options.ConnectionString!, _options.Table!, _options.KeyColumn!, _options.ValueColumn!, cts.Token).GetAwaiter().GetResult(); foreach (var (key, value) in configData) data.Add(new KeyValuePair(key, value)); } catch { } return new MemoryConfigurationSource { InitialData = data }; } public async Task ApplyChangesAsync(IReadOnlyDictionary changes, CancellationToken cancellationToken) { using var timeoutCts = new CancellationTokenSource(_options.CommandTimeoutMs); using var linked = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, timeoutCts.Token); await _databaseProvider.ApplyChangesAsync( _options.ConnectionString!, _options.Table!, _options.KeyColumn!, _options.ValueColumn!, changes, linked.Token).ConfigureAwait(false); } }