ConsulCfgSource.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
  1. using System.Collections.Concurrent;
  2. using System.Text;
  3. using System.Text.Json;
  4. using Apq.Cfg.Sources;
  5. using Consul;
  6. using Microsoft.Extensions.Configuration;
  7. using Microsoft.Extensions.Configuration.Memory;
  8. using Microsoft.Extensions.Primitives;
  9. namespace Apq.Cfg.Consul;
  10. /// <summary>
  11. /// Consul 配置源
  12. /// </summary>
  13. internal sealed class ConsulCfgSource : IWritableCfgSource, IDisposable
  14. {
  15. private readonly ConsulCfgOptions _options;
  16. private readonly ConsulClient _client;
  17. private readonly ConcurrentDictionary<string, string?> _data;
  18. private readonly CancellationTokenSource _watchCts;
  19. private ulong _lastIndex;
  20. private volatile bool _disposed;
  21. private Task? _watchTask;
  22. private ConfigurationReloadToken _reloadToken;
  23. private readonly object _reloadTokenLock = new();
  24. public ConsulCfgSource(ConsulCfgOptions options, int level, bool isPrimaryWriter)
  25. {
  26. _options = options;
  27. Level = level;
  28. IsPrimaryWriter = isPrimaryWriter;
  29. _data = new ConcurrentDictionary<string, string?>();
  30. _watchCts = new CancellationTokenSource();
  31. _reloadToken = new ConfigurationReloadToken();
  32. _client = new ConsulClient(config =>
  33. {
  34. config.Address = new Uri(options.Address);
  35. if (!string.IsNullOrEmpty(options.Token))
  36. config.Token = options.Token;
  37. if (!string.IsNullOrEmpty(options.Datacenter))
  38. config.Datacenter = options.Datacenter;
  39. config.WaitTime = options.WaitTime;
  40. });
  41. // 初始加载
  42. LoadDataAsync().GetAwaiter().GetResult();
  43. // 启动热重载监听
  44. if (options.EnableHotReload)
  45. {
  46. _watchTask = WatchForChangesAsync(_watchCts.Token);
  47. }
  48. }
  49. public int Level { get; }
  50. public bool IsWriteable => true;
  51. public bool IsPrimaryWriter { get; }
  52. public void Dispose()
  53. {
  54. if (_disposed) return;
  55. _disposed = true;
  56. _watchCts.Cancel();
  57. try { _watchTask?.Wait(TimeSpan.FromSeconds(2)); }
  58. catch { }
  59. _watchCts.Dispose();
  60. _client.Dispose();
  61. }
  62. public IConfigurationSource BuildSource()
  63. {
  64. ThrowIfDisposed();
  65. return new ConsulConfigurationSource(this);
  66. }
  67. public async Task ApplyChangesAsync(IReadOnlyDictionary<string, string?> changes, CancellationToken cancellationToken)
  68. {
  69. ThrowIfDisposed();
  70. foreach (var (key, value) in changes)
  71. {
  72. var consulKey = GetConsulKey(key);
  73. if (value is null)
  74. {
  75. await _client.KV.Delete(consulKey, cancellationToken).ConfigureAwait(false);
  76. _data.TryRemove(key, out _);
  77. }
  78. else
  79. {
  80. var pair = new KVPair(consulKey) { Value = Encoding.UTF8.GetBytes(value) };
  81. await _client.KV.Put(pair, cancellationToken).ConfigureAwait(false);
  82. _data[key] = value;
  83. }
  84. }
  85. }
  86. private async Task LoadDataAsync()
  87. {
  88. try
  89. {
  90. switch (_options.DataFormat)
  91. {
  92. case ConsulDataFormat.KeyValue:
  93. await LoadKeyValueDataAsync().ConfigureAwait(false);
  94. break;
  95. case ConsulDataFormat.Json:
  96. await LoadJsonDataAsync().ConfigureAwait(false);
  97. break;
  98. case ConsulDataFormat.Yaml:
  99. await LoadYamlDataAsync().ConfigureAwait(false);
  100. break;
  101. }
  102. }
  103. catch
  104. {
  105. // 连接失败时保持空数据
  106. }
  107. }
  108. private async Task LoadKeyValueDataAsync()
  109. {
  110. var prefix = _options.KeyPrefix ?? "";
  111. var result = await _client.KV.List(prefix).ConfigureAwait(false);
  112. _lastIndex = result.LastIndex;
  113. if (result.Response == null) return;
  114. _data.Clear();
  115. foreach (var pair in result.Response)
  116. {
  117. if (pair.Value == null) continue;
  118. var key = pair.Key;
  119. // 去掉前缀
  120. if (!string.IsNullOrEmpty(prefix) && key.StartsWith(prefix))
  121. key = key.Substring(prefix.Length);
  122. // 将 Consul 的 / 分隔符转换为配置的 : 分隔符
  123. key = key.Replace('/', ':');
  124. var value = Encoding.UTF8.GetString(pair.Value);
  125. _data[key] = value;
  126. }
  127. }
  128. private async Task LoadJsonDataAsync()
  129. {
  130. var key = _options.SingleKey ?? _options.KeyPrefix ?? "config";
  131. var result = await _client.KV.Get(key).ConfigureAwait(false);
  132. _lastIndex = result.LastIndex;
  133. if (result.Response?.Value == null) return;
  134. var json = Encoding.UTF8.GetString(result.Response.Value);
  135. var flatData = FlattenJson(json);
  136. _data.Clear();
  137. foreach (var (k, v) in flatData)
  138. {
  139. _data[k] = v;
  140. }
  141. }
  142. private async Task LoadYamlDataAsync()
  143. {
  144. // YAML 支持需要额外依赖,这里简单处理为不支持
  145. // 如果需要 YAML 支持,可以引用 YamlDotNet
  146. var key = _options.SingleKey ?? _options.KeyPrefix ?? "config";
  147. var result = await _client.KV.Get(key).ConfigureAwait(false);
  148. _lastIndex = result.LastIndex;
  149. if (result.Response?.Value == null) return;
  150. // 暂时将整个 YAML 内容作为单个值存储
  151. var yaml = Encoding.UTF8.GetString(result.Response.Value);
  152. _data["_raw"] = yaml;
  153. }
  154. private async Task WatchForChangesAsync(CancellationToken cancellationToken)
  155. {
  156. while (!cancellationToken.IsCancellationRequested)
  157. {
  158. try
  159. {
  160. var prefix = _options.KeyPrefix ?? "";
  161. var queryOptions = new QueryOptions
  162. {
  163. WaitIndex = _lastIndex,
  164. WaitTime = _options.WaitTime
  165. };
  166. QueryResult<KVPair[]>? result;
  167. if (_options.DataFormat == ConsulDataFormat.KeyValue)
  168. {
  169. result = await _client.KV.List(prefix, queryOptions, cancellationToken).ConfigureAwait(false);
  170. }
  171. else
  172. {
  173. var key = _options.SingleKey ?? _options.KeyPrefix ?? "config";
  174. var singleResult = await _client.KV.Get(key, queryOptions, cancellationToken).ConfigureAwait(false);
  175. result = new QueryResult<KVPair[]>
  176. {
  177. LastIndex = singleResult.LastIndex,
  178. Response = singleResult.Response != null ? new[] { singleResult.Response } : Array.Empty<KVPair>()
  179. };
  180. }
  181. if (result.LastIndex > _lastIndex)
  182. {
  183. _lastIndex = result.LastIndex;
  184. await LoadDataAsync().ConfigureAwait(false);
  185. OnReload();
  186. }
  187. }
  188. catch (OperationCanceledException)
  189. {
  190. break;
  191. }
  192. catch
  193. {
  194. // 连接失败,等待后重试
  195. try
  196. {
  197. await Task.Delay(_options.ReconnectInterval, cancellationToken).ConfigureAwait(false);
  198. }
  199. catch (OperationCanceledException)
  200. {
  201. break;
  202. }
  203. }
  204. }
  205. }
  206. private void OnReload()
  207. {
  208. ConfigurationReloadToken previousToken;
  209. lock (_reloadTokenLock)
  210. {
  211. previousToken = _reloadToken;
  212. _reloadToken = new ConfigurationReloadToken();
  213. }
  214. previousToken.OnReload();
  215. }
  216. private string GetConsulKey(string configKey)
  217. {
  218. // 将配置的 : 分隔符转换为 Consul 的 / 分隔符
  219. var consulKey = configKey.Replace(':', '/');
  220. var prefix = _options.KeyPrefix ?? "";
  221. return string.IsNullOrEmpty(prefix) ? consulKey : prefix + consulKey;
  222. }
  223. private static Dictionary<string, string?> FlattenJson(string json)
  224. {
  225. var result = new Dictionary<string, string?>();
  226. try
  227. {
  228. using var doc = JsonDocument.Parse(json);
  229. FlattenJsonElement(doc.RootElement, "", result);
  230. }
  231. catch
  232. {
  233. // JSON 解析失败
  234. }
  235. return result;
  236. }
  237. private static void FlattenJsonElement(JsonElement element, string prefix, Dictionary<string, string?> result)
  238. {
  239. switch (element.ValueKind)
  240. {
  241. case JsonValueKind.Object:
  242. foreach (var property in element.EnumerateObject())
  243. {
  244. var key = string.IsNullOrEmpty(prefix) ? property.Name : $"{prefix}:{property.Name}";
  245. FlattenJsonElement(property.Value, key, result);
  246. }
  247. break;
  248. case JsonValueKind.Array:
  249. var index = 0;
  250. foreach (var item in element.EnumerateArray())
  251. {
  252. var key = $"{prefix}:{index}";
  253. FlattenJsonElement(item, key, result);
  254. index++;
  255. }
  256. break;
  257. case JsonValueKind.String:
  258. result[prefix] = element.GetString();
  259. break;
  260. case JsonValueKind.Number:
  261. result[prefix] = element.GetRawText();
  262. break;
  263. case JsonValueKind.True:
  264. result[prefix] = "true";
  265. break;
  266. case JsonValueKind.False:
  267. result[prefix] = "false";
  268. break;
  269. case JsonValueKind.Null:
  270. result[prefix] = null;
  271. break;
  272. }
  273. }
  274. private void ThrowIfDisposed()
  275. {
  276. if (_disposed) throw new ObjectDisposedException(nameof(ConsulCfgSource));
  277. }
  278. internal IEnumerable<string> GetAllKeys() => _data.Keys;
  279. internal bool TryGetValue(string key, out string? value) => _data.TryGetValue(key, out value);
  280. internal IChangeToken GetReloadToken()
  281. {
  282. lock (_reloadTokenLock)
  283. {
  284. return _reloadToken;
  285. }
  286. }
  287. /// <summary>
  288. /// 内部配置源,用于集成到 Microsoft.Extensions.Configuration
  289. /// </summary>
  290. private sealed class ConsulConfigurationSource : IConfigurationSource
  291. {
  292. private readonly ConsulCfgSource _consulSource;
  293. public ConsulConfigurationSource(ConsulCfgSource consulSource)
  294. {
  295. _consulSource = consulSource;
  296. }
  297. public IConfigurationProvider Build(IConfigurationBuilder builder)
  298. {
  299. return new ConsulConfigurationProvider(_consulSource);
  300. }
  301. }
  302. /// <summary>
  303. /// 内部配置提供程序
  304. /// </summary>
  305. private sealed class ConsulConfigurationProvider : ConfigurationProvider
  306. {
  307. private readonly ConsulCfgSource _consulSource;
  308. public ConsulConfigurationProvider(ConsulCfgSource consulSource)
  309. {
  310. _consulSource = consulSource;
  311. }
  312. public override void Load()
  313. {
  314. Data = new Dictionary<string, string?>(StringComparer.OrdinalIgnoreCase);
  315. foreach (var key in _consulSource.GetAllKeys())
  316. {
  317. if (_consulSource.TryGetValue(key, out var value))
  318. {
  319. Data[key] = value;
  320. }
  321. }
  322. }
  323. public override bool TryGet(string key, out string? value)
  324. {
  325. return _consulSource.TryGetValue(key, out value);
  326. }
  327. public new IChangeToken GetReloadToken()
  328. {
  329. return _consulSource.GetReloadToken();
  330. }
  331. }
  332. }