EtcdCfgSource.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. using System.Collections.Concurrent;
  2. using System.Text;
  3. using System.Text.Json;
  4. using Apq.Cfg.Sources;
  5. using dotnet_etcd;
  6. using Etcdserverpb;
  7. using Microsoft.Extensions.Configuration;
  8. using Microsoft.Extensions.Primitives;
  9. namespace Apq.Cfg.Etcd;
  10. /// <summary>
  11. /// Etcd 配置源
  12. /// </summary>
  13. internal sealed class EtcdCfgSource : IWritableCfgSource, IDisposable
  14. {
  /// <summary>Options supplied at construction (endpoints, credentials, key layout, hot-reload settings).</summary>
  private readonly EtcdCfgOptions _options;

  /// <summary>Etcd client owned by this source; disposed in <see cref="Dispose"/>.</summary>
  private readonly EtcdClient _client;

  /// <summary>In-memory copy of the configuration, keyed by ':'-separated configuration keys.</summary>
  private readonly ConcurrentDictionary<string, string?> _data;

  /// <summary>Cancels the background watch loop when the source is disposed.</summary>
  private readonly CancellationTokenSource _watchCts;

  private volatile bool _disposed;

  /// <summary>The running watch loop, or <c>null</c> when hot reload is disabled.</summary>
  private Task? _watchTask;

  /// <summary>Current reload token; replaced (under <see cref="_reloadTokenLock"/>) every time a reload fires.</summary>
  private ConfigurationReloadToken _reloadToken;
  private readonly object _reloadTokenLock = new();

  /// <summary>
  /// Creates the source, connects to Etcd, performs the initial (blocking) load and,
  /// when <c>options.EnableHotReload</c> is set, starts the background watch loop.
  /// </summary>
  /// <param name="options">Connection and layout options; must not be <c>null</c>.</param>
  /// <param name="level">Value exposed through <see cref="Level"/>.</param>
  /// <param name="isPrimaryWriter">Value exposed through <see cref="IsPrimaryWriter"/>.</param>
  /// <exception cref="ArgumentNullException"><paramref name="options"/> is <c>null</c>.</exception>
  public EtcdCfgSource(EtcdCfgOptions options, int level, bool isPrimaryWriter)
  {
      _options = options ?? throw new ArgumentNullException(nameof(options));
      Level = level;
      IsPrimaryWriter = isPrimaryWriter;
      _data = new ConcurrentDictionary<string, string?>();
      _reloadToken = new ConfigurationReloadToken();

      // Build the connection string before allocating disposable resources so that
      // a bad endpoint list cannot leave anything undisposed.
      var connectionString = string.Join(",", options.Endpoints);

      _watchCts = new CancellationTokenSource();
      _client = new EtcdClient(connectionString);

      try
      {
          // Authenticate only when both user name and password are configured.
          if (!string.IsNullOrEmpty(options.Username) && !string.IsNullOrEmpty(options.Password))
          {
              _client.Authenticate(new AuthenticateRequest
              {
                  Name = options.Username,
                  Password = options.Password
              });
          }

          // Initial load. A constructor cannot await, so this blocks; LoadDataAsync
          // swallows its own failures, leaving the data empty when Etcd is unreachable.
          LoadDataAsync().GetAwaiter().GetResult();

          // Start the hot-reload watcher.
          if (options.EnableHotReload)
          {
              _watchTask = WatchForChangesAsync(_watchCts.Token);
          }
      }
      catch
      {
          // The caller never receives the instance when construction fails, so nobody
          // else could dispose these; release them here and rethrow the original error.
          _watchCts.Cancel();
          _watchCts.Dispose();
          _client.Dispose();
          throw;
      }
  }

  /// <summary>Level value passed to the constructor.</summary>
  public int Level { get; }

  /// <summary>Always <c>true</c>: this source accepts writes through <see cref="ApplyChangesAsync"/>.</summary>
  public bool IsWriteable => true;

  /// <summary>Primary-writer flag passed to the constructor.</summary>
  public bool IsPrimaryWriter { get; }
  /// <summary>
  /// Stops the watch loop and releases the cancellation source and the Etcd client.
  /// Calls after the first one return immediately.
  /// </summary>
  public void Dispose()
  {
      if (!_disposed)
      {
          _disposed = true;

          // Signal the watcher first, then give it a bounded amount of time to wind down.
          _watchCts.Cancel();
          try
          {
              var watcher = _watchTask;
              if (watcher is not null)
              {
                  watcher.Wait(TimeSpan.FromSeconds(2));
              }
          }
          catch
          {
              // Best effort: a faulted or cancelled watcher must not make Dispose throw.
          }

          _watchCts.Dispose();
          _client.Dispose();
      }
  }
  /// <summary>
  /// Wraps this instance in an <see cref="IConfigurationSource"/> for Microsoft.Extensions.Configuration.
  /// </summary>
  /// <returns>A new configuration source backed by this instance.</returns>
  /// <exception cref="ObjectDisposedException">The source has already been disposed.</exception>
  public IConfigurationSource BuildSource()
  {
      ThrowIfDisposed();

      IConfigurationSource wrapper = new EtcdConfigurationSource(this);
      return wrapper;
  }
  /// <summary>
  /// Writes the given changes to Etcd one key at a time and mirrors each successful
  /// write into the in-memory data. A <c>null</c> value deletes the key.
  /// </summary>
  /// <param name="changes">Configuration keys (':'-separated) mapped to their new values; must not be <c>null</c>.</param>
  /// <param name="cancellationToken">Checked before each key and forwarded to the Etcd calls.</param>
  /// <exception cref="ObjectDisposedException">The source has already been disposed.</exception>
  /// <exception cref="ArgumentNullException"><paramref name="changes"/> is <c>null</c>.</exception>
  /// <exception cref="OperationCanceledException">Cancellation was requested before all changes were applied.</exception>
  /// <remarks>
  /// Changes are not applied as a single transaction: when a call fails or is cancelled,
  /// the keys processed before it stay written.
  /// NOTE(review): keys are always written individually under the key prefix, while the
  /// Json data format loads from one JSON document key. Values written here in Json mode
  /// do not appear to be read back by a later load; confirm the intended behaviour.
  /// </remarks>
  public async Task ApplyChangesAsync(IReadOnlyDictionary<string, string?> changes, CancellationToken cancellationToken)
  {
      ThrowIfDisposed();
      if (changes is null)
      {
          throw new ArgumentNullException(nameof(changes));
      }

      foreach (var (key, value) in changes)
      {
          // Honour cancellation between keys even if the client call itself ignores the token.
          cancellationToken.ThrowIfCancellationRequested();

          var etcdKey = GetEtcdKey(key);
          if (value is null)
          {
              await _client.DeleteAsync(etcdKey, cancellationToken: cancellationToken).ConfigureAwait(false);
              _data.TryRemove(key, out _);
          }
          else
          {
              await _client.PutAsync(etcdKey, value, cancellationToken: cancellationToken).ConfigureAwait(false);
              _data[key] = value;
          }
      }
  }
  /// <summary>
  /// Reloads the in-memory data using the loader that matches the configured data format.
  /// Any exception raised while loading is swallowed, so the data stays as the loader left it.
  /// </summary>
  private async Task LoadDataAsync()
  {
      try
      {
          var format = _options.DataFormat;
          if (format == EtcdDataFormat.KeyValue)
          {
              await LoadKeyValueDataAsync().ConfigureAwait(false);
          }
          else if (format == EtcdDataFormat.Json)
          {
              await LoadJsonDataAsync().ConfigureAwait(false);
          }
          // Any other format value loads nothing.
      }
      catch
      {
          // Deliberate best effort: e.g. when the connection fails, carry on without new data.
      }
  }
  /// <summary>
  /// Loads every key under the configured prefix (default <c>/config/</c>) and replaces the
  /// in-memory data with the result. The prefix is stripped from each key and '/' separators
  /// are converted to the ':' separators used by configuration keys.
  /// </summary>
  private async Task LoadKeyValueDataAsync()
  {
      var prefix = _options.KeyPrefix ?? "/config/";
      var response = await _client.GetRangeAsync(prefix).ConfigureAwait(false);

      // Build the complete snapshot first so readers never observe a cleared,
      // half-populated dictionary while a reload is in progress.
      var snapshot = new Dictionary<string, string?>(response.Kvs.Count);
      foreach (var kv in response.Kvs)
      {
          var key = kv.Key.ToStringUtf8();

          // Strip the prefix. Etcd keys are raw bytes, so compare ordinally rather than
          // with the current culture.
          if (prefix.Length > 0 && key.StartsWith(prefix, StringComparison.Ordinal))
          {
              key = key.Substring(prefix.Length);
          }

          // Convert Etcd's '/' separator to the configuration ':' separator.
          key = key.Replace('/', ':');
          snapshot[key] = kv.Value.ToStringUtf8();
      }

      // Drop keys that no longer exist in Etcd, then add or overwrite the current ones.
      foreach (var existingKey in _data.Keys)
      {
          if (!snapshot.ContainsKey(existingKey))
          {
              _data.TryRemove(existingKey, out _);
          }
      }

      foreach (var (key, value) in snapshot)
      {
          _data[key] = value;
      }
  }
  /// <summary>
  /// Loads one JSON document from Etcd, flattens it into ':'-separated keys and replaces
  /// the in-memory data with the result. The document key is <c>SingleKey</c>, falling back
  /// to <c>KeyPrefix</c>, then to <c>/config</c>.
  /// </summary>
  /// <remarks>
  /// When the key does not exist the current data is left untouched. A document that fails
  /// to parse flattens to an empty set and therefore empties the data.
  /// </remarks>
  private async Task LoadJsonDataAsync()
  {
      var key = _options.SingleKey ?? _options.KeyPrefix ?? "/config";
      var response = await _client.GetAsync(key).ConfigureAwait(false);
      if (response.Kvs.Count == 0)
      {
          return;
      }

      var json = response.Kvs[0].Value.ToStringUtf8();
      var flatData = FlattenJson(json);

      // Update in place instead of Clear() + refill, so readers never observe an
      // empty or half-populated configuration during a reload.
      foreach (var existingKey in _data.Keys)
      {
          if (!flatData.ContainsKey(existingKey))
          {
              _data.TryRemove(existingKey, out _);
          }
      }

      foreach (var (k, v) in flatData)
      {
          _data[k] = v;
      }
  }
  /// <summary>
  /// Background loop behind hot reload. Registers a watch on the key prefix (KeyValue format)
  /// or on the single JSON key (other formats), then waits until cancellation. On any other
  /// failure it waits <c>ReconnectInterval</c> and registers the watch again.
  /// </summary>
  /// <param name="cancellationToken">Stops the loop; also handed to the Etcd watch calls.</param>
  /// <remarks>
  /// NOTE(review): whether <c>Watch</c>/<c>WatchRange</c> return immediately or block until the
  /// watch ends depends on the dotnet-etcd version in use; confirm, because this method is
  /// started from the constructor.
  /// </remarks>
  private async Task WatchForChangesAsync(CancellationToken cancellationToken)
  {
      // Shared reaction to a watch notification: reload, then signal subscribers.
      // It must never fault: the callbacks below are async lambdas handed to the client
      // (presumably as Action delegates, i.e. async void -- confirm against dotnet-etcd),
      // where an escaping exception would be unobserved and could terminate the process.
      async Task ReloadOnEventsAsync(int eventCount)
      {
          if (eventCount <= 0)
          {
              return;
          }

          try
          {
              await LoadDataAsync().ConfigureAwait(false);
              OnReload();
          }
          catch (Exception)
          {
              // A reload subscriber threw. No logger is available in this class, so the
              // failure is dropped here to keep the watch alive.
          }
      }

      while (!cancellationToken.IsCancellationRequested)
      {
          try
          {
              if (_options.DataFormat == EtcdDataFormat.KeyValue)
              {
                  // Watch every key under the prefix.
                  var prefix = _options.KeyPrefix ?? "/config/";
                  _client.WatchRange(prefix, async (response) =>
                  {
                      await ReloadOnEventsAsync(response.Events.Count).ConfigureAwait(false);
                  }, cancellationToken: cancellationToken);
              }
              else
              {
                  // Watch the single key that holds the JSON document.
                  var key = _options.SingleKey ?? _options.KeyPrefix ?? "/config";
                  _client.Watch(key, async (response) =>
                  {
                      await ReloadOnEventsAsync(response.Events.Count).ConfigureAwait(false);
                  }, cancellationToken: cancellationToken);
              }

              // Park here until cancellation is requested.
              await Task.Delay(Timeout.Infinite, cancellationToken).ConfigureAwait(false);
          }
          catch (OperationCanceledException)
          {
              break;
          }
          catch
          {
              // Registering the watch failed: wait, then retry.
              try
              {
                  await Task.Delay(_options.ReconnectInterval, cancellationToken).ConfigureAwait(false);
              }
              catch (OperationCanceledException)
              {
                  break;
              }
          }
      }
  }
  /// <summary>
  /// Installs a fresh reload token and then fires the one it replaced, so that listeners
  /// re-subscribing from their callback obtain the new token.
  /// </summary>
  private void OnReload()
  {
      var replacement = new ConfigurationReloadToken();
      ConfigurationReloadToken retired;

      lock (_reloadTokenLock)
      {
          retired = _reloadToken;
          _reloadToken = replacement;
      }

      // Fire outside the lock so subscriber callbacks never run while it is held.
      retired.OnReload();
  }
  /// <summary>
  /// Maps a configuration key to its Etcd key: ':' separators become '/' and the
  /// configured key prefix (default <c>/config/</c>) is prepended.
  /// </summary>
  /// <param name="configKey">Configuration key using ':' separators.</param>
  /// <returns>The Etcd key, including the prefix when the prefix is not empty.</returns>
  private string GetEtcdKey(string configKey)
  {
      var path = configKey.Replace(':', '/');
      var keyPrefix = _options.KeyPrefix ?? "/config/";

      if (string.IsNullOrEmpty(keyPrefix))
      {
          return path;
      }

      return keyPrefix + path;
  }
  /// <summary>
  /// Parses a JSON document and flattens it into ':'-separated configuration keys.
  /// </summary>
  /// <param name="json">JSON text to parse.</param>
  /// <returns>
  /// The flattened key/value pairs. When parsing or flattening throws, the exception is
  /// swallowed and whatever was collected up to that point (possibly nothing) is returned.
  /// </returns>
  private static Dictionary<string, string?> FlattenJson(string json)
  {
      var flattened = new Dictionary<string, string?>();

      try
      {
          using (var document = JsonDocument.Parse(json))
          {
              FlattenJsonElement(document.RootElement, "", flattened);
          }
      }
      catch
      {
          // The document could not be parsed; fall through and return what we have.
      }

      return flattened;
  }
  /// <summary>
  /// Recursively flattens a JSON element into <paramref name="result"/>. Object property
  /// names and array indexes are joined with ':' to form the key; strings, numbers (raw
  /// text), booleans ("true"/"false") and nulls are stored as leaf values.
  /// </summary>
  /// <param name="element">Element to flatten.</param>
  /// <param name="prefix">Key accumulated so far; empty for the document root.</param>
  /// <param name="result">Dictionary that receives the flattened entries.</param>
  private static void FlattenJsonElement(JsonElement element, string prefix, Dictionary<string, string?> result)
  {
      switch (element.ValueKind)
      {
          case JsonValueKind.Object:
              foreach (var property in element.EnumerateObject())
              {
                  var key = string.IsNullOrEmpty(prefix) ? property.Name : $"{prefix}:{property.Name}";
                  FlattenJsonElement(property.Value, key, result);
              }
              break;
          case JsonValueKind.Array:
              var index = 0;
              foreach (var item in element.EnumerateArray())
              {
                  // Same rule as for object properties: no leading ':' when the array is
                  // the document root (previously this produced keys such as ":0").
                  var key = string.IsNullOrEmpty(prefix) ? $"{index}" : $"{prefix}:{index}";
                  FlattenJsonElement(item, key, result);
                  index++;
              }
              break;
          case JsonValueKind.String:
              result[prefix] = element.GetString();
              break;
          case JsonValueKind.Number:
              // Keep the number exactly as written in the document.
              result[prefix] = element.GetRawText();
              break;
          case JsonValueKind.True:
              result[prefix] = "true";
              break;
          case JsonValueKind.False:
              result[prefix] = "false";
              break;
          case JsonValueKind.Null:
              result[prefix] = null;
              break;
      }
  }
  /// <summary>Guards public entry points against use after <see cref="Dispose"/>.</summary>
  /// <exception cref="ObjectDisposedException">The source has already been disposed.</exception>
  private void ThrowIfDisposed()
  {
      if (!_disposed)
      {
          return;
      }

      throw new ObjectDisposedException(nameof(EtcdCfgSource));
  }
  /// <summary>Returns the keys currently held in the in-memory data.</summary>
  internal IEnumerable<string> GetAllKeys()
  {
      return _data.Keys;
  }
  /// <summary>Looks up a configuration key in the in-memory data.</summary>
  /// <param name="key">Configuration key using ':' separators.</param>
  /// <param name="value">The stored value when the key exists.</param>
  /// <returns><c>true</c> when the key exists; otherwise <c>false</c>.</returns>
  internal bool TryGetValue(string key, out string? value)
  {
      return _data.TryGetValue(key, out value);
  }
  /// <summary>Returns the reload token that will fire on the next reload.</summary>
  internal IChangeToken GetReloadToken()
  {
      IChangeToken current;

      // Read under the same lock OnReload uses when it swaps the token.
      lock (_reloadTokenLock)
      {
          current = _reloadToken;
      }

      return current;
  }
  /// <summary>
  /// Internal configuration source used to plug the Etcd source into
  /// Microsoft.Extensions.Configuration.
  /// </summary>
  private sealed class EtcdConfigurationSource : IConfigurationSource
  {
      private readonly EtcdCfgSource _etcdSource;

      /// <summary>Creates a configuration source backed by the given Etcd source.</summary>
      public EtcdConfigurationSource(EtcdCfgSource etcdSource) => _etcdSource = etcdSource;

      /// <summary>Creates a new provider over the Etcd source; <paramref name="builder"/> is not used.</summary>
      public IConfigurationProvider Build(IConfigurationBuilder builder)
          => new EtcdConfigurationProvider(_etcdSource);
  }
  /// <summary>
  /// Internal configuration provider that exposes the Etcd source's in-memory data and
  /// forwards its reload notifications to Microsoft.Extensions.Configuration.
  /// </summary>
  private sealed class EtcdConfigurationProvider : ConfigurationProvider, IDisposable
  {
      private readonly EtcdCfgSource _etcdSource;

      // Keeps this provider subscribed to the source's reload token; released in Dispose.
      private readonly IDisposable _reloadSubscription;

      /// <summary>Creates the provider and subscribes it to the source's reload notifications.</summary>
      public EtcdConfigurationProvider(EtcdCfgSource etcdSource)
      {
          _etcdSource = etcdSource;

          // The base GetReloadToken is not virtual, so the 'new' method below is invisible
          // to code that holds this provider as IConfigurationProvider. Forward source
          // reloads into the base class token so those callers are notified as well.
          _reloadSubscription = ChangeToken.OnChange(
              _etcdSource.GetReloadToken,
              () =>
              {
                  Load();
                  OnReload();
              });
      }

      /// <summary>Copies the source's current data into <c>Data</c> (case-insensitive keys).</summary>
      public override void Load()
      {
          // Fill a private dictionary and publish it in one assignment, so readers of Data
          // never see it partially populated.
          var snapshot = new Dictionary<string, string?>(StringComparer.OrdinalIgnoreCase);
          foreach (var key in _etcdSource.GetAllKeys())
          {
              if (_etcdSource.TryGetValue(key, out var value))
              {
                  snapshot[key] = value;
              }
          }

          Data = snapshot;
      }

      /// <summary>
      /// Reads straight from the source's in-memory data rather than from <c>Data</c>, so values
      /// written through the source are visible without waiting for a reload.
      /// </summary>
      public override bool TryGet(string key, out string? value)
      {
          return _etcdSource.TryGetValue(key, out value);
      }

      /// <summary>
      /// Returns the source's own reload token. This hides the base method and is only
      /// reachable through a reference typed as this class.
      /// </summary>
      public new IChangeToken GetReloadToken()
      {
          return _etcdSource.GetReloadToken();
      }

      /// <summary>Stops forwarding reload notifications from the source.</summary>
      public void Dispose()
      {
          _reloadSubscription.Dispose();
      }
  }
  314. }