ConsulCfgSource.cs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417
  1. using System.Collections.Concurrent;
  2. using System.Text;
  3. using System.Text.Json;
  4. using Apq.Cfg.Sources;
  5. using Consul;
  6. using Microsoft.Extensions.Configuration;
  7. using Microsoft.Extensions.Configuration.Memory;
  8. using Microsoft.Extensions.Primitives;
  9. namespace Apq.Cfg.Consul;
  10. /// <summary>
  11. /// Consul 配置源
  12. /// </summary>
  13. internal sealed class ConsulCfgSource : IWritableCfgSource, IDisposable
  14. {
  15. private readonly ConsulCfgOptions _options;
  16. private readonly ConsulClient _client;
  17. private readonly ConcurrentDictionary<string, string?> _data;
  18. private readonly CancellationTokenSource _watchCts;
  19. private ulong _lastIndex;
  20. private volatile bool _disposed;
  21. private Task? _watchTask;
  22. private ConfigurationReloadToken _reloadToken;
  23. private readonly object _reloadTokenLock = new();
  24. public ConsulCfgSource(ConsulCfgOptions options, int level, bool isPrimaryWriter)
  25. {
  26. _options = options;
  27. Level = level;
  28. IsPrimaryWriter = isPrimaryWriter;
  29. _data = new ConcurrentDictionary<string, string?>();
  30. _watchCts = new CancellationTokenSource();
  31. _reloadToken = new ConfigurationReloadToken();
  32. _client = new ConsulClient(config =>
  33. {
  34. config.Address = new Uri(options.Address);
  35. if (!string.IsNullOrEmpty(options.Token))
  36. config.Token = options.Token;
  37. if (!string.IsNullOrEmpty(options.Datacenter))
  38. config.Datacenter = options.Datacenter;
  39. config.WaitTime = options.WaitTime;
  40. });
  41. // 初始加载
  42. LoadDataAsync().GetAwaiter().GetResult();
  43. // 启动热重载监听
  44. if (options.EnableHotReload)
  45. {
  46. _watchTask = WatchForChangesAsync(_watchCts.Token);
  47. }
  48. }
  49. /// <summary>
  50. /// 获取配置层级,数值越大优先级越高
  51. /// </summary>
  52. public int Level { get; }
  53. /// <summary>
  54. /// 获取是否可写,Consul 支持通过 API 写入配置,因此始终为 true
  55. /// </summary>
  56. public bool IsWriteable => true;
  57. /// <summary>
  58. /// 获取是否为主要写入源,用于标识当多个可写源存在时的主要写入目标
  59. /// </summary>
  60. public bool IsPrimaryWriter { get; }
  61. /// <summary>
  62. /// 释放资源,取消所有异步操作并释放 Consul 客户端
  63. /// </summary>
  64. public void Dispose()
  65. {
  66. if (_disposed) return;
  67. _disposed = true;
  68. _watchCts.Cancel();
  69. try { _watchTask?.Wait(TimeSpan.FromSeconds(2)); }
  70. catch { }
  71. _watchCts.Dispose();
  72. _client.Dispose();
  73. }
  74. /// <summary>
  75. /// 构建 Microsoft.Extensions.Configuration 的配置源
  76. /// </summary>
  77. /// <returns>Microsoft.Extensions.Configuration.IConfigurationSource 实例</returns>
  78. /// <exception cref="ObjectDisposedException">当对象已释放时抛出</exception>
  79. public IConfigurationSource BuildSource()
  80. {
  81. ThrowIfDisposed();
  82. return new ConsulConfigurationSource(this);
  83. }
  84. /// <summary>
  85. /// 应用配置更改到 Consul
  86. /// </summary>
  87. /// <param name="changes">要应用的配置更改</param>
  88. /// <param name="cancellationToken">取消令牌</param>
  89. /// <returns>表示异步操作的任务</returns>
  90. /// <exception cref="ObjectDisposedException">当对象已释放时抛出</exception>
  91. public async Task ApplyChangesAsync(IReadOnlyDictionary<string, string?> changes, CancellationToken cancellationToken)
  92. {
  93. ThrowIfDisposed();
  94. foreach (var (key, value) in changes)
  95. {
  96. var consulKey = GetConsulKey(key);
  97. if (value is null)
  98. {
  99. await _client.KV.Delete(consulKey, cancellationToken).ConfigureAwait(false);
  100. _data.TryRemove(key, out _);
  101. }
  102. else
  103. {
  104. var pair = new KVPair(consulKey) { Value = Encoding.UTF8.GetBytes(value) };
  105. await _client.KV.Put(pair, cancellationToken).ConfigureAwait(false);
  106. _data[key] = value;
  107. }
  108. }
  109. }
  110. private async Task LoadDataAsync()
  111. {
  112. try
  113. {
  114. switch (_options.DataFormat)
  115. {
  116. case ConsulDataFormat.KeyValue:
  117. await LoadKeyValueDataAsync().ConfigureAwait(false);
  118. break;
  119. case ConsulDataFormat.Json:
  120. await LoadJsonDataAsync().ConfigureAwait(false);
  121. break;
  122. case ConsulDataFormat.Yaml:
  123. await LoadYamlDataAsync().ConfigureAwait(false);
  124. break;
  125. }
  126. }
  127. catch
  128. {
  129. // 连接失败时保持空数据
  130. }
  131. }
  132. private async Task LoadKeyValueDataAsync()
  133. {
  134. var prefix = _options.KeyPrefix ?? "";
  135. var result = await _client.KV.List(prefix).ConfigureAwait(false);
  136. _lastIndex = result.LastIndex;
  137. if (result.Response == null) return;
  138. _data.Clear();
  139. foreach (var pair in result.Response)
  140. {
  141. if (pair.Value == null) continue;
  142. var key = pair.Key;
  143. // 去掉前缀
  144. if (!string.IsNullOrEmpty(prefix) && key.StartsWith(prefix))
  145. key = key.Substring(prefix.Length);
  146. // 将 Consul 的 / 分隔符转换为配置的 : 分隔符
  147. key = key.Replace('/', ':');
  148. var value = Encoding.UTF8.GetString(pair.Value);
  149. _data[key] = value;
  150. }
  151. }
  152. private async Task LoadJsonDataAsync()
  153. {
  154. var key = _options.SingleKey ?? _options.KeyPrefix ?? "config";
  155. var result = await _client.KV.Get(key).ConfigureAwait(false);
  156. _lastIndex = result.LastIndex;
  157. if (result.Response?.Value == null) return;
  158. var json = Encoding.UTF8.GetString(result.Response.Value);
  159. var flatData = FlattenJson(json);
  160. _data.Clear();
  161. foreach (var (k, v) in flatData)
  162. {
  163. _data[k] = v;
  164. }
  165. }
  166. private async Task LoadYamlDataAsync()
  167. {
  168. // YAML 支持需要额外依赖,这里简单处理为不支持
  169. // 如果需要 YAML 支持,可以引用 YamlDotNet
  170. var key = _options.SingleKey ?? _options.KeyPrefix ?? "config";
  171. var result = await _client.KV.Get(key).ConfigureAwait(false);
  172. _lastIndex = result.LastIndex;
  173. if (result.Response?.Value == null) return;
  174. // 暂时将整个 YAML 内容作为单个值存储
  175. var yaml = Encoding.UTF8.GetString(result.Response.Value);
  176. _data["_raw"] = yaml;
  177. }
  178. private async Task WatchForChangesAsync(CancellationToken cancellationToken)
  179. {
  180. while (!cancellationToken.IsCancellationRequested)
  181. {
  182. try
  183. {
  184. var prefix = _options.KeyPrefix ?? "";
  185. var queryOptions = new QueryOptions
  186. {
  187. WaitIndex = _lastIndex,
  188. WaitTime = _options.WaitTime
  189. };
  190. QueryResult<KVPair[]>? result;
  191. if (_options.DataFormat == ConsulDataFormat.KeyValue)
  192. {
  193. result = await _client.KV.List(prefix, queryOptions, cancellationToken).ConfigureAwait(false);
  194. }
  195. else
  196. {
  197. var key = _options.SingleKey ?? _options.KeyPrefix ?? "config";
  198. var singleResult = await _client.KV.Get(key, queryOptions, cancellationToken).ConfigureAwait(false);
  199. result = new QueryResult<KVPair[]>
  200. {
  201. LastIndex = singleResult.LastIndex,
  202. Response = singleResult.Response != null ? new[] { singleResult.Response } : Array.Empty<KVPair>()
  203. };
  204. }
  205. if (result.LastIndex > _lastIndex)
  206. {
  207. _lastIndex = result.LastIndex;
  208. await LoadDataAsync().ConfigureAwait(false);
  209. OnReload();
  210. }
  211. }
  212. catch (OperationCanceledException)
  213. {
  214. break;
  215. }
  216. catch
  217. {
  218. // 连接失败,等待后重试
  219. try
  220. {
  221. await Task.Delay(_options.ReconnectInterval, cancellationToken).ConfigureAwait(false);
  222. }
  223. catch (OperationCanceledException)
  224. {
  225. break;
  226. }
  227. }
  228. }
  229. }
  230. private void OnReload()
  231. {
  232. ConfigurationReloadToken previousToken;
  233. lock (_reloadTokenLock)
  234. {
  235. previousToken = _reloadToken;
  236. _reloadToken = new ConfigurationReloadToken();
  237. }
  238. previousToken.OnReload();
  239. }
  240. private string GetConsulKey(string configKey)
  241. {
  242. // 将配置的 : 分隔符转换为 Consul 的 / 分隔符
  243. var consulKey = configKey.Replace(':', '/');
  244. var prefix = _options.KeyPrefix ?? "";
  245. return string.IsNullOrEmpty(prefix) ? consulKey : prefix + consulKey;
  246. }
  247. private static Dictionary<string, string?> FlattenJson(string json)
  248. {
  249. var result = new Dictionary<string, string?>();
  250. try
  251. {
  252. using var doc = JsonDocument.Parse(json);
  253. FlattenJsonElement(doc.RootElement, "", result);
  254. }
  255. catch
  256. {
  257. // JSON 解析失败
  258. }
  259. return result;
  260. }
  261. private static void FlattenJsonElement(JsonElement element, string prefix, Dictionary<string, string?> result)
  262. {
  263. switch (element.ValueKind)
  264. {
  265. case JsonValueKind.Object:
  266. foreach (var property in element.EnumerateObject())
  267. {
  268. var key = string.IsNullOrEmpty(prefix) ? property.Name : $"{prefix}:{property.Name}";
  269. FlattenJsonElement(property.Value, key, result);
  270. }
  271. break;
  272. case JsonValueKind.Array:
  273. var index = 0;
  274. foreach (var item in element.EnumerateArray())
  275. {
  276. var key = $"{prefix}:{index}";
  277. FlattenJsonElement(item, key, result);
  278. index++;
  279. }
  280. break;
  281. case JsonValueKind.String:
  282. result[prefix] = element.GetString();
  283. break;
  284. case JsonValueKind.Number:
  285. result[prefix] = element.GetRawText();
  286. break;
  287. case JsonValueKind.True:
  288. result[prefix] = "true";
  289. break;
  290. case JsonValueKind.False:
  291. result[prefix] = "false";
  292. break;
  293. case JsonValueKind.Null:
  294. result[prefix] = null;
  295. break;
  296. }
  297. }
  298. private void ThrowIfDisposed()
  299. {
  300. if (_disposed) throw new ObjectDisposedException(nameof(ConsulCfgSource));
  301. }
  302. internal IEnumerable<string> GetAllKeys() => _data.Keys;
  303. internal bool TryGetValue(string key, out string? value) => _data.TryGetValue(key, out value);
  304. internal IChangeToken GetReloadToken()
  305. {
  306. lock (_reloadTokenLock)
  307. {
  308. return _reloadToken;
  309. }
  310. }
  311. /// <summary>
  312. /// 内部配置源,用于集成到 Microsoft.Extensions.Configuration
  313. /// </summary>
  314. private sealed class ConsulConfigurationSource : IConfigurationSource
  315. {
  316. private readonly ConsulCfgSource _consulSource;
  317. public ConsulConfigurationSource(ConsulCfgSource consulSource)
  318. {
  319. _consulSource = consulSource;
  320. }
  321. public IConfigurationProvider Build(IConfigurationBuilder builder)
  322. {
  323. return new ConsulConfigurationProvider(_consulSource);
  324. }
  325. }
  326. /// <summary>
  327. /// 内部配置提供程序
  328. /// </summary>
  329. private sealed class ConsulConfigurationProvider : ConfigurationProvider
  330. {
  331. private readonly ConsulCfgSource _consulSource;
  332. public ConsulConfigurationProvider(ConsulCfgSource consulSource)
  333. {
  334. _consulSource = consulSource;
  335. }
  336. public override void Load()
  337. {
  338. Data = new Dictionary<string, string?>(StringComparer.OrdinalIgnoreCase);
  339. foreach (var key in _consulSource.GetAllKeys())
  340. {
  341. if (_consulSource.TryGetValue(key, out var value))
  342. {
  343. Data[key] = value;
  344. }
  345. }
  346. }
  347. public override bool TryGet(string key, out string? value)
  348. {
  349. return _consulSource.TryGetValue(key, out value);
  350. }
  351. public new IChangeToken GetReloadToken()
  352. {
  353. return _consulSource.GetReloadToken();
  354. }
  355. }
  356. }